From b9e968ff437395c7e19e2bae39c39fef649c721e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 26 Jun 2015 17:40:13 +0000 Subject: [PATCH] Cache the walk of the process tree, reusing it in docker tagger and process reporter. Update the cache every spy tick. This change make CPU usage of scope on my box go from ~40% to ~17%. --- probe/docker/tagger.go | 12 +++---- probe/docker/tagger_test.go | 4 +-- probe/main.go | 48 +++++++++++++++---------- probe/process/reporter.go | 12 +++---- probe/process/reporter_test.go | 25 +++++++------ probe/process/tree.go | 4 +-- probe/process/tree_test.go | 22 +++++------- probe/process/walker.go | 65 +++++++++++++++++++++++++++++++--- probe/process/walker_test.go | 50 +++++++++++++++++++++++++- 9 files changed, 178 insertions(+), 64 deletions(-) diff --git a/probe/docker/tagger.go b/probe/docker/tagger.go index 0cdc918a9..d925436c2 100644 --- a/probe/docker/tagger.go +++ b/probe/docker/tagger.go @@ -21,21 +21,21 @@ var ( // Tagger is a tagger that tags Docker container information to process // nodes that have a PID. type Tagger struct { - procRoot string - registry Registry + registry Registry + procWalker process.Walker } // NewTagger returns a usable Tagger. -func NewTagger(registry Registry, procRoot string) *Tagger { +func NewTagger(registry Registry, procWalker process.Walker) *Tagger { return &Tagger{ - registry: registry, - procRoot: procRoot, + registry: registry, + procWalker: procWalker, } } // Tag implements Tagger. func (t *Tagger) Tag(r report.Report) (report.Report, error) { - tree, err := NewProcessTreeStub(t.procRoot) + tree, err := NewProcessTreeStub(t.procWalker) if err != nil { return report.MakeReport(), err } diff --git a/probe/docker/tagger_test.go b/probe/docker/tagger_test.go index 356a2d95d..d9f7c01d6 100644 --- a/probe/docker/tagger_test.go +++ b/probe/docker/tagger_test.go @@ -27,7 +27,7 @@ func TestTagger(t *testing.T) { oldProcessTree := docker.NewProcessTreeStub defer func() { docker.NewProcessTreeStub = oldProcessTree }() - docker.NewProcessTreeStub = func(procRoot string) (process.Tree, error) { + docker.NewProcessTreeStub = func(_ process.Walker) (process.Tree, error) { return &mockProcessTree{map[int]int{2: 1}}, nil } @@ -45,7 +45,7 @@ func TestTagger(t *testing.T) { want.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}.Merge(wantNodeMetadata) want.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}.Merge(wantNodeMetadata) - tagger := docker.NewTagger(mockRegistryInstance, "/irrelevant") + tagger := docker.NewTagger(mockRegistryInstance, nil) have, err := tagger.Tag(input) if err != nil { t.Errorf("%v", err) diff --git a/probe/main.go b/probe/main.go index 59a44bd4a..99bb95e0a 100644 --- a/probe/main.go +++ b/probe/main.go @@ -4,6 +4,7 @@ import ( "flag" "log" "net/http" + _ "net/http/pprof" "os" "os/signal" "runtime" @@ -57,7 +58,10 @@ func main() { log.Printf("exposing Prometheus endpoint at %s%s", *httpListen, *prometheusEndpoint) http.Handle(*prometheusEndpoint, makePrometheusHandler()) } - go func(err error) { log.Print(err) }(http.ListenAndServe(*httpListen, nil)) + go func() { + err := http.ListenAndServe(*httpListen, nil) + log.Print(err) + }() } if *spyProcs && os.Getegid() != 0 { @@ -76,7 +80,8 @@ func main() { ) var ( - weaveTagger *tag.WeaveTagger + weaveTagger *tag.WeaveTagger + processCache *process.CachingWalker ) taggers := []tag.Tagger{ @@ -89,19 +94,25 @@ func main() { endpoint.NewReporter(hostID, hostName, *spyProcs), } - if *dockerEnabled && runtime.GOOS == linux { - if err = report.AddLocalBridge(*dockerBridge); err != nil { - log.Fatalf("failed to get docker bridge address: %v", err) - } + // TODO provide an alternate implementation for Darwin. + if runtime.GOOS == linux { + processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters = append(reporters, process.NewReporter(processCache, hostID)) - dockerRegistry, err := docker.NewRegistry(*dockerInterval) - if err != nil { - log.Fatalf("failed to start docker registry: %v", err) - } - defer dockerRegistry.Stop() + if *dockerEnabled { + if err = report.AddLocalBridge(*dockerBridge); err != nil { + log.Fatalf("failed to get docker bridge address: %v", err) + } - taggers = append(taggers, docker.NewTagger(dockerRegistry, *procRoot)) - reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID)) + dockerRegistry, err := docker.NewRegistry(*dockerInterval) + if err != nil { + log.Fatalf("failed to start docker registry: %v", err) + } + defer dockerRegistry.Stop() + + taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache)) + reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID)) + } } if *weaveRouterAddr != "" { @@ -113,11 +124,6 @@ func main() { taggers = append(taggers, weaveTagger) } - // TODO provide an alternate implementation for Darwin. - if runtime.GOOS == linux { - reporters = append(reporters, process.NewReporter(*procRoot, hostID)) - } - log.Printf("listening on %s", *listen) quit := make(chan struct{}) @@ -137,6 +143,12 @@ func main() { r = report.MakeReport() case <-spyTick: + if processCache != nil { + if err := processCache.Update(); err != nil { + log.Printf("error reading processes: %v", err) + } + } + for _, reporter := range reporters { newReport, err := reporter.Report() if err != nil { diff --git a/probe/process/reporter.go b/probe/process/reporter.go index 2261a78dd..c623b52a5 100644 --- a/probe/process/reporter.go +++ b/probe/process/reporter.go @@ -18,15 +18,15 @@ const ( // Reporter generate Reports containing the Process topology type reporter struct { - procRoot string - scope string + scope string + walker Walker } // NewReporter makes a new Reporter -func NewReporter(procRoot, scope string) tag.Reporter { +func NewReporter(walker Walker, scope string) tag.Reporter { return &reporter{ - procRoot: procRoot, - scope: scope, + scope: scope, + walker: walker, } } @@ -43,7 +43,7 @@ func (r *reporter) Report() (report.Report, error) { func (r *reporter) processTopology() (report.Topology, error) { t := report.NewTopology() - err := Walk(r.procRoot, func(p *Process) { + err := r.walker.Walk(func(p *Process) { pidstr := strconv.Itoa(p.PID) nodeID := report.MakeProcessNodeID(r.scope, pidstr) t.NodeMetadatas[nodeID] = report.NodeMetadata{ diff --git a/probe/process/reporter_test.go b/probe/process/reporter_test.go index 45fb57f80..cd0ffc501 100644 --- a/probe/process/reporter_test.go +++ b/probe/process/reporter_test.go @@ -9,23 +9,28 @@ import ( "github.com/weaveworks/scope/test" ) -func TestReporter(t *testing.T) { - oldWalk := process.Walk - defer func() { process.Walk = oldWalk }() +type mockWalker struct { + processes []*process.Process +} - process.Walk = func(_ string, f func(*process.Process)) error { - for _, p := range []*process.Process{ +func (m *mockWalker) Walk(f func(*process.Process)) error { + for _, p := range m.processes { + f(p) + } + return nil +} + +func TestReporter(t *testing.T) { + walker := &mockWalker{ + processes: []*process.Process{ {PID: 1, PPID: 0, Comm: "init"}, {PID: 2, PPID: 1, Comm: "bash"}, {PID: 3, PPID: 1, Comm: "apache", Threads: 2}, {PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"}, - } { - f(p) - } - return nil + }, } - reporter := process.NewReporter("", "") + reporter := process.NewReporter(walker, "") want := report.MakeReport() want.Process = report.Topology{ Adjacency: report.Adjacency{}, diff --git a/probe/process/tree.go b/probe/process/tree.go index 1494f4e6f..2fd8c65e2 100644 --- a/probe/process/tree.go +++ b/probe/process/tree.go @@ -14,9 +14,9 @@ type tree struct { } // NewTree returns a new Tree that can be polled. -func NewTree(procRoot string) (Tree, error) { +func NewTree(walker Walker) (Tree, error) { pt := tree{processes: map[int]*Process{}} - err := Walk(procRoot, func(p *Process) { + err := walker.Walk(func(p *Process) { pt.processes[p.PID] = p }) diff --git a/probe/process/tree_test.go b/probe/process/tree_test.go index 1c461b22f..8278f53b2 100644 --- a/probe/process/tree_test.go +++ b/probe/process/tree_test.go @@ -8,22 +8,16 @@ import ( ) func TestTree(t *testing.T) { - oldWalk := process.Walk - defer func() { process.Walk = oldWalk }() - - process.Walk = func(_ string, f func(*process.Process)) error { - for _, p := range []*process.Process{ - {PID: 1, PPID: 0}, - {PID: 2, PPID: 1}, - {PID: 3, PPID: 1}, - {PID: 4, PPID: 2}, - } { - f(p) - } - return nil + walker := &mockWalker{ + processes: []*process.Process{ + {PID: 1, PPID: 0, Comm: "init"}, + {PID: 2, PPID: 1, Comm: "bash"}, + {PID: 3, PPID: 1, Comm: "apache", Threads: 2}, + {PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"}, + }, } - tree, err := process.NewTree("foo") + tree, err := process.NewTree(walker) if err != nil { t.Fatalf("newProcessTree error: %v", err) } diff --git a/probe/process/walker.go b/probe/process/walker.go index ea885f57b..ca44934f3 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -6,6 +6,7 @@ import ( "path" "strconv" "strings" + "sync" ) // Process represents a single process. @@ -16,18 +17,32 @@ type Process struct { Threads int } +// Walker is something that walks the /proc directory +type Walker interface { + Walk(func(*Process)) error +} + // Hooks exposed for mocking var ( ReadDir = ioutil.ReadDir ReadFile = ioutil.ReadFile ) +type walker struct { + procRoot string +} + +// NewWalker creates a new process Walker +func NewWalker(procRoot string) Walker { + return &walker{procRoot: procRoot} +} + // Walk walks the supplied directory (expecting it to look like /proc) // and marshalls the files into instances of Process, which it then // passes one-by-one to the supplied function. Walk is only made public // so that is can be tested. -var Walk = func(procRoot string, f func(*Process)) error { - dirEntries, err := ReadDir(procRoot) +func (w *walker) Walk(f func(*Process)) error { + dirEntries, err := ReadDir(w.procRoot) if err != nil { return err } @@ -39,7 +54,7 @@ var Walk = func(procRoot string, f func(*Process)) error { continue } - stat, err := ReadFile(path.Join(procRoot, filename, "stat")) + stat, err := ReadFile(path.Join(w.procRoot, filename, "stat")) if err != nil { continue } @@ -55,13 +70,13 @@ var Walk = func(procRoot string, f func(*Process)) error { } cmdline := "" - if cmdlineBuf, err := ReadFile(path.Join(procRoot, filename, "cmdline")); err == nil { + if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil { cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1) cmdline = string(cmdlineBuf) } comm := "(unknown)" - if commBuf, err := ReadFile(path.Join(procRoot, filename, "comm")); err == nil { + if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil { comm = strings.TrimSpace(string(commBuf)) } @@ -76,3 +91,43 @@ var Walk = func(procRoot string, f func(*Process)) error { return nil } + +// CachingWalker is a walker than caches a copy of the output from another +// Walker, and then allows other concurrent readers to Walk that copy. +type CachingWalker struct { + cache []*Process + cacheLock sync.RWMutex + source Walker +} + +// NewCachingWalker returns a new CachingWalker +func NewCachingWalker(source Walker) *CachingWalker { + return &CachingWalker{source: source} +} + +// Walk walks a cached copy of process list +func (c *CachingWalker) Walk(f func(*Process)) error { + c.cacheLock.RLock() + defer c.cacheLock.RUnlock() + + for _, p := range c.cache { + f(p) + } + return nil +} + +// Update updates cached copy of process list +func (c *CachingWalker) Update() error { + newCache := []*Process{} + err := c.source.Walk(func(p *Process) { + newCache = append(newCache, p) + }) + if err != nil { + return err + } + + c.cacheLock.Lock() + defer c.cacheLock.Unlock() + c.cache = newCache + return nil +} diff --git a/probe/process/walker_test.go b/probe/process/walker_test.go index 1c2d97766..729588c22 100644 --- a/probe/process/walker_test.go +++ b/probe/process/walker_test.go @@ -79,7 +79,8 @@ func TestWalker(t *testing.T) { } have := map[int]*process.Process{} - err := process.Walk("unused", func(p *process.Process) { + walker := process.NewWalker("unused") + err := walker.Walk(func(p *process.Process) { have[p.PID] = p }) @@ -87,3 +88,50 @@ func TestWalker(t *testing.T) { t.Errorf("%v (%v)", test.Diff(want, have), err) } } + +func TestCache(t *testing.T) { + processes := []*process.Process{ + {PID: 1, PPID: 0, Comm: "init"}, + {PID: 2, PPID: 1, Comm: "bash"}, + {PID: 3, PPID: 1, Comm: "apache", Threads: 2}, + {PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"}, + } + walker := &mockWalker{ + processes: processes, + } + cachingWalker := process.NewCachingWalker(walker) + err := cachingWalker.Update() + if err != nil { + t.Fatal(err) + } + + have, err := all(cachingWalker) + if err != nil || !reflect.DeepEqual(processes, have) { + t.Errorf("%v (%v)", test.Diff(processes, have), err) + } + + walker.processes = []*process.Process{} + have, err = all(cachingWalker) + if err != nil || !reflect.DeepEqual(processes, have) { + t.Errorf("%v (%v)", test.Diff(processes, have), err) + } + + err = cachingWalker.Update() + if err != nil { + t.Fatal(err) + } + + have, err = all(cachingWalker) + want := []*process.Process{} + if err != nil || !reflect.DeepEqual(want, have) { + t.Errorf("%v (%v)", test.Diff(want, have), err) + } +} + +func all(w process.Walker) ([]*process.Process, error) { + all := []*process.Process{} + err := w.Walk(func(p *process.Process) { + all = append(all, p) + }) + return all, err +}