From e15fe2b7479f2a9de50ef3a3ba3d62ed77c44d2e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Dec 2015 15:27:35 +0000 Subject: [PATCH] Use caching proc walker in procspy. --- probe/endpoint/procspy/fixture.go | 6 ++++- probe/endpoint/procspy/proc.go | 35 +++++++--------------------- probe/endpoint/procspy/spy.go | 6 +++-- probe/endpoint/procspy/spy_darwin.go | 4 +++- probe/endpoint/procspy/spy_linux.go | 6 +++-- probe/endpoint/reporter.go | 6 +++-- prog/probe.go | 5 ++-- 7 files changed, 32 insertions(+), 36 deletions(-) diff --git a/probe/endpoint/procspy/fixture.go b/probe/endpoint/procspy/fixture.go index 1794108b5..2e53c8190 100644 --- a/probe/endpoint/procspy/fixture.go +++ b/probe/endpoint/procspy/fixture.go @@ -1,5 +1,9 @@ package procspy +import ( + "github.com/weaveworks/scope/probe/process" +) + // SetFixtures declares constant Connection and ConnectionProcs which will // always be returned by the package-level Connections and Processes // functions. It's designed to be used in tests. @@ -19,7 +23,7 @@ func (f *fixedConnIter) Next() *Connection { // SetFixtures is used in test scenarios to have known output. func SetFixtures(c []Connection) { - cbConnections = func(bool) (ConnIter, error) { + cbConnections = func(bool, process.Walker) (ConnIter, error) { f := fixedConnIter(c) return &f, nil } diff --git a/probe/endpoint/procspy/proc.go b/probe/endpoint/procspy/proc.go index 8f00dc5c6..0dc03325a 100644 --- a/probe/endpoint/procspy/proc.go +++ b/probe/endpoint/procspy/proc.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/weaveworks/scope/common/fs" + "github.com/weaveworks/scope/probe/process" ) var ( @@ -33,37 +34,27 @@ var ( // walkProcPid walks over all numerical (PID) /proc entries, and sees if their // ./fd/* files are symlink to sockets. Returns a map from socket ID (inode) // to PID. Will return an error if /proc isn't there. -func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) { - dirNames, err := readDir(procRoot) - if err != nil { - return nil, err - } - +func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, error) { var ( res = map[uint64]Proc{} namespaces = map[uint64]struct{}{} statT syscall.Stat_t ) - for _, entry := range dirNames { - dirName := entry.Name() - pid, err := strconv.ParseUint(dirName, 10, 0) - if err != nil { - // Not a number, so not a PID subdir. - continue - } + walker.Walk(func(p process.Process) { + dirName := strconv.Itoa(p.PID) fdBase := filepath.Join(procRoot, dirName, "fd") fds, err := readDir(fdBase) if err != nil { // Process is be gone by now, or we don't have access. - continue + return } // Read network namespace, and if we haven't seen it before, // read /proc//net/tcp err = lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT) if err != nil { - continue + return } if _, ok := namespaces[statT.Ino]; !ok { @@ -72,7 +63,6 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) { readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf) } - var name string for _, fd := range fds { // Direct use of syscall.Stat() to save garbage. err = stat(filepath.Join(fdBase, fd.Name()), &statT) @@ -85,19 +75,12 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) { continue } - if name == "" { - if name = procName(filepath.Join(procRoot, dirName)); name == "" { - // Process might be gone by now - break - } - } - res[statT.Ino] = Proc{ - PID: uint(pid), - Name: name, + PID: uint(p.PID), + Name: p.Comm, } } - } + }) return res, nil } diff --git a/probe/endpoint/procspy/spy.go b/probe/endpoint/procspy/spy.go index 14934d3bb..27a748256 100644 --- a/probe/endpoint/procspy/spy.go +++ b/probe/endpoint/procspy/spy.go @@ -5,6 +5,8 @@ package procspy import ( "net" + + "github.com/weaveworks/scope/probe/process" ) const ( @@ -38,6 +40,6 @@ type ConnIter interface { // If processes is true it'll additionally try to lookup the process owning the // connection, filling in the Proc field. You will need to run this as root to // find all processes. -func Connections(processes bool) (ConnIter, error) { - return cbConnections(processes) +func Connections(processes bool, walker process.Walker) (ConnIter, error) { + return cbConnections(processes, walker) } diff --git a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go index ef93b9045..988330185 100644 --- a/probe/endpoint/procspy/spy_darwin.go +++ b/probe/endpoint/procspy/spy_darwin.go @@ -4,6 +4,8 @@ import ( "net" "os/exec" "strconv" + + "github.com/weaveworks/scope/probe/process" ) const ( @@ -14,7 +16,7 @@ const ( // Connections returns all established (TCP) connections. No need to be root // to run this. If processes is true it also tries to fill in the process // fields of the connection. You need to be root to find all processes. -var cbConnections = func(processes bool) (ConnIter, error) { +var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) { out, err := exec.Command( netstatBinary, "-n", // no number resolving diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 6c66a4613..29634edb0 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -3,6 +3,8 @@ package procspy import ( "bytes" "sync" + + "github.com/weaveworks/scope/probe/process" ) var bufPool = sync.Pool{ @@ -31,7 +33,7 @@ func (c *pnConnIter) Next() *Connection { } // cbConnections sets Connections() -var cbConnections = func(processes bool) (ConnIter, error) { +var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) { // buffer for contents of /proc//net/tcp buf := bufPool.Get().(*bytes.Buffer) buf.Reset() @@ -39,7 +41,7 @@ var cbConnections = func(processes bool) (ConnIter, error) { var procs map[uint64]Proc if processes { var err error - if procs, err = walkProcPid(buf); err != nil { + if procs, err = walkProcPid(buf, walker); err != nil { return nil, err } } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index e0083a2d7..6dcd9e8bc 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -26,6 +26,7 @@ type Reporter struct { includeProcesses bool includeNAT bool flowWalker flowWalker // interface + procWalker process.Walker natMapper natMapper reverseResolver *reverseResolver } @@ -47,7 +48,7 @@ var SpyDuration = prometheus.NewSummaryVec( // on the host machine, at the granularity of host and port. That information // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. -func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { +func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter { return &Reporter{ hostID: hostID, hostName: hostName, @@ -55,6 +56,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo flowWalker: newConntrackFlowWalker(useConntrack), natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")), reverseResolver: newReverseResolver(), + procWalker: procWalker, } } @@ -78,7 +80,7 @@ func (r *Reporter) Report() (report.Report, error) { rpt := report.MakeReport() { - conns, err := procspy.Connections(r.includeProcesses) + conns, err := procspy.Connections(r.includeProcesses, r.procWalker) if err != nil { return rpt, err } diff --git a/prog/probe.go b/prog/probe.go index d289e4020..cde995d19 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -112,10 +112,11 @@ func probeMain() { resolver := xfer.NewStaticResolver(targets, clients.Set) defer resolver.Stop() - endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) + processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) + + endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, processCache) defer endpointReporter.Stop() - processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) p := probe.New(*spyInterval, *publishInterval, clients) p.AddTicker(processCache) p.AddReporter(