Parallelize cgroup loads and process info collection

This commit is contained in:
Trey Dockendorf 2020-10-02 16:24:03 -04:00
parent 946ae521b8
commit a2dea897a8
2 changed files with 101 additions and 68 deletions

View File

@ -24,6 +24,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"github.com/containerd/cgroups"
"github.com/go-kit/kit/log"
@ -51,6 +52,7 @@ var (
procRoot = kingpin.Flag("path.proc.root", "Root path to proc fs").Default(defProcRoot).String()
collectProc = kingpin.Flag("collect.proc", "Boolean that sets if to collect proc information").Default("false").Bool()
collectProcMaxExec = kingpin.Flag("collect.proc.max-exec", "Max length of process executable to record").Default("100").Int()
metricLock = sync.RWMutex{}
)
type CgroupMetric struct {
@ -220,24 +222,34 @@ func getProcInfo(pids []int, metric *CgroupMetric, logger log.Logger) {
level.Error(logger).Log("msg", "Unable to open procfs", "path", *procRoot)
return
}
wg := &sync.WaitGroup{}
wg.Add(len(pids))
for _, pid := range pids {
proc, err := procFS.Proc(pid)
if err != nil {
level.Error(logger).Log("msg", "Unable to read PID", "pid", pid)
continue
}
executable, err := proc.Executable()
if err != nil {
level.Error(logger).Log("msg", "Unable to get executable for PID", "pid", pid)
continue
}
if len(executable) > *collectProcMaxExec {
level.Debug(logger).Log("msg", "Executable will be truncated", "executable", executable, "len", len(executable), "pid", pid)
executable = executable[len(executable)-*collectProcMaxExec:]
executable = fmt.Sprintf("...%s", executable)
}
executables[executable] += 1
go func(p int) {
proc, err := procFS.Proc(p)
if err != nil {
level.Error(logger).Log("msg", "Unable to read PID", "pid", p)
wg.Done()
return
}
executable, err := proc.Executable()
if err != nil {
level.Error(logger).Log("msg", "Unable to get executable for PID", "pid", p)
wg.Done()
return
}
if len(executable) > *collectProcMaxExec {
level.Debug(logger).Log("msg", "Executable will be truncated", "executable", executable, "len", len(executable), "pid", p)
executable = executable[len(executable)-*collectProcMaxExec:]
executable = fmt.Sprintf("...%s", executable)
}
metricLock.Lock()
executables[executable] += 1
metricLock.Unlock()
wg.Done()
}(pid)
}
wg.Wait()
metric.processExec = executables
}
@ -305,6 +317,45 @@ func NewExporter(paths []string, logger log.Logger) *Exporter {
}
}
func (e *Exporter) getMetrics(name string, pids map[string][]int) (CgroupMetric, error) {
metric := CgroupMetric{name: name}
level.Debug(e.logger).Log("msg", "Loading cgroup", "path", name)
ctrl, err := cgroups.Load(subsystem, func(subsystem cgroups.Name) (string, error) {
return name, nil
})
if err != nil {
level.Error(e.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err)
metric.err = true
return metric, err
}
stats, _ := ctrl.Stat(cgroups.IgnoreNotExist)
metric.cpuUser = float64(stats.CPU.Usage.User) / 1000000000.0
metric.cpuSystem = float64(stats.CPU.Usage.Kernel) / 1000000000.0
metric.cpuTotal = float64(stats.CPU.Usage.Total) / 1000000000.0
metric.memoryRSS = float64(stats.Memory.TotalRSS)
metric.memoryCache = float64(stats.Memory.TotalCache)
metric.memoryUsed = float64(stats.Memory.Usage.Usage)
metric.memoryTotal = float64(stats.Memory.Usage.Limit)
metric.memoryFailCount = float64(stats.Memory.Usage.Failcnt)
metric.memswUsed = float64(stats.Memory.Swap.Usage)
metric.memswTotal = float64(stats.Memory.Swap.Limit)
metric.memswFailCount = float64(stats.Memory.Swap.Failcnt)
if cpus, err := getCPUs(name, e.logger); err == nil {
metric.cpus = len(cpus)
metric.cpu_list = strings.Join(cpus, ",")
}
getInfo(name, &metric, e.logger)
if *collectProc {
if val, ok := pids[name]; ok {
level.Debug(e.logger).Log("msg", "Get process info", "pids", fmt.Sprintf("%v", val))
getProcInfo(val, &metric, e.logger)
} else {
level.Error(e.logger).Log("msg", "Unable to get PIDs", "path", name)
}
}
return metric, nil
}
func (e *Exporter) collect() ([]CgroupMetric, error) {
var names []string
var metrics []CgroupMetric
@ -345,47 +396,19 @@ func (e *Exporter) collect() ([]CgroupMetric, error) {
pids[name] = []int{p.Pid}
}
}
wg := &sync.WaitGroup{}
wg.Add(len(names))
for _, name := range names {
metric := CgroupMetric{name: name}
level.Debug(e.logger).Log("msg", "Loading cgroup", "path", name)
ctrl, err := cgroups.Load(subsystem, func(subsystem cgroups.Name) (string, error) {
return name, nil
})
if err != nil {
level.Error(e.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err)
metric.err = true
go func(n string, p map[string][]int) {
metric, _ := e.getMetrics(n, p)
metricLock.Lock()
metrics = append(metrics, metric)
continue
}
stats, _ := ctrl.Stat(cgroups.IgnoreNotExist)
metric.cpuUser = float64(stats.CPU.Usage.User) / 1000000000.0
metric.cpuSystem = float64(stats.CPU.Usage.Kernel) / 1000000000.0
metric.cpuTotal = float64(stats.CPU.Usage.Total) / 1000000000.0
metric.memoryRSS = float64(stats.Memory.TotalRSS)
metric.memoryCache = float64(stats.Memory.TotalCache)
metric.memoryUsed = float64(stats.Memory.Usage.Usage)
metric.memoryTotal = float64(stats.Memory.Usage.Limit)
metric.memoryFailCount = float64(stats.Memory.Usage.Failcnt)
metric.memswUsed = float64(stats.Memory.Swap.Usage)
metric.memswTotal = float64(stats.Memory.Swap.Limit)
metric.memswFailCount = float64(stats.Memory.Swap.Failcnt)
if cpus, err := getCPUs(name, e.logger); err == nil {
metric.cpus = len(cpus)
metric.cpu_list = strings.Join(cpus, ",")
}
getInfo(name, &metric, e.logger)
if *collectProc {
if val, ok := pids[name]; ok {
level.Debug(e.logger).Log("msg", "Get process info", "pids", fmt.Sprintf("%v", val))
getProcInfo(val, &metric, e.logger)
} else {
level.Error(e.logger).Log("msg", "Unable to get PIDs", "path", name)
}
}
metrics = append(metrics, metric)
metricLock.Unlock()
wg.Done()
}(name, pids)
}
wg.Wait()
}
return metrics, nil
}

