Merge pull request #741 from weaveworks/284-procspy-cached-walker

Use caching walker in procspy
This commit is contained in:
Tom Wilkie
2015-12-10 14:43:01 +00:00
14 changed files with 197 additions and 162 deletions

View File

@@ -2,10 +2,78 @@ package fs
import (
"io"
"io/ioutil"
"os"
"syscall"
)
// Open is a mockable version of os.Open
var Open = func(path string) (io.ReadWriteCloser, error) {
// Interface is the filesystem interface type.
type Interface interface {
ReadDir(string) ([]os.FileInfo, error)
ReadFile(string) ([]byte, error)
Lstat(string, *syscall.Stat_t) error
Stat(string, *syscall.Stat_t) error
Open(string) (io.ReadWriteCloser, error)
}
type realFS struct{}
// FS is the way you should access the filesystem.
var fs Interface = realFS{}
func (realFS) ReadDir(path string) ([]os.FileInfo, error) {
return ioutil.ReadDir(path)
}
func (realFS) ReadFile(path string) ([]byte, error) {
return ioutil.ReadFile(path)
}
func (realFS) Lstat(path string, stat *syscall.Stat_t) error {
return syscall.Lstat(path, stat)
}
func (realFS) Stat(path string, stat *syscall.Stat_t) error {
return syscall.Stat(path, stat)
}
func (realFS) Open(path string) (io.ReadWriteCloser, error) {
return os.Open(path)
}
// trampolines here to allow users to do fs.ReadDir etc
// ReadDir see ioutil.ReadDir
func ReadDir(path string) ([]os.FileInfo, error) {
return fs.ReadDir(path)
}
// ReadFile see ioutil.ReadFile
func ReadFile(path string) ([]byte, error) {
return fs.ReadFile(path)
}
// Lstat see syscall.Lstat
func Lstat(path string, stat *syscall.Stat_t) error {
return fs.Lstat(path, stat)
}
// Stat see syscall.Stat
func Stat(path string, stat *syscall.Stat_t) error {
return fs.Stat(path, stat)
}
// Open see os.Open
func Open(path string) (io.ReadWriteCloser, error) {
return fs.Open(path)
}
// Mock is used to switch out the filesystem for a mock.
func Mock(mock Interface) {
fs = mock
}
// Restore puts back the real filesystem.
func Restore() {
fs = realFS{}
}

View File

@@ -21,7 +21,7 @@ func benchmarkConnections(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
cbConnections(false)
cbConnections(false, nil)
}
}

View File

