Merge pull request #11 from treydock/parallel
Parallelize cgroup loads and process info collection
commit 59b0b2779f
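The diff below applies a standard Go fan-out pattern in two places: spawn one goroutine per cgroup (and per PID), join them with a sync.WaitGroup, and serialize writes to shared state behind a mutex, since concurrent appends to a slice or writes to a map are not safe. A minimal standalone sketch of that pattern, with illustrative identifiers (loadOne, results) that are not taken from the exporter:

// Sketch of the fan-out pattern this PR applies: one goroutine per work
// item, a sync.WaitGroup to join them, and a mutex serializing writes to
// the shared result slice. Identifiers here are illustrative only.
package main

import (
	"fmt"
	"sync"
)

// loadOne stands in for an expensive per-cgroup load.
func loadOne(name string) string {
	return "metric for " + name
}

func main() {
	names := []string{"/slurm/uid_20821/job_10", "/slurm/uid_20821/job_11"}
	var results []string
	metricLock := sync.RWMutex{} // the PR uses a package-level sync.RWMutex
	wg := &sync.WaitGroup{}
	wg.Add(len(names))
	for _, name := range names {
		go func(n string) {
			defer wg.Done()
			r := loadOne(n)
			metricLock.Lock() // append is not goroutine-safe
			results = append(results, r)
			metricLock.Unlock()
		}(name)
	}
	wg.Wait()
	fmt.Println(results) // completion order, i.e. nondeterministic
}

Note that results arrive in completion order, not input order; the test changes at the bottom of this diff account for exactly that.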
@@ -24,6 +24,7 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/containerd/cgroups"
 	"github.com/go-kit/kit/log"
@@ -51,6 +52,7 @@ var (
 	procRoot           = kingpin.Flag("path.proc.root", "Root path to proc fs").Default(defProcRoot).String()
 	collectProc        = kingpin.Flag("collect.proc", "Boolean that sets if to collect proc information").Default("false").Bool()
 	collectProcMaxExec = kingpin.Flag("collect.proc.max-exec", "Max length of process executable to record").Default("100").Int()
+	metricLock         = sync.RWMutex{}
 )
 
 type CgroupMetric struct {
@@ -220,24 +222,34 @@ func getProcInfo(pids []int, metric *CgroupMetric, logger log.Logger) {
 		level.Error(logger).Log("msg", "Unable to open procfs", "path", *procRoot)
 		return
 	}
+	wg := &sync.WaitGroup{}
+	wg.Add(len(pids))
 	for _, pid := range pids {
-		proc, err := procFS.Proc(pid)
-		if err != nil {
-			level.Error(logger).Log("msg", "Unable to read PID", "pid", pid)
-			continue
-		}
-		executable, err := proc.Executable()
-		if err != nil {
-			level.Error(logger).Log("msg", "Unable to get executable for PID", "pid", pid)
-			continue
-		}
-		if len(executable) > *collectProcMaxExec {
-			level.Debug(logger).Log("msg", "Executable will be truncated", "executable", executable, "len", len(executable), "pid", pid)
-			executable = executable[len(executable)-*collectProcMaxExec:]
-			executable = fmt.Sprintf("...%s", executable)
-		}
-		executables[executable] += 1
+		go func(p int) {
+			proc, err := procFS.Proc(p)
+			if err != nil {
+				level.Error(logger).Log("msg", "Unable to read PID", "pid", p)
+				wg.Done()
+				return
+			}
+			executable, err := proc.Executable()
+			if err != nil {
+				level.Error(logger).Log("msg", "Unable to get executable for PID", "pid", p)
+				wg.Done()
+				return
+			}
+			if len(executable) > *collectProcMaxExec {
+				level.Debug(logger).Log("msg", "Executable will be truncated", "executable", executable, "len", len(executable), "pid", p)
+				executable = executable[len(executable)-*collectProcMaxExec:]
+				executable = fmt.Sprintf("...%s", executable)
+			}
+			metricLock.Lock()
+			executables[executable] += 1
+			metricLock.Unlock()
+			wg.Done()
+		}(pid)
 	}
+	wg.Wait()
 	metric.processExec = executables
 }
 
@@ -305,6 +317,45 @@ func NewExporter(paths []string, logger log.Logger) *Exporter {
 	}
 }
 
+func (e *Exporter) getMetrics(name string, pids map[string][]int) (CgroupMetric, error) {
+	metric := CgroupMetric{name: name}
+	level.Debug(e.logger).Log("msg", "Loading cgroup", "path", name)
+	ctrl, err := cgroups.Load(subsystem, func(subsystem cgroups.Name) (string, error) {
+		return name, nil
+	})
+	if err != nil {
+		level.Error(e.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err)
+		metric.err = true
+		return metric, err
+	}
+	stats, _ := ctrl.Stat(cgroups.IgnoreNotExist)
+	metric.cpuUser = float64(stats.CPU.Usage.User) / 1000000000.0
+	metric.cpuSystem = float64(stats.CPU.Usage.Kernel) / 1000000000.0
+	metric.cpuTotal = float64(stats.CPU.Usage.Total) / 1000000000.0
+	metric.memoryRSS = float64(stats.Memory.TotalRSS)
+	metric.memoryCache = float64(stats.Memory.TotalCache)
+	metric.memoryUsed = float64(stats.Memory.Usage.Usage)
+	metric.memoryTotal = float64(stats.Memory.Usage.Limit)
+	metric.memoryFailCount = float64(stats.Memory.Usage.Failcnt)
+	metric.memswUsed = float64(stats.Memory.Swap.Usage)
+	metric.memswTotal = float64(stats.Memory.Swap.Limit)
+	metric.memswFailCount = float64(stats.Memory.Swap.Failcnt)
+	if cpus, err := getCPUs(name, e.logger); err == nil {
+		metric.cpus = len(cpus)
+		metric.cpu_list = strings.Join(cpus, ",")
+	}
+	getInfo(name, &metric, e.logger)
+	if *collectProc {
+		if val, ok := pids[name]; ok {
+			level.Debug(e.logger).Log("msg", "Get process info", "pids", fmt.Sprintf("%v", val))
+			getProcInfo(val, &metric, e.logger)
+		} else {
+			level.Error(e.logger).Log("msg", "Unable to get PIDs", "path", name)
+		}
+	}
+	return metric, nil
+}
+
 func (e *Exporter) collect() ([]CgroupMetric, error) {
 	var names []string
 	var metrics []CgroupMetric
@@ -345,47 +396,19 @@ func (e *Exporter) collect() ([]CgroupMetric, error) {
 				pids[name] = []int{p.Pid}
 			}
 		}
+		wg := &sync.WaitGroup{}
+		wg.Add(len(names))
 		for _, name := range names {
-			metric := CgroupMetric{name: name}
-			level.Debug(e.logger).Log("msg", "Loading cgroup", "path", name)
-			ctrl, err := cgroups.Load(subsystem, func(subsystem cgroups.Name) (string, error) {
-				return name, nil
-			})
-			if err != nil {
-				level.Error(e.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err)
-				metric.err = true
+			go func(n string, p map[string][]int) {
+				metric, _ := e.getMetrics(n, p)
+				metricLock.Lock()
 				metrics = append(metrics, metric)
-				continue
-			}
-			stats, _ := ctrl.Stat(cgroups.IgnoreNotExist)
-			metric.cpuUser = float64(stats.CPU.Usage.User) / 1000000000.0
-			metric.cpuSystem = float64(stats.CPU.Usage.Kernel) / 1000000000.0
-			metric.cpuTotal = float64(stats.CPU.Usage.Total) / 1000000000.0
-			metric.memoryRSS = float64(stats.Memory.TotalRSS)
-			metric.memoryCache = float64(stats.Memory.TotalCache)
-			metric.memoryUsed = float64(stats.Memory.Usage.Usage)
-			metric.memoryTotal = float64(stats.Memory.Usage.Limit)
-			metric.memoryFailCount = float64(stats.Memory.Usage.Failcnt)
-			metric.memswUsed = float64(stats.Memory.Swap.Usage)
-			metric.memswTotal = float64(stats.Memory.Swap.Limit)
-			metric.memswFailCount = float64(stats.Memory.Swap.Failcnt)
-			if cpus, err := getCPUs(name, e.logger); err == nil {
-				metric.cpus = len(cpus)
-				metric.cpu_list = strings.Join(cpus, ",")
-			}
-			getInfo(name, &metric, e.logger)
-			if *collectProc {
-				if val, ok := pids[name]; ok {
-					level.Debug(e.logger).Log("msg", "Get process info", "pids", fmt.Sprintf("%v", val))
-					getProcInfo(val, &metric, e.logger)
-				} else {
-					level.Error(e.logger).Log("msg", "Unable to get PIDs", "path", name)
-				}
-			}
-			metrics = append(metrics, metric)
+				metricLock.Unlock()
+				wg.Done()
+			}(name, pids)
 		}
+		wg.Wait()
 	}
 
 	return metrics, nil
 }
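Why metricLock is required above: appending to a shared slice from multiple goroutines is a data race on the slice header. A hypothetical repro, not part of this PR, that the race detector will typically flag when run with go run -race:

// Demonstrates the data race metricLock prevents: concurrent append to a
// shared slice without synchronization. Run with: go run -race
package main

import "sync"

func main() {
	var out []int
	wg := &sync.WaitGroup{}
	wg.Add(2)
	for i := 0; i < 2; i++ {
		go func(v int) {
			defer wg.Done()
			out = append(out, v) // unsynchronized write to shared slice
		}(i)
	}
	wg.Wait()
}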
@@ -182,49 +182,59 @@ func TestCollectSLURM(t *testing.T) {
 		t.Errorf("Unexpected number of metrics, got %d expected 2", val)
 		return
 	}
-	if val := metrics[0].cpuUser; val != 0 {
+	var m CgroupMetric
+	for _, metric := range metrics {
+		if metric.jobid == "10" {
+			m = metric
+		}
+	}
+	if m.jobid == "" {
+		t.Errorf("Metrics with jobid=10 not found")
+		return
+	}
+	if val := m.cpuUser; val != 0 {
 		t.Errorf("Unexpected value for cpuUser, got %v", val)
 	}
-	if val := metrics[0].cpuSystem; val != 0 {
+	if val := m.cpuSystem; val != 0 {
 		t.Errorf("Unexpected value for cpuSystem, got %v", val)
 	}
-	if val := metrics[0].cpuTotal; val != 0.007710215 {
+	if val := m.cpuTotal; val != 0.007710215 {
 		t.Errorf("Unexpected value for cpuTotal, got %v", val)
 	}
-	if val := metrics[0].cpus; val != 2 {
+	if val := m.cpus; val != 2 {
 		t.Errorf("Unexpected value for cpus, got %v", val)
 	}
-	if val := metrics[0].memoryRSS; val != 311296 {
+	if val := m.memoryRSS; val != 311296 {
 		t.Errorf("Unexpected value for memoryRSS, got %v", val)
 	}
-	if val := metrics[0].memoryCache; val != 4096 {
+	if val := m.memoryCache; val != 4096 {
 		t.Errorf("Unexpected value for memoryCache, got %v", val)
 	}
-	if val := metrics[0].memoryUsed; val != 356352 {
+	if val := m.memoryUsed; val != 356352 {
 		t.Errorf("Unexpected value for memoryUsed, got %v", val)
 	}
-	if val := metrics[0].memoryTotal; val != 2147483648 {
+	if val := m.memoryTotal; val != 2147483648 {
 		t.Errorf("Unexpected value for memoryTotal, got %v", val)
 	}
-	if val := metrics[0].memoryFailCount; val != 0 {
+	if val := m.memoryFailCount; val != 0 {
 		t.Errorf("Unexpected value for memoryFailCount, got %v", val)
 	}
-	if val := metrics[0].memswUsed; val != 356352 {
+	if val := m.memswUsed; val != 356352 {
 		t.Errorf("Unexpected value for swapUsed, got %v", val)
 	}
-	if val := metrics[0].memswTotal; val != 2147483648 {
+	if val := m.memswTotal; val != 2147483648 {
 		t.Errorf("Unexpected value for swapTotal, got %v", val)
 	}
-	if val := metrics[0].memswFailCount; val != 0 {
+	if val := m.memswFailCount; val != 0 {
 		t.Errorf("Unexpected value for swapFailCount, got %v", val)
 	}
-	if val := metrics[0].uid; val != "20821" {
+	if val := m.uid; val != "20821" {
 		t.Errorf("Unexpected value for uid, got %v", val)
 	}
-	if val := metrics[0].jobid; val != "10" {
+	if val := m.jobid; val != "10" {
 		t.Errorf("Unexpected value for jobid, got %v", val)
 	}
-	if val, ok := metrics[0].processExec["/bin/bash"]; !ok {
+	if val, ok := m.processExec["/bin/bash"]; !ok {
 		t.Errorf("processExec does not contain /bin/bash")
 	} else {
 		if val != 2 {
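The test rewrite follows directly from the parallelization: metrics are now appended in goroutine-completion order, so metrics[0] is no longer guaranteed to be the jobid 10 cgroup, and the assertions instead scan for it. A self-contained illustration of the same order-independent lookup, with a metric struct invented for the example:

// Order-independent lookup mirroring the updated test; the metric struct
// here is invented for the example.
package main

import "fmt"

type metric struct{ jobid string }

func main() {
	// With parallel collection the slice order varies from run to run.
	metrics := []metric{{jobid: "11"}, {jobid: "10"}}
	var m metric
	for _, mt := range metrics {
		if mt.jobid == "10" {
			m = mt
		}
	}
	fmt.Println(m.jobid) // always "10", regardless of slice order
}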