View File

@ -182,49 +182,59 @@ func TestCollectSLURM(t *testing.T) {
t.Errorf("Unexpected number of metrics, got %d expected 2", val)
return
}
if val := metrics[0].cpuUser; val != 0 {
var m CgroupMetric
for _, metric := range metrics {
if metric.jobid == "10" {
m = metric
}
}
if m.jobid == "" {
t.Errorf("Metrics with jobid=10 not found")
return
}
if val := m.cpuUser; val != 0 {
t.Errorf("Unexpected value for cpuUser, got %v", val)
}
if val := metrics[0].cpuSystem; val != 0 {
if val := m.cpuSystem; val != 0 {
t.Errorf("Unexpected value for cpuSystem, got %v", val)
}
if val := metrics[0].cpuTotal; val != 0.007710215 {
if val := m.cpuTotal; val != 0.007710215 {
t.Errorf("Unexpected value for cpuTotal, got %v", val)
}
if val := metrics[0].cpus; val != 2 {
if val := m.cpus; val != 2 {
t.Errorf("Unexpected value for cpus, got %v", val)
}
if val := metrics[0].memoryRSS; val != 311296 {
if val := m.memoryRSS; val != 311296 {
t.Errorf("Unexpected value for memoryRSS, got %v", val)
}
if val := metrics[0].memoryCache; val != 4096 {
if val := m.memoryCache; val != 4096 {
t.Errorf("Unexpected value for memoryCache, got %v", val)
}
if val := metrics[0].memoryUsed; val != 356352 {
if val := m.memoryUsed; val != 356352 {
t.Errorf("Unexpected value for memoryUsed, got %v", val)
}
if val := metrics[0].memoryTotal; val != 2147483648 {
if val := m.memoryTotal; val != 2147483648 {
t.Errorf("Unexpected value for memoryTotal, got %v", val)
}
if val := metrics[0].memoryFailCount; val != 0 {
if val := m.memoryFailCount; val != 0 {
t.Errorf("Unexpected value for memoryFailCount, got %v", val)
}
if val := metrics[0].memswUsed; val != 356352 {
if val := m.memswUsed; val != 356352 {
t.Errorf("Unexpected value for swapUsed, got %v", val)
}
if val := metrics[0].memswTotal; val != 2147483648 {
if val := m.memswTotal; val != 2147483648 {
t.Errorf("Unexpected value for swapTotal, got %v", val)
}
if val := metrics[0].memswFailCount; val != 0 {
if val := m.memswFailCount; val != 0 {
t.Errorf("Unexpected value for swapFailCount, got %v", val)
}
if val := metrics[0].uid; val != "20821" {
if val := m.uid; val != "20821" {
t.Errorf("Unexpected value for uid, got %v", val)
}
if val := metrics[0].jobid; val != "10" {
if val := m.jobid; val != "10" {
t.Errorf("Unexpected value for jobid, got %v", val)
}
if val, ok := metrics[0].processExec["/bin/bash"]; !ok {
if val, ok := m.processExec["/bin/bash"]; !ok {
t.Errorf("processExec does not contain /bin/bash")
} else {
if val != 2 {