Merge pull request #62 from tomwilkie/docker-process-mapper

A docker process mapper
This commit is contained in:
Tom Wilkie
2015-05-20 13:25:17 +02:00
8 changed files with 362 additions and 5 deletions

View File

@@ -41,7 +41,7 @@ $(APP_EXE) $(PROBE_EXE):
$(FIXPROBE_EXE):
cd experimental/fixprobe && go build
$(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/Dockerfile docker/entrypoint.sh
$(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*
cp $(APP_EXE) $(PROBE_EXE) docker/
$(SUDO) docker build -t $(SCOPE_IMAGE) docker/
$(SUDO) docker save $(SCOPE_IMAGE):latest > $@

View File

@@ -1,3 +1,3 @@
#!/bin/sh
exec /home/weave/probe
exec /home/weave/probe -proc.root=/hostproc

View File

@@ -0,0 +1,128 @@
package main
import (
"fmt"
"log"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
)
type dockerMapper struct {
sync.RWMutex
d map[int]*docker.Container
procRoot string
}
func newDockerMapper(procRoot string, interval time.Duration) *dockerMapper {
m := dockerMapper{
procRoot: procRoot,
d: map[int]*docker.Container{},
}
m.update()
go m.loop(interval)
return &m
}
func (m *dockerMapper) loop(d time.Duration) {
for range time.Tick(d) {
m.update()
}
}
// for mocking
type dockerClient interface {
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
InspectContainer(string) (*docker.Container, error)
}
func newRealDockerClient(endpoint string) (dockerClient, error) {
return docker.NewClient(endpoint)
}
var (
newDockerClient = newRealDockerClient
newPIDTreeStub = newPIDTree
)
func (m *dockerMapper) update() {
pidTree, err := newPIDTreeStub(m.procRoot)
if err != nil {
log.Printf("docker mapper: %s", err)
return
}
endpoint := "unix:///var/run/docker.sock"
client, err := newDockerClient(endpoint)
if err != nil {
log.Printf("docker mapper: %s", err)
return
}
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
log.Printf("docker mapper: %s", err)
return
}
pmap := map[int]*docker.Container{}
for _, container := range containers {
info, err := client.InspectContainer(container.ID)
if err != nil {
log.Printf("docker mapper: %s", err)
continue
}
if !info.State.Running {
continue
}
pids, err := pidTree.allChildren(info.State.Pid)
if err != nil {
log.Printf("docker mapper: %s", err)
continue
}
for _, pid := range pids {
pmap[pid] = info
}
}
m.Lock()
m.d = pmap
m.Unlock()
}
type dockerIDMapper struct {
*dockerMapper
}
func (m dockerIDMapper) Key() string { return "docker_id" }
func (m dockerIDMapper) Map(pid uint) (string, error) {
m.RLock()
container, ok := m.d[int(pid)]
m.RUnlock()
if !ok {
return "", fmt.Errorf("no container found for PID %d", pid)
}
return container.ID, nil
}
type dockerNameMapper struct {
*dockerMapper
}
func (m dockerNameMapper) Key() string { return "docker_name" }
func (m dockerNameMapper) Map(pid uint) (string, error) {
m.RLock()
container, ok := m.d[int(pid)]
m.RUnlock()
if !ok {
return "", fmt.Errorf("no container found for PID %d", pid)
}
return container.Name, nil
}

View File

@@ -0,0 +1,72 @@
package main
import (
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
)
type mockDockerClient struct {
containers []docker.APIContainers
containerInfo map[string]*docker.Container
}
func (m mockDockerClient) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) {
return m.containers, nil
}
func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) {
return m.containerInfo[id], nil
}
func TestDockerProcessMapper(t *testing.T) {
oldPIDTreeStub, oldDockerClientStub := newPIDTreeStub, newDockerClient
defer func() {
newPIDTreeStub = oldPIDTreeStub
newDockerClient = oldDockerClientStub
}()
newPIDTreeStub = func(procRoot string) (*pidTree, error) {
pid1 := &process{pid: 1}
pid2 := &process{pid: 2, ppid: 1, parent: pid1}
pid1.children = []*process{pid2}
return &pidTree{
processes: map[int]*process{
1: pid1, 2: pid2,
},
}, nil
}
newDockerClient = func(endpoint string) (dockerClient, error) {
return mockDockerClient{
containers: []docker.APIContainers{{ID: "foo"}},
containerInfo: map[string]*docker.Container{
"foo": {
ID: "foo",
Name: "bar",
State: docker.State{Pid: 1, Running: true},
},
},
}, nil
}
dockerMapper := newDockerMapper("/proc", 10*time.Second)
dockerIDMapper := dockerIDMapper{dockerMapper}
dockerNameMapper := dockerNameMapper{dockerMapper}
for pid, want := range map[uint]struct{ id, name string }{
1: {"foo", "bar"},
2: {"foo", "bar"},
} {
haveID, err := dockerIDMapper.Map(pid)
if err != nil || want.id != haveID {
t.Errorf("%d: want %q, have %q (%v)", pid, want.id, haveID, err)
}
haveName, err := dockerNameMapper.Map(pid)
if err != nil || want.name != haveName {
t.Errorf("%d: want %q, have %q (%v)", pid, want.name, haveName, err)
}
}
}

