Use caching proc walker in procspy.

This commit is contained in:
Tom Wilkie
2015-12-09 15:27:35 +00:00
parent ee2efeb10f
commit e15fe2b747
7 changed files with 32 additions and 36 deletions

View File

@@ -1,5 +1,9 @@
package procspy
import (
"github.com/weaveworks/scope/probe/process"
)
// SetFixtures declares constant Connection and ConnectionProcs which will
// always be returned by the package-level Connections and Processes
// functions. It's designed to be used in tests.
@@ -19,7 +23,7 @@ func (f *fixedConnIter) Next() *Connection {
// SetFixtures is used in test scenarios to have known output.
func SetFixtures(c []Connection) {
cbConnections = func(bool) (ConnIter, error) {
cbConnections = func(bool, process.Walker) (ConnIter, error) {
f := fixedConnIter(c)
return &f, nil
}

View File

@@ -11,6 +11,7 @@ import (
"syscall"
"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
)
var (
@@ -33,37 +34,27 @@ var (
// walkProcPid walks over all numerical (PID) /proc entries, and sees if their
// ./fd/* files are symlink to sockets. Returns a map from socket ID (inode)
// to PID. Will return an error if /proc isn't there.
func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
dirNames, err := readDir(procRoot)
if err != nil {
return nil, err
}
func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, error) {
var (
res = map[uint64]Proc{}
namespaces = map[uint64]struct{}{}
statT syscall.Stat_t
)
for _, entry := range dirNames {
dirName := entry.Name()
pid, err := strconv.ParseUint(dirName, 10, 0)
if err != nil {
// Not a number, so not a PID subdir.
continue
}
walker.Walk(func(p process.Process) {
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")
fds, err := readDir(fdBase)
if err != nil {
// Process is be gone by now, or we don't have access.
continue
return
}
// Read network namespace, and if we haven't seen it before,
// read /proc/<pid>/net/tcp
err = lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT)
if err != nil {
continue
return
}
if _, ok := namespaces[statT.Ino]; !ok {
@@ -72,7 +63,6 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf)
}
var name string
for _, fd := range fds {
// Direct use of syscall.Stat() to save garbage.
err = stat(filepath.Join(fdBase, fd.Name()), &statT)
@@ -85,19 +75,12 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
continue
}
if name == "" {
if name = procName(filepath.Join(procRoot, dirName)); name == "" {
// Process might be gone by now
break
}
}
res[statT.Ino] = Proc{
PID: uint(pid),
Name: name,
PID: uint(p.PID),
Name: p.Comm,
}
}
}
})
return res, nil
}

View File

@@ -5,6 +5,8 @@ package procspy
import (
"net"
"github.com/weaveworks/scope/probe/process"
)
const (
@@ -38,6 +40,6 @@ type ConnIter interface {
// If processes is true it'll additionally try to lookup the process owning the
// connection, filling in the Proc field. You will need to run this as root to
// find all processes.
func Connections(processes bool) (ConnIter, error) {
return cbConnections(processes)
func Connections(processes bool, walker process.Walker) (ConnIter, error) {
return cbConnections(processes, walker)
}

View File

@@ -4,6 +4,8 @@ import (
"net"
"os/exec"
"strconv"
"github.com/weaveworks/scope/probe/process"
)
const (
@@ -14,7 +16,7 @@ const (
// Connections returns all established (TCP) connections. No need to be root
// to run this. If processes is true it also tries to fill in the process
// fields of the connection. You need to be root to find all processes.
var cbConnections = func(processes bool) (ConnIter, error) {
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
out, err := exec.Command(
netstatBinary,
"-n", // no number resolving

View File

@@ -3,6 +3,8 @@ package procspy
import (
"bytes"
"sync"
"github.com/weaveworks/scope/probe/process"
)
var bufPool = sync.Pool{
@@ -31,7 +33,7 @@ func (c *pnConnIter) Next() *Connection {
}
// cbConnections sets Connections()
var cbConnections = func(processes bool) (ConnIter, error) {
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
// buffer for contents of /proc/<pid>/net/tcp
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
@@ -39,7 +41,7 @@ var cbConnections = func(processes bool) (ConnIter, error) {
var procs map[uint64]Proc
if processes {
var err error
if procs, err = walkProcPid(buf); err != nil {
if procs, err = walkProcPid(buf, walker); err != nil {
return nil, err
}
}

View File

@@ -26,6 +26,7 @@ type Reporter struct {
includeProcesses bool
includeNAT bool
flowWalker flowWalker // interface
procWalker process.Walker
natMapper natMapper
reverseResolver *reverseResolver
}
@@ -47,7 +48,7 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
@@ -55,6 +56,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
procWalker: procWalker,
}
}
@@ -78,7 +80,7 @@ func (r *Reporter) Report() (report.Report, error) {
rpt := report.MakeReport()
{
conns, err := procspy.Connections(r.includeProcesses)
conns, err := procspy.Connections(r.includeProcesses, r.procWalker)
if err != nil {
return rpt, err
}

View File

@@ -112,10 +112,11 @@ func probeMain() {
resolver := xfer.NewStaticResolver(targets, clients.Set)
defer resolver.Stop()
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, processCache)
defer endpointReporter.Stop()
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
p := probe.New(*spyInterval, *publishInterval, clients)
p.AddTicker(processCache)
p.AddReporter(