Merge pull request #287 from tomwilkie/284-cpu-usage

Cache the walk of the process tree, reusing it in docker tagger and process reporter.
This commit is contained in:
Tom Wilkie
2015-06-29 13:22:22 +02:00
9 changed files with 178 additions and 64 deletions

View File

@@ -21,21 +21,21 @@ var (
// Tagger is a tagger that tags Docker container information to process
// nodes that have a PID.
type Tagger struct {
procRoot string
registry Registry
registry Registry
procWalker process.Walker
}
// NewTagger returns a usable Tagger.
func NewTagger(registry Registry, procRoot string) *Tagger {
func NewTagger(registry Registry, procWalker process.Walker) *Tagger {
return &Tagger{
registry: registry,
procRoot: procRoot,
registry: registry,
procWalker: procWalker,
}
}
// Tag implements Tagger.
func (t *Tagger) Tag(r report.Report) (report.Report, error) {
tree, err := NewProcessTreeStub(t.procRoot)
tree, err := NewProcessTreeStub(t.procWalker)
if err != nil {
return report.MakeReport(), err
}

View File

@@ -27,7 +27,7 @@ func TestTagger(t *testing.T) {
oldProcessTree := docker.NewProcessTreeStub
defer func() { docker.NewProcessTreeStub = oldProcessTree }()
docker.NewProcessTreeStub = func(procRoot string) (process.Tree, error) {
docker.NewProcessTreeStub = func(_ process.Walker) (process.Tree, error) {
return &mockProcessTree{map[int]int{2: 1}}, nil
}
@@ -45,7 +45,7 @@ func TestTagger(t *testing.T) {
want.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}.Merge(wantNodeMetadata)
want.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}.Merge(wantNodeMetadata)
tagger := docker.NewTagger(mockRegistryInstance, "/irrelevant")
tagger := docker.NewTagger(mockRegistryInstance, nil)
have, err := tagger.Tag(input)
if err != nil {
t.Errorf("%v", err)

View File

@@ -4,6 +4,7 @@ import (
"flag"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
@@ -57,7 +58,10 @@ func main() {
log.Printf("exposing Prometheus endpoint at %s%s", *httpListen, *prometheusEndpoint)
http.Handle(*prometheusEndpoint, makePrometheusHandler())
}
go func(err error) { log.Print(err) }(http.ListenAndServe(*httpListen, nil))
go func() {
err := http.ListenAndServe(*httpListen, nil)
log.Print(err)
}()
}
if *spyProcs && os.Getegid() != 0 {
@@ -76,7 +80,8 @@ func main() {
)
var (
weaveTagger *tag.WeaveTagger
weaveTagger *tag.WeaveTagger
processCache *process.CachingWalker
)
taggers := []tag.Tagger{
@@ -89,19 +94,25 @@ func main() {
endpoint.NewReporter(hostID, hostName, *spyProcs),
}
if *dockerEnabled && runtime.GOOS == linux {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}
// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))
dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()
if *dockerEnabled {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}
taggers = append(taggers, docker.NewTagger(dockerRegistry, *procRoot))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()
taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
}
}
if *weaveRouterAddr != "" {
@@ -113,11 +124,6 @@ func main() {
taggers = append(taggers, weaveTagger)
}
// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
reporters = append(reporters, process.NewReporter(*procRoot, hostID))
}
log.Printf("listening on %s", *listen)
quit := make(chan struct{})
@@ -137,6 +143,12 @@ func main() {
r = report.MakeReport()
case <-spyTick:
if processCache != nil {
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
}
for _, reporter := range reporters {
newReport, err := reporter.Report()
if err != nil {

View File

@@ -18,15 +18,15 @@ const (
// Reporter generate Reports containing the Process topology
type reporter struct {
procRoot string
scope string
scope string
walker Walker
}
// NewReporter makes a new Reporter
func NewReporter(procRoot, scope string) tag.Reporter {
func NewReporter(walker Walker, scope string) tag.Reporter {
return &reporter{
procRoot: procRoot,
scope: scope,
scope: scope,
walker: walker,
}
}
@@ -43,7 +43,7 @@ func (r *reporter) Report() (report.Report, error) {
func (r *reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := Walk(r.procRoot, func(p *Process) {
err := r.walker.Walk(func(p *Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
t.NodeMetadatas[nodeID] = report.NodeMetadata{

View File

@@ -9,23 +9,28 @@ import (
"github.com/weaveworks/scope/test"
)
func TestReporter(t *testing.T) {
oldWalk := process.Walk
defer func() { process.Walk = oldWalk }()
type mockWalker struct {
processes []*process.Process
}
process.Walk = func(_ string, f func(*process.Process)) error {
for _, p := range []*process.Process{
func (m *mockWalker) Walk(f func(*process.Process)) error {
for _, p := range m.processes {
f(p)
}
return nil
}
func TestReporter(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
{PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"},
} {
f(p)
}
return nil
},
}
reporter := process.NewReporter("", "")
reporter := process.NewReporter(walker, "")
want := report.MakeReport()
want.Process = report.Topology{
Adjacency: report.Adjacency{},

View File

@@ -14,9 +14,9 @@ type tree struct {
}
// NewTree returns a new Tree that can be polled.
func NewTree(procRoot string) (Tree, error) {
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]*Process{}}
err := Walk(procRoot, func(p *Process) {
err := walker.Walk(func(p *Process) {
pt.processes[p.PID] = p
})

View File

@@ -8,22 +8,16 @@ import (
)
func TestTree(t *testing.T) {
oldWalk := process.Walk
defer func() { process.Walk = oldWalk }()
process.Walk = func(_ string, f func(*process.Process)) error {
for _, p := range []*process.Process{
{PID: 1, PPID: 0},
{PID: 2, PPID: 1},
{PID: 3, PPID: 1},
{PID: 4, PPID: 2},
} {
f(p)
}
return nil
walker := &mockWalker{
processes: []*process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
{PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"},
},
}
tree, err := process.NewTree("foo")
tree, err := process.NewTree(walker)
if err != nil {
t.Fatalf("newProcessTree error: %v", err)
}

View File

@@ -6,6 +6,7 @@ import (
"path"
"strconv"
"strings"
"sync"
)
// Process represents a single process.
@@ -16,18 +17,32 @@ type Process struct {
Threads int
}
// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(*Process)) error
}
// Hooks exposed for mocking
var (
ReadDir = ioutil.ReadDir
ReadFile = ioutil.ReadFile
)
type walker struct {
procRoot string
}
// NewWalker creates a new process Walker
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}
// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that is can be tested.
var Walk = func(procRoot string, f func(*Process)) error {
dirEntries, err := ReadDir(procRoot)
func (w *walker) Walk(f func(*Process)) error {
dirEntries, err := ReadDir(w.procRoot)
if err != nil {
return err
}
@@ -39,7 +54,7 @@ var Walk = func(procRoot string, f func(*Process)) error {
continue
}
stat, err := ReadFile(path.Join(procRoot, filename, "stat"))
stat, err := ReadFile(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
@@ -55,13 +70,13 @@ var Walk = func(procRoot string, f func(*Process)) error {
}
cmdline := ""
if cmdlineBuf, err := ReadFile(path.Join(procRoot, filename, "cmdline")); err == nil {
if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1)
cmdline = string(cmdlineBuf)
}
comm := "(unknown)"
if commBuf, err := ReadFile(path.Join(procRoot, filename, "comm")); err == nil {
if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
comm = strings.TrimSpace(string(commBuf))
}
@@ -76,3 +91,43 @@ var Walk = func(procRoot string, f func(*Process)) error {
return nil
}
// CachingWalker is a walker than caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []*Process
cacheLock sync.RWMutex
source Walker
}
// NewCachingWalker returns a new CachingWalker
func NewCachingWalker(source Walker) *CachingWalker {
return &CachingWalker{source: source}
}
// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(*Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()
for _, p := range c.cache {
f(p)
}
return nil
}
// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
newCache := []*Process{}
err := c.source.Walk(func(p *Process) {
newCache = append(newCache, p)
})
if err != nil {
return err
}
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.cache = newCache
return nil
}

View File

@@ -79,7 +79,8 @@ func TestWalker(t *testing.T) {
}
have := map[int]*process.Process{}
err := process.Walk("unused", func(p *process.Process) {
walker := process.NewWalker("unused")
err := walker.Walk(func(p *process.Process) {
have[p.PID] = p
})
@@ -87,3 +88,50 @@ func TestWalker(t *testing.T) {
t.Errorf("%v (%v)", test.Diff(want, have), err)
}
}
func TestCache(t *testing.T) {
processes := []*process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
{PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"},
}
walker := &mockWalker{
processes: processes,
}
cachingWalker := process.NewCachingWalker(walker)
err := cachingWalker.Update()
if err != nil {
t.Fatal(err)
}
have, err := all(cachingWalker)
if err != nil || !reflect.DeepEqual(processes, have) {
t.Errorf("%v (%v)", test.Diff(processes, have), err)
}
walker.processes = []*process.Process{}
have, err = all(cachingWalker)
if err != nil || !reflect.DeepEqual(processes, have) {
t.Errorf("%v (%v)", test.Diff(processes, have), err)
}
err = cachingWalker.Update()
if err != nil {
t.Fatal(err)
}
have, err = all(cachingWalker)
want := []*process.Process{}
if err != nil || !reflect.DeepEqual(want, have) {
t.Errorf("%v (%v)", test.Diff(want, have), err)
}
}
func all(w process.Walker) ([]*process.Process, error) {
all := []*process.Process{}
err := w.Walk(func(p *process.Process) {
all = append(all, p)
})
return all, err
}