diff --git a/probe/docker/tagger.go b/probe/docker/tagger.go index 0cdc918a9..d925436c2 100644 --- a/probe/docker/tagger.go +++ b/probe/docker/tagger.go @@ -21,21 +21,21 @@ var ( // Tagger is a tagger that tags Docker container information to process // nodes that have a PID. type Tagger struct { - procRoot string - registry Registry + registry Registry + procWalker process.Walker } // NewTagger returns a usable Tagger. -func NewTagger(registry Registry, procRoot string) *Tagger { +func NewTagger(registry Registry, procWalker process.Walker) *Tagger { return &Tagger{ - registry: registry, - procRoot: procRoot, + registry: registry, + procWalker: procWalker, } } // Tag implements Tagger. func (t *Tagger) Tag(r report.Report) (report.Report, error) { - tree, err := NewProcessTreeStub(t.procRoot) + tree, err := NewProcessTreeStub(t.procWalker) if err != nil { return report.MakeReport(), err } diff --git a/probe/docker/tagger_test.go b/probe/docker/tagger_test.go index 356a2d95d..d9f7c01d6 100644 --- a/probe/docker/tagger_test.go +++ b/probe/docker/tagger_test.go @@ -27,7 +27,7 @@ func TestTagger(t *testing.T) { oldProcessTree := docker.NewProcessTreeStub defer func() { docker.NewProcessTreeStub = oldProcessTree }() - docker.NewProcessTreeStub = func(procRoot string) (process.Tree, error) { + docker.NewProcessTreeStub = func(_ process.Walker) (process.Tree, error) { return &mockProcessTree{map[int]int{2: 1}}, nil } @@ -45,7 +45,7 @@ func TestTagger(t *testing.T) { want.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}.Merge(wantNodeMetadata) want.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}.Merge(wantNodeMetadata) - tagger := docker.NewTagger(mockRegistryInstance, "/irrelevant") + tagger := docker.NewTagger(mockRegistryInstance, nil) have, err := tagger.Tag(input) if err != nil { t.Errorf("%v", err) diff --git a/probe/main.go b/probe/main.go index 59a44bd4a..99bb95e0a 100644 --- a/probe/main.go +++ b/probe/main.go @@ -4,6 +4,7 
@@ import ( "flag" "log" "net/http" + _ "net/http/pprof" "os" "os/signal" "runtime" @@ -57,7 +58,10 @@ func main() { log.Printf("exposing Prometheus endpoint at %s%s", *httpListen, *prometheusEndpoint) http.Handle(*prometheusEndpoint, makePrometheusHandler()) } - go func(err error) { log.Print(err) }(http.ListenAndServe(*httpListen, nil)) + go func() { + err := http.ListenAndServe(*httpListen, nil) + log.Print(err) + }() } if *spyProcs && os.Getegid() != 0 { @@ -76,7 +80,8 @@ func main() { ) var ( - weaveTagger *tag.WeaveTagger + weaveTagger *tag.WeaveTagger + processCache *process.CachingWalker ) taggers := []tag.Tagger{ @@ -89,19 +94,25 @@ func main() { endpoint.NewReporter(hostID, hostName, *spyProcs), } - if *dockerEnabled && runtime.GOOS == linux { - if err = report.AddLocalBridge(*dockerBridge); err != nil { - log.Fatalf("failed to get docker bridge address: %v", err) - } + // TODO provide an alternate implementation for Darwin. + if runtime.GOOS == linux { + processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters = append(reporters, process.NewReporter(processCache, hostID)) - dockerRegistry, err := docker.NewRegistry(*dockerInterval) - if err != nil { - log.Fatalf("failed to start docker registry: %v", err) - } - defer dockerRegistry.Stop() + if *dockerEnabled { + if err = report.AddLocalBridge(*dockerBridge); err != nil { + log.Fatalf("failed to get docker bridge address: %v", err) + } - taggers = append(taggers, docker.NewTagger(dockerRegistry, *procRoot)) - reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID)) + dockerRegistry, err := docker.NewRegistry(*dockerInterval) + if err != nil { + log.Fatalf("failed to start docker registry: %v", err) + } + defer dockerRegistry.Stop() + + taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache)) + reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID)) + } } if *weaveRouterAddr != "" { @@ -113,11 +124,6 @@ func main() { taggers = 
append(taggers, weaveTagger) } - // TODO provide an alternate implementation for Darwin. - if runtime.GOOS == linux { - reporters = append(reporters, process.NewReporter(*procRoot, hostID)) - } - log.Printf("listening on %s", *listen) quit := make(chan struct{}) @@ -137,6 +143,12 @@ func main() { r = report.MakeReport() case <-spyTick: + if processCache != nil { + if err := processCache.Update(); err != nil { + log.Printf("error reading processes: %v", err) + } + } + for _, reporter := range reporters { newReport, err := reporter.Report() if err != nil { diff --git a/probe/process/reporter.go b/probe/process/reporter.go index 2261a78dd..c623b52a5 100644 --- a/probe/process/reporter.go +++ b/probe/process/reporter.go @@ -18,15 +18,15 @@ const ( // Reporter generate Reports containing the Process topology type reporter struct { - procRoot string - scope string + scope string + walker Walker } // NewReporter makes a new Reporter -func NewReporter(procRoot, scope string) tag.Reporter { +func NewReporter(walker Walker, scope string) tag.Reporter { return &reporter{ - procRoot: procRoot, - scope: scope, + scope: scope, + walker: walker, } } @@ -43,7 +43,7 @@ func (r *reporter) Report() (report.Report, error) { func (r *reporter) processTopology() (report.Topology, error) { t := report.NewTopology() - err := Walk(r.procRoot, func(p *Process) { + err := r.walker.Walk(func(p *Process) { pidstr := strconv.Itoa(p.PID) nodeID := report.MakeProcessNodeID(r.scope, pidstr) t.NodeMetadatas[nodeID] = report.NodeMetadata{ diff --git a/probe/process/reporter_test.go b/probe/process/reporter_test.go index 45fb57f80..cd0ffc501 100644 --- a/probe/process/reporter_test.go +++ b/probe/process/reporter_test.go @@ -9,23 +9,28 @@ import ( "github.com/weaveworks/scope/test" ) -func TestReporter(t *testing.T) { - oldWalk := process.Walk - defer func() { process.Walk = oldWalk }() +type mockWalker struct { + processes []*process.Process +} - process.Walk = func(_ string, f 
func(*process.Process)) error { - for _, p := range []*process.Process{ +func (m *mockWalker) Walk(f func(*process.Process)) error { + for _, p := range m.processes { + f(p) + } + return nil +} + +func TestReporter(t *testing.T) { + walker := &mockWalker{ + processes: []*process.Process{ {PID: 1, PPID: 0, Comm: "init"}, {PID: 2, PPID: 1, Comm: "bash"}, {PID: 3, PPID: 1, Comm: "apache", Threads: 2}, {PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"}, - } { - f(p) - } - return nil + }, } - reporter := process.NewReporter("", "") + reporter := process.NewReporter(walker, "") want := report.MakeReport() want.Process = report.Topology{ Adjacency: report.Adjacency{}, diff --git a/probe/process/tree.go b/probe/process/tree.go index 1494f4e6f..2fd8c65e2 100644 --- a/probe/process/tree.go +++ b/probe/process/tree.go @@ -14,9 +14,9 @@ type tree struct { } // NewTree returns a new Tree that can be polled. -func NewTree(procRoot string) (Tree, error) { +func NewTree(walker Walker) (Tree, error) { pt := tree{processes: map[int]*Process{}} - err := Walk(procRoot, func(p *Process) { + err := walker.Walk(func(p *Process) { pt.processes[p.PID] = p }) diff --git a/probe/process/tree_test.go b/probe/process/tree_test.go index 1c461b22f..8278f53b2 100644 --- a/probe/process/tree_test.go +++ b/probe/process/tree_test.go @@ -8,22 +8,16 @@ import ( ) func TestTree(t *testing.T) { - oldWalk := process.Walk - defer func() { process.Walk = oldWalk }() - - process.Walk = func(_ string, f func(*process.Process)) error { - for _, p := range []*process.Process{ - {PID: 1, PPID: 0}, - {PID: 2, PPID: 1}, - {PID: 3, PPID: 1}, - {PID: 4, PPID: 2}, - } { - f(p) - } - return nil + walker := &mockWalker{ + processes: []*process.Process{ + {PID: 1, PPID: 0, Comm: "init"}, + {PID: 2, PPID: 1, Comm: "bash"}, + {PID: 3, PPID: 1, Comm: "apache", Threads: 2}, + {PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"}, + }, } - tree, err := process.NewTree("foo") + tree, err := 
process.NewTree(walker) if err != nil { t.Fatalf("newProcessTree error: %v", err) } diff --git a/probe/process/walker.go b/probe/process/walker.go index ea885f57b..ca44934f3 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -6,6 +6,7 @@ import ( "path" "strconv" "strings" + "sync" ) // Process represents a single process. @@ -16,18 +17,32 @@ type Process struct { Threads int } +// Walker is something that walks the /proc directory +type Walker interface { + Walk(func(*Process)) error +} + // Hooks exposed for mocking var ( ReadDir = ioutil.ReadDir ReadFile = ioutil.ReadFile ) +type walker struct { + procRoot string +} + +// NewWalker creates a new process Walker +func NewWalker(procRoot string) Walker { + return &walker{procRoot: procRoot} +} + // Walk walks the supplied directory (expecting it to look like /proc) // and marshalls the files into instances of Process, which it then // passes one-by-one to the supplied function. Walk is only made public // so that is can be tested. 
-var Walk = func(procRoot string, f func(*Process)) error { - dirEntries, err := ReadDir(procRoot) +func (w *walker) Walk(f func(*Process)) error { + dirEntries, err := ReadDir(w.procRoot) if err != nil { return err } @@ -39,7 +54,7 @@ var Walk = func(procRoot string, f func(*Process)) error { continue } - stat, err := ReadFile(path.Join(procRoot, filename, "stat")) + stat, err := ReadFile(path.Join(w.procRoot, filename, "stat")) if err != nil { continue } @@ -55,13 +70,13 @@ var Walk = func(procRoot string, f func(*Process)) error { } cmdline := "" - if cmdlineBuf, err := ReadFile(path.Join(procRoot, filename, "cmdline")); err == nil { + if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil { cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1) cmdline = string(cmdlineBuf) } comm := "(unknown)" - if commBuf, err := ReadFile(path.Join(procRoot, filename, "comm")); err == nil { + if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil { comm = strings.TrimSpace(string(commBuf)) } @@ -76,3 +91,43 @@ var Walk = func(procRoot string, f func(*Process)) error { return nil } + +// CachingWalker is a walker that caches a copy of the output from another +// Walker, and then allows other concurrent readers to Walk that copy. 
+type CachingWalker struct { + cache []*Process + cacheLock sync.RWMutex + source Walker +} + +// NewCachingWalker returns a new CachingWalker +func NewCachingWalker(source Walker) *CachingWalker { + return &CachingWalker{source: source} +} + +// Walk walks a cached copy of process list +func (c *CachingWalker) Walk(f func(*Process)) error { + c.cacheLock.RLock() + defer c.cacheLock.RUnlock() + + for _, p := range c.cache { + f(p) + } + return nil +} + +// Update updates cached copy of process list +func (c *CachingWalker) Update() error { + newCache := []*Process{} + err := c.source.Walk(func(p *Process) { + newCache = append(newCache, p) + }) + if err != nil { + return err + } + + c.cacheLock.Lock() + defer c.cacheLock.Unlock() + c.cache = newCache + return nil +} diff --git a/probe/process/walker_test.go b/probe/process/walker_test.go index 1c2d97766..729588c22 100644 --- a/probe/process/walker_test.go +++ b/probe/process/walker_test.go @@ -79,7 +79,8 @@ func TestWalker(t *testing.T) { } have := map[int]*process.Process{} - err := process.Walk("unused", func(p *process.Process) { + walker := process.NewWalker("unused") + err := walker.Walk(func(p *process.Process) { have[p.PID] = p }) @@ -87,3 +88,50 @@ func TestWalker(t *testing.T) { t.Errorf("%v (%v)", test.Diff(want, have), err) } } + +func TestCache(t *testing.T) { + processes := []*process.Process{ + {PID: 1, PPID: 0, Comm: "init"}, + {PID: 2, PPID: 1, Comm: "bash"}, + {PID: 3, PPID: 1, Comm: "apache", Threads: 2}, + {PID: 4, PPID: 2, Comm: "ping", Cmdline: "ping foo.bar.local"}, + } + walker := &mockWalker{ + processes: processes, + } + cachingWalker := process.NewCachingWalker(walker) + err := cachingWalker.Update() + if err != nil { + t.Fatal(err) + } + + have, err := all(cachingWalker) + if err != nil || !reflect.DeepEqual(processes, have) { + t.Errorf("%v (%v)", test.Diff(processes, have), err) + } + + walker.processes = []*process.Process{} + have, err = all(cachingWalker) + if err != nil || 
!reflect.DeepEqual(processes, have) { + t.Errorf("%v (%v)", test.Diff(processes, have), err) + } + + err = cachingWalker.Update() + if err != nil { + t.Fatal(err) + } + + have, err = all(cachingWalker) + want := []*process.Process{} + if err != nil || !reflect.DeepEqual(want, have) { + t.Errorf("%v (%v)", test.Diff(want, have), err) + } +} + +func all(w process.Walker) ([]*process.Process, error) { + all := []*process.Process{} + err := w.Walk(func(p *process.Process) { + all = append(all, p) + }) + return all, err +}