Merge pull request #767 from weaveworks/756-proc-cpu

Gather per-process CPU and memory metrics.
This commit is contained in:
Tom Wilkie
2015-12-16 15:29:56 +00:00
19 changed files with 197 additions and 106 deletions

View File

@@ -122,9 +122,9 @@ function setup {
. ~/.profile
git clone http://github.com/weaveworks/scope.git
cd scope
git checkout 0.9
git checkout master
make deps
make
make RUN_FLAGS=
./scope launch
EOF
done

View File

@@ -31,7 +31,7 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, er
statT syscall.Stat_t
)
walker.Walk(func(p process.Process) {
walker.Walk(func(p, _ process.Process) {
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")

View File

@@ -43,7 +43,7 @@ var mockFS = fs.Dir("",
),
fs.File{
FName: "stat",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0",
},
),
),

View File

@@ -20,7 +20,7 @@ const (
Load5 = "load5"
Load15 = "load15"
CPUUsage = "cpu_usage_percent"
MemUsage = "mem_usage_percent"
MemUsage = "mem_usage_bytes"
)
// Exposed for testing.
@@ -76,7 +76,7 @@ func (r *Reporter) Report() (report.Report, error) {
metrics := GetLoad(now)
cpuUsage, max := GetCPUUsagePercent()
metrics[CPUUsage] = report.MakeMetric().Add(now, cpuUsage).WithMax(max)
memUsage, max := GetMemoryUsagePercent()
memUsage, max := GetMemoryUsageBytes()
metrics[MemUsage] = report.MakeMetric().Add(now, memUsage).WithMax(max)
rep.Host.AddNode(report.MakeHostNodeID(r.hostID), report.MakeNodeWith(map[string]string{

View File

@@ -38,24 +38,24 @@ func TestReporter(t *testing.T) {
defer mtime.NowReset()
var (
oldGetKernelVersion = host.GetKernelVersion
oldGetLoad = host.GetLoad
oldGetUptime = host.GetUptime
oldGetCPUUsagePercent = host.GetCPUUsagePercent
oldGetMemoryUsagePercent = host.GetMemoryUsagePercent
oldGetKernelVersion = host.GetKernelVersion
oldGetLoad = host.GetLoad
oldGetUptime = host.GetUptime
oldGetCPUUsagePercent = host.GetCPUUsagePercent
oldGetMemoryUsageBytes = host.GetMemoryUsageBytes
)
defer func() {
host.GetKernelVersion = oldGetKernelVersion
host.GetLoad = oldGetLoad
host.GetUptime = oldGetUptime
host.GetCPUUsagePercent = oldGetCPUUsagePercent
host.GetMemoryUsagePercent = oldGetMemoryUsagePercent
host.GetMemoryUsageBytes = oldGetMemoryUsageBytes
}()
host.GetKernelVersion = func() (string, error) { return release + " " + version, nil }
host.GetLoad = func(time.Time) report.Metrics { return load }
host.GetUptime = func() (time.Duration, error) { return time.ParseDuration(uptime) }
host.GetCPUUsagePercent = func() (float64, float64) { return 30.0, 100.0 }
host.GetMemoryUsagePercent = func() (float64, float64) { return 60.0, 100.0 }
host.GetMemoryUsageBytes = func() (float64, float64) { return 60.0, 100.0 }
want := report.MakeReport()
want.Host.AddNode(report.MakeHostNodeID(hostID), report.MakeNodeWith(map[string]string{

View File

@@ -89,7 +89,7 @@ var GetCPUUsagePercent = func() (float64, float64) {
return 0.0, 0.0
}
// GetMemoryUsagePercent returns the percent memory usage and max (ie 100)
var GetMemoryUsagePercent = func() (float64, float64) {
// GetMemoryUsageBytes returns the bytes memory usage and max
var GetMemoryUsageBytes = func() (float64, float64) {
return 0.0, 0.0
}

View File

@@ -13,6 +13,8 @@ import (
"github.com/weaveworks/scope/report"
)
const kb = 1024
// Uname is swappable for mocking in tests.
var Uname = syscall.Uname
@@ -102,13 +104,13 @@ var GetCPUUsagePercent = func() (float64, float64) {
return float64(totald-idled) * 100. / float64(totald), float64(len(stat.CPUStats)) * 100.
}
// GetMemoryUsagePercent returns the percent memory usage and max (ie 100)
var GetMemoryUsagePercent = func() (float64, float64) {
// GetMemoryUsageBytes returns the bytes memory usage and max
var GetMemoryUsageBytes = func() (float64, float64) {
meminfo, err := linuxproc.ReadMemInfo(ProcMemInfo)
if err != nil {
return 0.0, 0.0
}
used := meminfo.MemTotal - meminfo.MemFree - meminfo.Buffers - meminfo.Cached
return float64(used) * 100. / float64(meminfo.MemTotal), 100.
return float64(used * kb), float64(meminfo.MemTotal * kb)
}

View File

@@ -1,8 +1,6 @@
package process
import (
"strconv"
"strings"
"time"
"github.com/armon/go-metrics"
@@ -41,32 +39,3 @@ func cachedReadFile(path string) ([]byte, error) {
metrics.IncrCounter(missMetricsKey, 1.0)
return buf, err
}
// we cache the stats, but for a shorter period
func readStats(path string) (int, int, error) {
var (
key = []byte(path)
buf []byte
err error
)
if buf, err = fileCache.Get(key); err == nil {
metrics.IncrCounter(hitMetricsKey, 1.0)
} else {
buf, err = fs.ReadFile(path)
if err != nil {
return -1, -1, err
}
fileCache.Set(key, buf, statsTimeout)
metrics.IncrCounter(missMetricsKey, 1.0)
}
splits := strings.Fields(string(buf))
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return -1, -1, err
}
threads, err := strconv.Atoi(splits[19])
if err != nil {
return -1, -1, err
}
return ppid, threads, nil
}

View File

@@ -3,29 +3,37 @@ package process
import (
"strconv"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/report"
)
// We use these keys in node metadata
const (
PID = "pid"
Comm = "comm"
PPID = "ppid"
Cmdline = "cmdline"
Threads = "threads"
PID = "pid"
Comm = "comm"
PPID = "ppid"
Cmdline = "cmdline"
Threads = "threads"
CPUUsage = "cpu_usage_percent"
MemoryUsage = "memory_usage_bytes"
)
// Reporter generates Reports containing the Process topology.
type Reporter struct {
scope string
walker Walker
scope string
walker Walker
jiffies Jiffies
}
// Jiffies is the type for the function used to fetch the elapsed jiffies.
type Jiffies func() (uint64, float64, error)
// NewReporter makes a new Reporter.
func NewReporter(walker Walker, scope string) *Reporter {
func NewReporter(walker Walker, scope string, jiffies Jiffies) *Reporter {
return &Reporter{
scope: scope,
walker: walker,
scope: scope,
walker: walker,
jiffies: jiffies,
}
}
@@ -45,7 +53,13 @@ func (r *Reporter) Report() (report.Report, error) {
func (r *Reporter) processTopology() (report.Topology, error) {
t := report.MakeTopology()
err := r.walker.Walk(func(p Process) {
now := mtime.Now()
deltaTotal, maxCPU, err := r.jiffies()
if err != nil {
return t, err
}
err = r.walker.Walk(func(p, prev Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
node := report.MakeNode()
@@ -59,9 +73,18 @@ func (r *Reporter) processTopology() (report.Topology, error) {
node.Metadata[tuple.key] = tuple.value
}
}
if p.PPID > 0 {
node.Metadata[PPID] = strconv.Itoa(p.PPID)
}
if deltaTotal > 0 {
cpuUsage := float64(p.Jiffies-prev.Jiffies) / float64(deltaTotal) * 100.
node = node.WithMetric(CPUUsage, report.MakeMetric().Add(now, cpuUsage).WithMax(maxCPU))
}
node = node.WithMetric(MemoryUsage, report.MakeMetric().Add(now, float64(p.RSSBytes)))
t.AddNode(nodeID, node)
})

View File

@@ -3,7 +3,9 @@ package process_test
import (
"reflect"
"testing"
"time"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
@@ -13,9 +15,9 @@ type mockWalker struct {
processes []process.Process
}
func (m *mockWalker) Walk(f func(process.Process)) error {
func (m *mockWalker) Walk(f func(process.Process, process.Process)) error {
for _, p := range m.processes {
f(p)
f(p, process.Process{})
}
return nil
}
@@ -30,29 +32,33 @@ func TestReporter(t *testing.T) {
{PID: 5, PPID: 1, Cmdline: "tail -f /var/log/syslog"},
},
}
getDeltaTotalJiffies := func() (uint64, float64, error) { return 0, 0., nil }
now := time.Now()
mtime.NowForce(now)
defer mtime.NowReset()
reporter := process.NewReporter(walker, "")
reporter := process.NewReporter(walker, "", getDeltaTotalJiffies)
want := report.MakeReport()
want.Process = report.MakeTopology().AddNode(
report.MakeProcessNodeID("", "1"), report.MakeNodeWith(map[string]string{
process.PID: "1",
process.Comm: "init",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "2"), report.MakeNodeWith(map[string]string{
process.PID: "2",
process.Comm: "bash",
process.PPID: "1",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "3"), report.MakeNodeWith(map[string]string{
process.PID: "3",
process.Comm: "apache",
process.PPID: "1",
process.Threads: "2",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "4"), report.MakeNodeWith(map[string]string{
process.PID: "4",
@@ -60,14 +66,14 @@ func TestReporter(t *testing.T) {
process.PPID: "2",
process.Cmdline: "ping foo.bar.local",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
).AddNode(
report.MakeProcessNodeID("", "5"), report.MakeNodeWith(map[string]string{
process.PID: "5",
process.PPID: "1",
process.Cmdline: "tail -f /var/log/syslog",
process.Threads: "0",
}),
}).WithMetric(process.MemoryUsage, report.MakeMetric().Add(now, 0.)),
)
have, err := reporter.Report()

View File

@@ -16,7 +16,7 @@ type tree struct {
// NewTree returns a new Tree that can be polled.
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]Process{}}
err := walker.Walk(func(p Process) {
err := walker.Walk(func(p, _ Process) {
pt.processes[p.PID] = p
})

View File

@@ -8,19 +8,22 @@ type Process struct {
Comm string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
}
// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(Process)) error
Walk(func(Process, Process)) error
}
// CachingWalker is a walker than caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []Process
cacheLock sync.RWMutex
source Walker
cache map[int]Process
previousByPID map[int]Process
cacheLock sync.RWMutex
source Walker
}
// NewCachingWalker returns a new CachingWalker
@@ -32,21 +35,21 @@ func NewCachingWalker(source Walker) *CachingWalker {
func (*CachingWalker) Name() string { return "Process" }
// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(Process)) error {
func (c *CachingWalker) Walk(f func(Process, Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()
for _, p := range c.cache {
f(p)
f(p, c.previousByPID[p.PID])
}
return nil
}
// Tick updates cached copy of process list
func (c *CachingWalker) Tick() error {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
newCache = append(newCache, p)
newCache := map[int]Process{}
err := c.source.Walk(func(p, _ Process) {
newCache[p.PID] = p
})
if err != nil {
return err
@@ -54,6 +57,7 @@ func (c *CachingWalker) Tick() error {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.previousByPID = c.cache
c.cache = newCache
return nil
}

View File

@@ -22,7 +22,7 @@ const (
// These functions copied from procspy.
func (walker) Walk(f func(Process)) error {
func (walker) Walk(f func(Process, Process)) error {
output, err := exec.Command(
lsofBinary,
"-i", // only Internet files
@@ -40,7 +40,7 @@ func (walker) Walk(f func(Process)) error {
}
for _, process := range processes {
f(process)
f(process, Process{})
}
return nil
}
@@ -92,3 +92,8 @@ func parseLSOF(output string) (map[string]Process, error) {
}
return processes, nil
}
// GetDeltaTotalJiffies is a no-op on darwin, which has no notion of
// jiffies: it always reports zero elapsed jiffies and a zero CPU maximum,
// and never fails.
func GetDeltaTotalJiffies() (uint64, float64, error) {
	var (
		deltaJiffies uint64
		maxCPU       float64
	)
	return deltaJiffies, maxCPU, nil
}

View File

@@ -2,11 +2,16 @@ package process
import (
"bytes"
"fmt"
"os"
"path"
"strconv"
"strings"
linuxproc "github.com/c9s/goprocinfo/linux"
"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/host"
)
type walker struct {
@@ -18,11 +23,50 @@ func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}
// readStats parses a /proc/<pid>/stat file at path, returning the parent
// PID, the thread count, the total CPU jiffies consumed (user + system
// time), and the resident set size in bytes. On any read or parse failure
// it returns the error with zero values for the other results.
//
// NOTE(review): fields are split on whitespace, so a comm value containing
// spaces (field 1, in parentheses) would shift every later index — the
// callers' mock fixtures use space-free comms; confirm real-world inputs.
func readStats(path string) (ppid, threads int, jiffies, rss uint64, err error) {
	var (
		buf                               []byte
		userJiffies, sysJiffies, rssPages uint64
	)
	buf, err = fs.ReadFile(path)
	if err != nil {
		return
	}
	splits := strings.Fields(string(buf))
	// We read fields 3 (ppid), 13 (utime), 14 (stime), 19 (num_threads)
	// and 23 (rss), so at least 24 fields must be present.
	if len(splits) < 24 {
		err = fmt.Errorf("invalid stat file %q: expected at least 24 fields, got %d", path, len(splits))
		return
	}
	ppid, err = strconv.Atoi(splits[3])
	if err != nil {
		return
	}
	threads, err = strconv.Atoi(splits[19])
	if err != nil {
		return
	}
	userJiffies, err = strconv.ParseUint(splits[13], 10, 64)
	if err != nil {
		return
	}
	sysJiffies, err = strconv.ParseUint(splits[14], 10, 64)
	if err != nil {
		return
	}
	jiffies = userJiffies + sysJiffies
	// rss is reported in pages; convert to bytes.
	rssPages, err = strconv.ParseUint(splits[23], 10, 64)
	if err != nil {
		return
	}
	rss = rssPages * uint64(os.Getpagesize())
	return
}
// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that is can be tested.
func (w *walker) Walk(f func(Process)) error {
func (w *walker) Walk(f func(Process, Process)) error {
dirEntries, err := fs.ReadDirNames(w.procRoot)
if err != nil {
return err
@@ -34,7 +78,7 @@ func (w *walker) Walk(f func(Process)) error {
continue
}
ppid, threads, err := readStats(path.Join(w.procRoot, filename, "stat"))
ppid, threads, jiffies, rss, err := readStats(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
@@ -51,13 +95,38 @@ func (w *walker) Walk(f func(Process)) error {
}
f(Process{
PID: pid,
PPID: ppid,
Comm: comm,
Cmdline: cmdline,
Threads: threads,
})
PID: pid,
PPID: ppid,
Comm: comm,
Cmdline: cmdline,
Threads: threads,
Jiffies: jiffies,
RSSBytes: rss,
}, Process{})
}
return nil
}
var previousStat = linuxproc.CPUStat{}

// GetDeltaTotalJiffies returns the number of jiffies that have elapsed
// across all CPUs since the previous call, together with the maximum
// possible CPU percentage (100 per CPU). It stashes the last-seen
// counters in package state, so it is side-effect-ful and intended to be
// called from a single goroutine.
func GetDeltaTotalJiffies() (uint64, float64, error) {
	stat, err := linuxproc.ReadStat(host.ProcStat)
	if err != nil {
		return 0, 0.0, err
	}

	// Sum every accounted CPU state for a single aggregate counter.
	total := func(s linuxproc.CPUStat) uint64 {
		return s.Idle + s.IOWait + s.User + s.Nice +
			s.System + s.IRQ + s.SoftIRQ + s.Steal
	}

	current := stat.CPUStatAll
	delta := total(current) - total(previousStat)
	previousStat = current

	return delta, float64(len(stat.CPUStats)) * 100., nil
}

View File

@@ -23,7 +23,7 @@ var mockFS = fs.Dir("",
},
fs.File{
FName: "stat",
FContents: "3 na R 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
FContents: "3 na R 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0",
},
),
fs.Dir("2",
@@ -37,7 +37,7 @@ var mockFS = fs.Dir("",
},
fs.File{
FName: "stat",
FContents: "2 na R 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
FContents: "2 na R 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0",
},
),
fs.Dir("4",
@@ -51,7 +51,7 @@ var mockFS = fs.Dir("",
},
fs.File{
FName: "stat",
FContents: "4 na R 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
FContents: "4 na R 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0",
},
),
fs.Dir("notapid"),
@@ -66,7 +66,7 @@ var mockFS = fs.Dir("",
},
fs.File{
FName: "stat",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0",
},
),
),
@@ -85,7 +85,7 @@ func TestWalker(t *testing.T) {
have := map[int]process.Process{}
walker := process.NewWalker("/proc")
err := walker.Walk(func(p process.Process) {
err := walker.Walk(func(p, _ process.Process) {
have[p.PID] = p
})

View File

@@ -11,7 +11,7 @@ import (
func TestBasicWalk(t *testing.T) {
var (
procRoot = "/proc"
procFunc = func(process.Process) {}
procFunc = func(process.Process, process.Process) {}
)
if err := process.NewWalker(procRoot).Walk(procFunc); err != nil {
t.Fatal(err)
@@ -34,15 +34,16 @@ func TestCache(t *testing.T) {
t.Fatal(err)
}
want, err := all(walker)
have, err := all(cachingWalker)
if err != nil || !reflect.DeepEqual(processes, have) {
t.Errorf("%v (%v)", test.Diff(processes, have), err)
if err != nil || !reflect.DeepEqual(want, have) {
t.Errorf("%v (%v)", test.Diff(want, have), err)
}
walker.processes = []process.Process{}
have, err = all(cachingWalker)
if err != nil || !reflect.DeepEqual(processes, have) {
t.Errorf("%v (%v)", test.Diff(processes, have), err)
if err != nil || !reflect.DeepEqual(want, have) {
t.Errorf("%v (%v)", test.Diff(want, have), err)
}
err = cachingWalker.Tick()
@@ -51,16 +52,16 @@ func TestCache(t *testing.T) {
}
have, err = all(cachingWalker)
want := []process.Process{}
want = map[process.Process]struct{}{}
if err != nil || !reflect.DeepEqual(want, have) {
t.Errorf("%v (%v)", test.Diff(want, have), err)
}
}
func all(w process.Walker) ([]process.Process, error) {
all := []process.Process{}
err := w.Walk(func(p process.Process) {
all = append(all, p)
// all drains the walker, returning the set of every Process it yields
// (the "previous" process passed to the callback is ignored).
func all(w process.Walker) (map[process.Process]struct{}, error) {
	seen := map[process.Process]struct{}{}
	walkErr := w.Walk(func(current, _ process.Process) {
		seen[current] = struct{}{}
	})
	return seen, walkErr
}

View File

@@ -120,7 +120,7 @@ func probeMain() {
p.AddReporter(
endpointReporter,
host.NewReporter(hostID, hostName, localNets),
process.NewReporter(processCache, hostID),
process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies),
)
p.AddTagger(probe.NewTopologyTagger(), host.NewTagger(hostID, probeID))

View File

@@ -327,6 +327,18 @@ func processOriginTable(nmd report.Node, addHostTag bool, addContainerTag bool)
rows = append([]Row{{Key: "Host", ValueMajor: report.ExtractHostID(nmd)}}, rows...)
}
for _, tuple := range []struct {
key, human string
fmt formatter
}{
{process.CPUUsage, "CPU Usage", formatPercent},
{process.MemoryUsage, "Memory Usage", formatMemory},
} {
if val, ok := nmd.Metrics[tuple.key]; ok {
rows = append(rows, sparklineRow(tuple.human, val, tuple.fmt))
}
}
var (
title = "Process"
name, commFound = nmd.Metadata[process.Comm]
@@ -527,7 +539,7 @@ func hostOriginTable(nmd report.Node) (Table, bool) {
fmt formatter
}{
{host.CPUUsage, "CPU Usage", formatPercent},
{host.MemUsage, "Memory Usage", formatPercent},
{host.MemUsage, "Memory Usage", formatMemory},
} {
if val, ok := nmd.Metrics[tuple.key]; ok {
rows = append(rows, sparklineRow(tuple.human, val, tuple.fmt))

View File

@@ -145,7 +145,7 @@ func (n Node) WithSets(sets Sets) Node {
// WithMetric returns a fresh copy of n, with metric merged in at key.
func (n Node) WithMetric(key string, metric Metric) Node {
result := n.Copy()
n.Metrics[key] = n.Metrics[key].Merge(metric)
result.Metrics[key] = n.Metrics[key].Merge(metric)
return result
}