@@ -1,5 +1,9 @@
package procspy
import (
"github.com/weaveworks/scope/probe/process"
)
// SetFixtures declares constant Connection and ConnectionProcs which will
// always be returned by the package-level Connections and Processes
// functions. It's designed to be used in tests.
@@ -19,7 +23,7 @@ func (f *fixedConnIter) Next() *Connection {
// SetFixtures is used in test scenarios to have known output.
func SetFixtures(c []Connection) {
cbConnections = func(bool) (ConnIter, error) {
cbConnections = func(bool, process.Walker) (ConnIter, error) {
f := fixedConnIter(c)
return &f, nil
}

View File

@@ -4,13 +4,12 @@ package procspy
import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"syscall"
"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
)
var (
@@ -22,48 +21,30 @@ func SetProcRoot(root string) {
procRoot = root
}
// made variables for mocking
var (
readDir = ioutil.ReadDir
lstat = syscall.Lstat
stat = syscall.Stat
open = fs.Open
)
// walkProcPid walks over all numerical (PID) /proc entries, and sees if their
// ./fd/* files are symlink to sockets. Returns a map from socket ID (inode)
// to PID. Will return an error if /proc isn't there.
func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
dirNames, err := readDir(procRoot)
if err != nil {
return nil, err
}
func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, error) {
var (
res = map[uint64]Proc{}
namespaces = map[uint64]struct{}{}
statT syscall.Stat_t
)
for _, entry := range dirNames {
dirName := entry.Name()
pid, err := strconv.ParseUint(dirName, 10, 0)
if err != nil {
// Not a number, so not a PID subdir.
continue
}
walker.Walk(func(p process.Process) {
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")
fds, err := readDir(fdBase)
fds, err := fs.ReadDir(fdBase)
if err != nil {
// Process is be gone by now, or we don't have access.
continue
return
}
// Read network namespace, and if we haven't seen it before,
// read /proc/<pid>/net/tcp
err = lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT)
err = fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT)
if err != nil {
continue
return
}
if _, ok := namespaces[statT.Ino]; !ok {
@@ -72,10 +53,9 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf)
}
var name string
for _, fd := range fds {
// Direct use of syscall.Stat() to save garbage.
err = stat(filepath.Join(fdBase, fd.Name()), &statT)
err = fs.Stat(filepath.Join(fdBase, fd.Name()), &statT)
if err != nil {
continue
}
@@ -85,50 +65,21 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
continue
}
if name == "" {
if name = procName(filepath.Join(procRoot, dirName)); name == "" {
// Process might be gone by now
break
}
}
res[statT.Ino] = Proc{
PID: uint(pid),
Name: name,
PID: uint(p.PID),
Name: p.Comm,
}
}
}
})
return res, nil
}
// procName does a pid->name lookup.
func procName(base string) string {
fh, err := open(filepath.Join(base, "/comm"))
if err != nil {
return ""
}
name := make([]byte, 64)
l, err := fh.Read(name)
fh.Close()
if err != nil {
return ""
}
if l < 2 {
return ""
}
// drop trailing "\n"
return string(name[:l-1])
}
// readFile reads an arbitrary file into a buffer. It's a variable so it can
// be overwritten for benchmarks. That's bad practice and we should change it
// to be a dependency.
var readFile = func(filename string, buf *bytes.Buffer) error {
f, err := os.Open(filename)
f, err := fs.Open(filename)
if err != nil {
return err
}

View File

@@ -6,6 +6,8 @@ import (
"syscall"
"testing"
fs_hook "github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/test/fs"
)
@@ -31,17 +33,20 @@ var mockFS = fs.Dir("",
FStat: syscall.Stat_t{},
},
),
fs.File{
FName: "stat",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
},
),
),
)
func TestWalkProcPid(t *testing.T) {
oldReadDir, oldLstat, oldStat, oldOpen := readDir, lstat, stat, open
defer func() { readDir, lstat, stat, open = oldReadDir, oldLstat, oldStat, oldOpen }()
readDir, lstat, stat, open = mockFS.ReadDir, mockFS.Lstat, mockFS.Stat, mockFS.Open
fs_hook.Mock(mockFS)
defer fs_hook.Restore()
buf := bytes.Buffer{}
have, err := walkProcPid(&buf)
have, err := walkProcPid(&buf, process.NewWalker(procRoot))
if err != nil {
t.Fatal(err)
}

View File

@@ -5,6 +5,8 @@ package procspy
import (
"net"
"github.com/weaveworks/scope/probe/process"
)
const (
@@ -38,6 +40,6 @@ type ConnIter interface {
// If processes is true it'll additionally try to lookup the process owning the
// connection, filling in the Proc field. You will need to run this as root to
// find all processes.
func Connections(processes bool) (ConnIter, error) {
return cbConnections(processes)
func Connections(processes bool, walker process.Walker) (ConnIter, error) {
return cbConnections(processes, walker)
}

View File

@@ -4,6 +4,8 @@ import (
"net"
"os/exec"
"strconv"
"github.com/weaveworks/scope/probe/process"
)
const (
@@ -14,7 +16,7 @@ const (
// Connections returns all established (TCP) connections. No need to be root
// to run this. If processes is true it also tries to fill in the process
// fields of the connection. You need to be root to find all processes.
var cbConnections = func(processes bool) (ConnIter, error) {
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
out, err := exec.Command(
netstatBinary,
"-n", // no number resolving

View File

@@ -3,6 +3,8 @@ package procspy
import (
"bytes"
"sync"
"github.com/weaveworks/scope/probe/process"
)
var bufPool = sync.Pool{
@@ -31,7 +33,7 @@ func (c *pnConnIter) Next() *Connection {
}
// cbConnections sets Connections()
var cbConnections = func(processes bool) (ConnIter, error) {
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
// buffer for contents of /proc/<pid>/net/tcp
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
@@ -39,7 +41,7 @@ var cbConnections = func(processes bool) (ConnIter, error) {
var procs map[uint64]Proc
if processes {
var err error
if procs, err = walkProcPid(buf); err != nil {
if procs, err = walkProcPid(buf, walker); err != nil {
return nil, err
}
}

View File

@@ -26,6 +26,7 @@ type Reporter struct {
includeProcesses bool
includeNAT bool
flowWalker flowWalker // interface
procWalker process.Walker
natMapper natMapper
reverseResolver *reverseResolver
}
@@ -47,7 +48,7 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
@@ -55,6 +56,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
procWalker: procWalker,
}
}
@@ -78,7 +80,7 @@ func (r *Reporter) Report() (report.Report, error) {
rpt := report.MakeReport()
{
conns, err := procspy.Connections(r.includeProcesses)
conns, err := procspy.Connections(r.includeProcesses, r.procWalker)
if err != nil {
return rpt, err
}

View File

@@ -71,7 +71,7 @@ func TestSpyNoProcesses(t *testing.T) {
nodeName = "frenchs-since-1904" // TODO rename to hostNmae
)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, nil)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
@@ -107,7 +107,7 @@ func TestSpyWithProcesses(t *testing.T) {
nodeName = "fishermans-friend" // TODO rename to hostNmae
)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, nil)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

View File

@@ -2,16 +2,11 @@ package process
import (
"bytes"
"io/ioutil"
"path"
"strconv"
"strings"
)
// Hooks exposed for mocking
var (
ReadDir = ioutil.ReadDir
ReadFile = ioutil.ReadFile
"github.com/weaveworks/scope/common/fs"
)
type walker struct {
@@ -28,7 +23,7 @@ func NewWalker(procRoot string) Walker {
// passes one-by-one to the supplied function. Walk is only made public
// so that is can be tested.
func (w *walker) Walk(f func(Process)) error {
dirEntries, err := ReadDir(w.procRoot)
dirEntries, err := fs.ReadDir(w.procRoot)
if err != nil {
return err
}
@@ -40,7 +35,7 @@ func (w *walker) Walk(f func(Process)) error {
continue
}
stat, err := ReadFile(path.Join(w.procRoot, filename, "stat"))
stat, err := fs.ReadFile(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
@@ -56,13 +51,13 @@ func (w *walker) Walk(f func(Process)) error {
}
cmdline := ""
if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
if cmdlineBuf, err := fs.ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1)
cmdline = string(cmdlineBuf)
}
comm := "(unknown)"
if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
if commBuf, err := fs.ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
comm = strings.TrimSpace(string(commBuf))
}

View File

@@ -1,75 +1,80 @@
package process_test
import (
"fmt"
"os"
"reflect"
"strconv"
"strings"
"testing"
"time"
fs_hook "github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fs"
)
type mockProcess struct {
name, comm, cmdline string
}
func (p mockProcess) Name() string { return p.name }
func (p mockProcess) Size() int64 { return 0 }
func (p mockProcess) Mode() os.FileMode { return 0 }
func (p mockProcess) ModTime() time.Time { return time.Now() }
func (p mockProcess) IsDir() bool { return true }
func (p mockProcess) Sys() interface{} { return nil }
var mockFS = fs.Dir("",
fs.Dir("proc",
fs.Dir("3",
fs.File{
FName: "comm",
FContents: "curl\n",
},
fs.File{
FName: "cmdline",
FContents: "curl\000google.com",
},
fs.File{
FName: "stat",
FContents: "3 na R 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
},
),
fs.Dir("2",
fs.File{
FName: "comm",
FContents: "bash\n",
},
fs.File{
FName: "cmdline",
FContents: "",
},
fs.File{
FName: "stat",
FContents: "2 na R 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
},
),
fs.Dir("4",
fs.File{
FName: "comm",
FContents: "apache\n",
},
fs.File{
FName: "cmdline",
FContents: "",
},
fs.File{
FName: "stat",
FContents: "4 na R 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
},
),
fs.Dir("notapid"),
fs.Dir("1",
fs.File{
FName: "comm",
FContents: "init\n",
},
fs.File{
FName: "cmdline",
FContents: "",
},
fs.File{
FName: "stat",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
},
),
),
)
func TestWalker(t *testing.T) {
oldReadDir, oldReadFile := process.ReadDir, process.ReadFile
defer func() {
process.ReadDir = oldReadDir
process.ReadFile = oldReadFile
}()
processes := map[string]mockProcess{
"3": {name: "3", comm: "curl\n", cmdline: "curl\000google.com"},
"2": {name: "2", comm: "bash\n"},
"4": {name: "4", comm: "apache\n"},
"notapid": {name: "notapid"},
"1": {name: "1", comm: "init\n"},
}
process.ReadDir = func(path string) ([]os.FileInfo, error) {
result := []os.FileInfo{}
for _, p := range processes {
result = append(result, p)
}
return result, nil
}
process.ReadFile = func(path string) ([]byte, error) {
splits := strings.Split(path, "/")
pid := splits[len(splits)-2]
process, ok := processes[pid]
if !ok {
return nil, fmt.Errorf("not found")
}
file := splits[len(splits)-1]
switch file {
case "comm":
return []byte(process.comm), nil
case "stat":
pid, _ := strconv.Atoi(splits[len(splits)-2])
parent := pid - 1
return []byte(fmt.Sprintf("%d na R %d 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1", pid, parent)), nil
case "cmdline":
return []byte(process.cmdline), nil
}
return nil, fmt.Errorf("not found")
}
fs_hook.Mock(mockFS)
defer fs_hook.Restore()
want := map[int]process.Process{
3: {PID: 3, PPID: 2, Comm: "curl", Cmdline: "curl google.com", Threads: 1},
@@ -79,7 +84,7 @@ func TestWalker(t *testing.T) {
}
have := map[int]process.Process{}
walker := process.NewWalker("unused")
walker := process.NewWalker("/proc")
err := walker.Walk(func(p process.Process) {
have[p.PID] = p
})

View File

@@ -112,10 +112,11 @@ func probeMain() {
resolver := xfer.NewStaticResolver(targets, clients.Set)
defer resolver.Stop()
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, processCache)
defer endpointReporter.Stop()
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
p := probe.New(*spyInterval, *publishInterval, clients)
p.AddTicker(processCache)
p.AddReporter(

View File

@@ -9,6 +9,8 @@ import (
"strings"
"syscall"
"time"
"github.com/weaveworks/scope/common/fs"
)
type mockInode struct{}
@@ -16,7 +18,7 @@ type mockInode struct{}
type dir struct {
mockInode
name string
entries map[string]FS
entries map[string]Entry
stat syscall.Stat_t
}
@@ -28,21 +30,17 @@ type File struct {
FStat syscall.Stat_t
}
// FS is a mock filesystem
type FS interface {
// Entry is an entry in the mock filesystem
type Entry interface {
os.FileInfo
ReadDir(string) ([]os.FileInfo, error)
ReadFile(string) ([]byte, error)
Lstat(string, *syscall.Stat_t) error
Stat(string, *syscall.Stat_t) error
Open(string) (io.ReadWriteCloser, error)
fs.Interface
}
// Dir creates a new directory with the given entries.
func Dir(name string, entries ...FS) FS {
func Dir(name string, entries ...Entry) Entry {
result := dir{
name: name,
entries: map[string]FS{},
entries: map[string]Entry{},
}
for _, entry := range entries {