From a2dea897a8e0e8f79b71f1ac55c7a1d8a1eb0a5b Mon Sep 17 00:00:00 2001
From: Trey Dockendorf
Date: Fri, 2 Oct 2020 16:24:03 -0400
Subject: [PATCH] Parallelize cgroup loads and process info collection

---
 cgroup_exporter.go      | 129 +++++++++++++++++++++++-----------------
 cgroup_exporter_test.go |  40 ++++++++-----
 2 files changed, 101 insertions(+), 68 deletions(-)

diff --git a/cgroup_exporter.go b/cgroup_exporter.go
index c8467b6..80b6783 100644
--- a/cgroup_exporter.go
+++ b/cgroup_exporter.go
@@ -24,6 +24,7 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/containerd/cgroups"
 	"github.com/go-kit/kit/log"
@@ -51,6 +52,7 @@
 	procRoot = kingpin.Flag("path.proc.root", "Root path to proc fs").Default(defProcRoot).String()
 	collectProc = kingpin.Flag("collect.proc", "Boolean that sets if to collect proc information").Default("false").Bool()
 	collectProcMaxExec = kingpin.Flag("collect.proc.max-exec", "Max length of process executable to record").Default("100").Int()
+	metricLock = sync.RWMutex{}
 )
 
 type CgroupMetric struct {
@@ -220,24 +222,34 @@ func getProcInfo(pids []int, metric *CgroupMetric, logger log.Logger) {
 		level.Error(logger).Log("msg", "Unable to open procfs", "path", *procRoot)
 		return
 	}
+	wg := &sync.WaitGroup{}
+	wg.Add(len(pids))
 	for _, pid := range pids {
-		proc, err := procFS.Proc(pid)
-		if err != nil {
-			level.Error(logger).Log("msg", "Unable to read PID", "pid", pid)
-			continue
-		}
-		executable, err := proc.Executable()
-		if err != nil {
-			level.Error(logger).Log("msg", "Unable to get executable for PID", "pid", pid)
-			continue
-		}
-		if len(executable) > *collectProcMaxExec {
-			level.Debug(logger).Log("msg", "Executable will be truncated", "executable", executable, "len", len(executable), "pid", pid)
-			executable = executable[len(executable)-*collectProcMaxExec:]
-			executable = fmt.Sprintf("...%s", executable)
-		}
-		executables[executable] += 1
+		go func(p int) {
+			proc, err := procFS.Proc(p)
+			if err != nil {
+				level.Error(logger).Log("msg", "Unable to read PID", "pid", p)
+				wg.Done()
+				return
+			}
+			executable, err := proc.Executable()
+			if err != nil {
+				level.Error(logger).Log("msg", "Unable to get executable for PID", "pid", p)
+				wg.Done()
+				return
+			}
+			if len(executable) > *collectProcMaxExec {
+				level.Debug(logger).Log("msg", "Executable will be truncated", "executable", executable, "len", len(executable), "pid", p)
+				executable = executable[len(executable)-*collectProcMaxExec:]
+				executable = fmt.Sprintf("...%s", executable)
+			}
+			metricLock.Lock()
+			executables[executable] += 1
+			metricLock.Unlock()
+			wg.Done()
+		}(pid)
 	}
+	wg.Wait()
 	metric.processExec = executables
 }
 
@@ -305,6 +317,45 @@ func NewExporter(paths []string, logger log.Logger) *Exporter {
 	}
 }
 
+func (e *Exporter) getMetrics(name string, pids map[string][]int) (CgroupMetric, error) {
+	metric := CgroupMetric{name: name}
+	level.Debug(e.logger).Log("msg", "Loading cgroup", "path", name)
+	ctrl, err := cgroups.Load(subsystem, func(subsystem cgroups.Name) (string, error) {
+		return name, nil
+	})
+	if err != nil {
+		level.Error(e.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err)
+		metric.err = true
+		return metric, err
+	}
+	stats, _ := ctrl.Stat(cgroups.IgnoreNotExist)
+	metric.cpuUser = float64(stats.CPU.Usage.User) / 1000000000.0
+	metric.cpuSystem = float64(stats.CPU.Usage.Kernel) / 1000000000.0
+	metric.cpuTotal = float64(stats.CPU.Usage.Total) / 1000000000.0
+	metric.memoryRSS = float64(stats.Memory.TotalRSS)
+	metric.memoryCache = float64(stats.Memory.TotalCache)
+	metric.memoryUsed = float64(stats.Memory.Usage.Usage)
+	metric.memoryTotal = float64(stats.Memory.Usage.Limit)
+	metric.memoryFailCount = float64(stats.Memory.Usage.Failcnt)
+	metric.memswUsed = float64(stats.Memory.Swap.Usage)
+	metric.memswTotal = float64(stats.Memory.Swap.Limit)
+	metric.memswFailCount = float64(stats.Memory.Swap.Failcnt)
+	if cpus, err := getCPUs(name, e.logger); err == nil {
+		metric.cpus = len(cpus)
+		metric.cpu_list = strings.Join(cpus, ",")
+	}
+	getInfo(name, &metric, e.logger)
+	if *collectProc {
+		if val, ok := pids[name]; ok {
+			level.Debug(e.logger).Log("msg", "Get process info", "pids", fmt.Sprintf("%v", val))
+			getProcInfo(val, &metric, e.logger)
+		} else {
+			level.Error(e.logger).Log("msg", "Unable to get PIDs", "path", name)
+		}
+	}
+	return metric, nil
+}
+
 func (e *Exporter) collect() ([]CgroupMetric, error) {
 	var names []string
 	var metrics []CgroupMetric
@@ -345,47 +396,19 @@ func (e *Exporter) collect() ([]CgroupMetric, error) {
 				pids[name] = []int{p.Pid}
 			}
 		}
+		wg := &sync.WaitGroup{}
+		wg.Add(len(names))
 		for _, name := range names {
-			metric := CgroupMetric{name: name}
-			level.Debug(e.logger).Log("msg", "Loading cgroup", "path", name)
-			ctrl, err := cgroups.Load(subsystem, func(subsystem cgroups.Name) (string, error) {
-				return name, nil
-			})
-			if err != nil {
-				level.Error(e.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err)
-				metric.err = true
+			go func(n string, p map[string][]int) {
+				metric, _ := e.getMetrics(n, p)
+				metricLock.Lock()
 				metrics = append(metrics, metric)
-				continue
-			}
-			stats, _ := ctrl.Stat(cgroups.IgnoreNotExist)
-			metric.cpuUser = float64(stats.CPU.Usage.User) / 1000000000.0
-			metric.cpuSystem = float64(stats.CPU.Usage.Kernel) / 1000000000.0
-			metric.cpuTotal = float64(stats.CPU.Usage.Total) / 1000000000.0
-			metric.memoryRSS = float64(stats.Memory.TotalRSS)
-			metric.memoryCache = float64(stats.Memory.TotalCache)
-			metric.memoryUsed = float64(stats.Memory.Usage.Usage)
-			metric.memoryTotal = float64(stats.Memory.Usage.Limit)
-			metric.memoryFailCount = float64(stats.Memory.Usage.Failcnt)
-			metric.memswUsed = float64(stats.Memory.Swap.Usage)
-			metric.memswTotal = float64(stats.Memory.Swap.Limit)
-			metric.memswFailCount = float64(stats.Memory.Swap.Failcnt)
-			if cpus, err := getCPUs(name, e.logger); err == nil {
-				metric.cpus = len(cpus)
-				metric.cpu_list = strings.Join(cpus, ",")
-			}
-			getInfo(name, &metric, e.logger)
-			if *collectProc {
-				if val, ok := pids[name]; ok {
-					level.Debug(e.logger).Log("msg", "Get process info", "pids", fmt.Sprintf("%v", val))
-					getProcInfo(val, &metric, e.logger)
-				} else {
-					level.Error(e.logger).Log("msg", "Unable to get PIDs", "path", name)
-				}
-			}
-			metrics = append(metrics, metric)
+				metricLock.Unlock()
+				wg.Done()
+			}(name, pids)
 		}
+		wg.Wait()
 	}
-
 	return metrics, nil
 }
 
diff --git a/cgroup_exporter_test.go b/cgroup_exporter_test.go
index a72854b..8bcbfc6 100644
--- a/cgroup_exporter_test.go
+++ b/cgroup_exporter_test.go
@@ -182,49 +182,59 @@ func TestCollectSLURM(t *testing.T) {
 		t.Errorf("Unexpected number of metrics, got %d expected 2", val)
 		return
 	}
-	if val := metrics[0].cpuUser; val != 0 {
+	var m CgroupMetric
+	for _, metric := range metrics {
+		if metric.jobid == "10" {
+			m = metric
+		}
+	}
+	if m.jobid == "" {
+		t.Errorf("Metrics with jobid=10 not found")
+		return
+	}
+	if val := m.cpuUser; val != 0 {
 		t.Errorf("Unexpected value for cpuUser, got %v", val)
 	}
-	if val := metrics[0].cpuSystem; val != 0 {
+	if val := m.cpuSystem; val != 0 {
 		t.Errorf("Unexpected value for cpuSystem, got %v", val)
 	}
-	if val := metrics[0].cpuTotal; val != 0.007710215 {
+	if val := m.cpuTotal; val != 0.007710215 {
 		t.Errorf("Unexpected value for cpuTotal, got %v", val)
 	}
-	if val := metrics[0].cpus; val != 2 {
+	if val := m.cpus; val != 2 {
 		t.Errorf("Unexpected value for cpus, got %v", val)
 	}
-	if val := metrics[0].memoryRSS; val != 311296 {
+	if val := m.memoryRSS; val != 311296 {
 		t.Errorf("Unexpected value for memoryRSS, got %v", val)
 	}
-	if val := metrics[0].memoryCache; val != 4096 {
+	if val := m.memoryCache; val != 4096 {
 		t.Errorf("Unexpected value for memoryCache, got %v", val)
 	}
-	if val := metrics[0].memoryUsed; val != 356352 {
+	if val := m.memoryUsed; val != 356352 {
 		t.Errorf("Unexpected value for memoryUsed, got %v", val)
 	}
-	if val := metrics[0].memoryTotal; val != 2147483648 {
+	if val := m.memoryTotal; val != 2147483648 {
 		t.Errorf("Unexpected value for memoryTotal, got %v", val)
 	}
-	if val := metrics[0].memoryFailCount; val != 0 {
+	if val := m.memoryFailCount; val != 0 {
 		t.Errorf("Unexpected value for memoryFailCount, got %v", val)
 	}
-	if val := metrics[0].memswUsed; val != 356352 {
+	if val := m.memswUsed; val != 356352 {
 		t.Errorf("Unexpected value for swapUsed, got %v", val)
 	}
-	if val := metrics[0].memswTotal; val != 2147483648 {
+	if val := m.memswTotal; val != 2147483648 {
 		t.Errorf("Unexpected value for swapTotal, got %v", val)
 	}
-	if val := metrics[0].memswFailCount; val != 0 {
+	if val := m.memswFailCount; val != 0 {
 		t.Errorf("Unexpected value for swapFailCount, got %v", val)
 	}
-	if val := metrics[0].uid; val != "20821" {
+	if val := m.uid; val != "20821" {
 		t.Errorf("Unexpected value for uid, got %v", val)
 	}
-	if val := metrics[0].jobid; val != "10" {
+	if val := m.jobid; val != "10" {
 		t.Errorf("Unexpected value for jobid, got %v", val)
 	}
-	if val, ok := metrics[0].processExec["/bin/bash"]; !ok {
+	if val, ok := m.processExec["/bin/bash"]; !ok {
 		t.Errorf("processExec does not contain /bin/bash")
 	} else {
 		if val != 2 {
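
Note for reviewers: the change in `collect()` and `getProcInfo()` is the same fan-out/fan-in idiom in both places — one goroutine per item, a `sync.WaitGroup` to wait for all of them, and a mutex (`metricLock`) guarding the shared slice or map that the goroutines write into. The sketch below is not part of the patch; it is a minimal standalone Go illustration of that pattern, with the cgroup-specific call replaced by a placeholder string and with hypothetical sample names echoing the test fixtures (uid 20821, job 10).

```go
// Standalone illustration (not part of the patch) of the fan-out pattern the
// diff introduces: one goroutine per item, a sync.WaitGroup to wait for them
// all, and a mutex guarding the shared slice that collects results.
package main

import (
	"fmt"
	"sync"
)

func main() {
	// Hypothetical cgroup names, echoing the SLURM test fixtures.
	names := []string{"/slurm/uid_20821/job_10", "/slurm/uid_20821/job_11"}

	var (
		mu      sync.Mutex // plays the role of metricLock in the patch
		results []string   // plays the role of the shared metrics slice
		wg      sync.WaitGroup
	)

	wg.Add(len(names))
	for _, name := range names {
		go func(n string) {
			defer wg.Done()
			// Stand-in for the per-cgroup work (e.getMetrics in the patch).
			metric := fmt.Sprintf("collected %s", n)
			mu.Lock()
			results = append(results, metric)
			mu.Unlock()
		}(name)
	}
	wg.Wait()

	fmt.Println(results)
}
```

The sketch uses `defer wg.Done()` where the patch calls `wg.Done()` explicitly on each exit path; both are correct, the deferred form just makes it harder to miss a return path. As in the patch, the loop variable is passed as an argument to the goroutine so each worker captures its own value.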