View File

@@ -27,7 +27,9 @@ func main() {
prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
spyProcs = flag.Bool("processes", true, "report processes (needs root)")
cgroupsRoot = flag.String("cgroups.root", "", "if provided, enrich -processes with cgroup names from this root (e.g. /mnt/cgroups)")
cgroupsUpdate = flag.Duration("cgroups.update", 10*time.Second, "how often to update cgroup names")
cgroupsInterval = flag.Duration("cgroups.interval", 10*time.Second, "how often to update cgroup names")
dockerMapper = flag.Bool("docker", true, "collect docker-related attributes for processes.")
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update docker container info")
procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem")
)
flag.Parse()
@@ -64,12 +66,17 @@ func main() {
if *cgroupsRoot != "" {
if fi, err := os.Stat(*cgroupsRoot); err == nil && fi.IsDir() {
log.Printf("enriching -processes with cgroup names from %s", *cgroupsRoot)
pms = append(pms, newCgroupMapper(*cgroupsRoot, *cgroupsUpdate))
pms = append(pms, newCgroupMapper(*cgroupsRoot, *cgroupsInterval))
} else {
log.Printf("-cgroups.root=%s: %v", *cgroupsRoot, err)
}
}
if *dockerMapper {
docker := newDockerMapper(*procRoot, *dockerInterval)
pms = append(pms, &dockerIDMapper{docker}, &dockerNameMapper{docker})
}
log.Printf("listening on %s", *listen)
go func() {

86
probe/pidtree.go Normal file
View File

@@ -0,0 +1,86 @@
package main
import (
"fmt"
"io/ioutil"
"path"
"strconv"
"strings"
)
type pidTree struct {
processes map[int]*process
}
type process struct {
pid, ppid int
parent *process
children []*process
}
// Hooks for mocking
var (
readDir = ioutil.ReadDir
readFile = ioutil.ReadFile
)
func newPIDTree(procRoot string) (*pidTree, error) {
dirEntries, err := readDir(procRoot)
if err != nil {
return nil, err
}
pt := pidTree{processes: map[int]*process{}}
for _, dirEntry := range dirEntries {
pid, err := strconv.Atoi(dirEntry.Name())
if err != nil {
continue
}
stat, err := readFile(path.Join(procRoot, dirEntry.Name(), "stat"))
if err != nil {
continue
}
splits := strings.Split(string(stat), " ")
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return nil, err
}
pt.processes[pid] = &process{pid: pid, ppid: ppid}
}
for _, child := range pt.processes {
parent, ok := pt.processes[child.ppid]
if !ok {
// This can happen as listing proc is not a consistent snapshot
continue
}
child.parent = parent
parent.children = append(parent.children, child)
}
return &pt, nil
}
// allChildren returns a flattened list of child pids including the given pid
func (pt *pidTree) allChildren(pid int) ([]int, error) {
proc, ok := pt.processes[pid]
if !ok {
return []int{}, fmt.Errorf("PID %d not found", pid)
}
var result []int
var f func(*process)
f = func(p *process) {
result = append(result, p.pid)
for _, child := range p.children {
f(child)
}
}
f(proc)
return result, nil
}

65
probe/pidtree_test.go Normal file
View File

@@ -0,0 +1,65 @@
package main
import (
"fmt"
"os"
"reflect"
"strconv"
"strings"
"testing"
"time"
)
type fileinfo struct {
name string
}
func (f fileinfo) Name() string { return f.name }
func (f fileinfo) Size() int64 { return 0 }
func (f fileinfo) Mode() os.FileMode { return 0 }
func (f fileinfo) ModTime() time.Time { return time.Now() }
func (f fileinfo) IsDir() bool { return true }
func (f fileinfo) Sys() interface{} { return nil }
func TestPIDTree(t *testing.T) {
oldReadDir, oldReadFile := readDir, readFile
defer func() {
readDir = oldReadDir
readFile = oldReadFile
}()
readDir = func(path string) ([]os.FileInfo, error) {
return []os.FileInfo{
fileinfo{"3"}, fileinfo{"2"}, fileinfo{"4"},
fileinfo{"notapid"}, fileinfo{"1"},
}, nil
}
readFile = func(path string) ([]byte, error) {
splits := strings.Split(path, "/")
if splits[len(splits)-1] != "stat" {
return nil, fmt.Errorf("not stat")
}
pid, err := strconv.Atoi(splits[len(splits)-2])
if err != nil {
return nil, err
}
parent := pid - 1
return []byte(fmt.Sprintf("%d na R %d", pid, parent)), nil
}
pidtree, err := newPIDTree("/proc")
if err != nil {
t.Fatalf("newPIDTree error: %v", err)
}
for pid, want := range map[int][]int{
1: {1, 2, 3, 4},
2: {2, 3, 4},
} {
have, err := pidtree.allChildren(pid)
if err != nil || !reflect.DeepEqual(want, have) {
t.Errorf("%d: want %#v, have %#v (%v)", pid, want, have, err)
}
}
}

View File

@@ -87,7 +87,6 @@ func addConnection(
for _, pm := range pms {
v, err := pm.Map(c.PID)
if err != nil {
log.Printf("spy processes: %s", err)
continue
}
md[pm.Key()] = v