From 314af5ca89dc8fa247ea1d3d31711c686929ea42 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 18 Jun 2015 08:50:27 +0000 Subject: [PATCH] Improve probe docker code quality & test coverage. - Move docker probe code into its own module - Put PIDTree behind an interface for mocking - Disaggregate dockerTagger into a registry, tagger and reporter - Similarly disaggregate tests - Add mocks for docker container and registry - Add test for docker events & stats --- Makefile | 2 +- .../container.go} | 144 +++++-- probe/docker/container_test.go | 67 ++++ probe/docker/registry.go | 288 ++++++++++++++ probe/docker/registry_test.go | 241 ++++++++++++ probe/docker/reporter.go | 66 ++++ probe/docker/reporter_test.go | 79 ++++ probe/docker/tagger.go | 87 +++++ probe/docker/tagger_test.go | 60 +++ probe/main.go | 23 +- probe/tag/docker_tagger.go | 364 ------------------ probe/tag/docker_tagger_test.go | 123 ------ probe/tag/origin_host_tagger.go | 4 +- probe/tag/origin_host_tagger_test.go | 3 +- probe/tag/pidtree.go | 67 ++-- probe/tag/pidtree_test.go | 10 +- probe/tag/tagger.go | 19 +- probe/tag/topology_tagger.go | 4 +- probe/tag/topology_tagger_test.go | 3 +- probe/tag/weave_tagger.go | 4 +- 20 files changed, 1062 insertions(+), 596 deletions(-) rename probe/{tag/docker_container.go => docker/container.go} (57%) create mode 100644 probe/docker/container_test.go create mode 100644 probe/docker/registry.go create mode 100644 probe/docker/registry_test.go create mode 100644 probe/docker/reporter.go create mode 100644 probe/docker/reporter_test.go create mode 100644 probe/docker/tagger.go create mode 100644 probe/docker/tagger_test.go delete mode 100644 probe/tag/docker_tagger.go delete mode 100644 probe/tag/docker_tagger_test.go diff --git a/Makefile b/Makefile index 7a17c389d..9e837cba1 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/* $(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go -$(PROBE_EXE): probe/*.go 
probe/tag/*.go report/*.go xfer/*.go +$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go report/*.go xfer/*.go $(APP_EXE) $(PROBE_EXE): go get -tags netgo ./$(@D) diff --git a/probe/tag/docker_container.go b/probe/docker/container.go similarity index 57% rename from probe/tag/docker_container.go rename to probe/docker/container.go index de3116954..bb8a26017 100644 --- a/probe/tag/docker_container.go +++ b/probe/docker/container.go @@ -1,6 +1,7 @@ -package tag +package docker import ( + "bufio" "encoding/json" "fmt" "io" @@ -10,9 +11,12 @@ import ( "net/http/httputil" "net/url" "strconv" + "strings" "sync" docker "github.com/fsouza/go-dockerclient" + + "github.com/weaveworks/scope/report" ) // These constants are keys used in node metadata @@ -39,51 +43,102 @@ const ( CPUSystemCPUUsage = "cpu_system_cpu_usage" ) -type dockerContainer struct { - sync.RWMutex - *docker.Container +// Exported for testing +var ( + DialStub = net.Dial + NewClientConnStub = newClientConn +) - statsConn *httputil.ClientConn +func newClientConn(c net.Conn, r *bufio.Reader) ClientConn { + return httputil.NewClientConn(c, r) +} + +// ClientConn is exported for testing +type ClientConn interface { + Do(req *http.Request) (resp *http.Response, err error) + Close() error +} + +// Container represents a docker container +type Container interface { + ID() string + Image() string + PID() int + GetNodeMetadata() report.NodeMetadata + + StartGatheringStats() error + StopGatheringStats() +} + +type container struct { + sync.RWMutex + container *docker.Container + statsConn ClientConn latestStats *docker.Stats } -// called whilst holding t.Lock() for writes -func (c *dockerContainer) startGatheringStats(containerID string) error { +// NewContainer creates a new Container +func NewContainer(c *docker.Container) Container { + return &container{container: c} +} + +func (c *container) ID() string { + return c.container.ID +} + +func (c *container) Image() string { + return c.container.Image +} + 
+func (c *container) PID() int { + return c.container.State.Pid +} + +func (c *container) StartGatheringStats() error { + c.Lock() + defer c.Unlock() + if c.statsConn != nil { - return fmt.Errorf("already gather stats for container %s", containerID) + return fmt.Errorf("already gather stats for container %s", c.container.ID) } - log.Printf("docker mapper: collecting stats for %s", containerID) - req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", containerID), nil) - if err != nil { - return err - } - req.Header.Set("User-Agent", "weavescope") - - url, err := url.Parse(endpoint) - if err != nil { - return err - } - - dial, err := net.Dial(url.Scheme, url.Path) - if err != nil { - return err - } - - conn := httputil.NewClientConn(dial, nil) - resp, err := conn.Do(req) - if err != nil { - return err - } - - c.statsConn = conn - go func() { + log.Printf("docker container: collecting stats for %s", c.container.ID) + req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil) + if err != nil { + log.Printf("docker container: %v", err) + return + } + req.Header.Set("User-Agent", "weavescope") + + url, err := url.Parse(endpoint) + if err != nil { + log.Printf("docker container: %v", err) + return + } + + dial, err := net.Dial(url.Scheme, url.Path) + if err != nil { + log.Printf("docker container: %v", err) + return + } + + conn := NewClientConnStub(dial, nil) + resp, err := conn.Do(req) + if err != nil { + log.Printf("docker container: %v", err) + return + } + + c.Lock() + c.statsConn = conn + c.Unlock() + defer func() { c.Lock() defer c.Unlock() - log.Printf("docker mapper: stopped collecting stats for %s", containerID) + log.Printf("docker container: stopped collecting stats for %s", c.container.ID) c.statsConn = nil c.latestStats = nil }() @@ -93,7 +148,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error { for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) { if err 
!= nil { - log.Printf("docker mapper: error reading event %v", err) + log.Printf("docker container: error reading event %v", err) return } @@ -109,7 +164,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error { } // called whilst holding t.Lock() -func (c *dockerContainer) stopGatheringStats(containerID string) { +func (c *container) StopGatheringStats() { c.Lock() defer c.Unlock() @@ -124,15 +179,21 @@ func (c *dockerContainer) stopGatheringStats(containerID string) { } // called whilst holding t.RLock() -func (c *dockerContainer) getStats() map[string]string { +func (c *container) GetNodeMetadata() report.NodeMetadata { c.RLock() defer c.RUnlock() - if c.latestStats == nil { - return map[string]string{} + result := report.NodeMetadata{ + ContainerID: c.ID(), + ContainerName: strings.TrimPrefix(c.container.Name, "/"), + ImageID: c.container.Image, } - return map[string]string{ + if c.latestStats == nil { + return result + } + + result.Merge(report.NodeMetadata{ NetworkRxDropped: strconv.FormatUint(c.latestStats.Network.RxDropped, 10), NetworkRxBytes: strconv.FormatUint(c.latestStats.Network.RxBytes, 10), NetworkRxErrors: strconv.FormatUint(c.latestStats.Network.RxErrors, 10), @@ -152,5 +213,6 @@ func (c *dockerContainer) getStats() map[string]string { CPUTotalUsage: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.TotalUsage, 10), CPUUsageInKernelmode: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.UsageInKernelmode, 10), CPUSystemCPUUsage: strconv.FormatUint(c.latestStats.CPUStats.SystemCPUUsage, 10), - } + }) + return result } diff --git a/probe/docker/container_test.go b/probe/docker/container_test.go new file mode 100644 index 000000000..160e85c92 --- /dev/null +++ b/probe/docker/container_test.go @@ -0,0 +1,67 @@ +package docker_test + +import ( + "bufio" + "encoding/json" + "io" + "net" + "net/http" + "runtime" + "testing" + + client "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/scope/probe/docker" +) + +type 
mockConnection struct { + reader *io.PipeReader +} + +func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) { + return &http.Response{ + Body: c.reader, + }, nil +} + +func (c *mockConnection) Close() error { + return c.reader.Close() +} + +func TestContainer(t *testing.T) { + oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub + defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }() + + docker.DialStub = func(network, address string) (net.Conn, error) { + return nil, nil + } + + reader, writer := io.Pipe() + connection := &mockConnection{reader} + + docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn { + return connection + } + + c := docker.NewContainer(container1) + err := c.StartGatheringStats() + if err != nil { + t.Errorf("%v", err) + } + defer c.StopGatheringStats() + runtime.Gosched() // wait for StartGatheringStats goroutine to call connection.Do + + // Send some stats to the docker container + stats := &client.Stats{} + stats.MemoryStats.Usage = 12345 + err = json.NewEncoder(writer).Encode(&stats) + if err != nil { + t.Errorf("%v", err) + } + runtime.Gosched() // wait for StartGatheringStats goroutine to receive the stats + + // Now see if we go them + nmd := c.GetNodeMetadata() + if nmd[docker.MemoryUsage] != "12345" { + t.Errorf("want 12345, got %s", nmd[docker.MemoryUsage]) + } +} diff --git a/probe/docker/registry.go b/probe/docker/registry.go new file mode 100644 index 000000000..0fad353a0 --- /dev/null +++ b/probe/docker/registry.go @@ -0,0 +1,288 @@ +package docker + +import ( + "log" + "sync" + "time" + + docker_client "github.com/fsouza/go-dockerclient" +) + +// Consts exported for testing. +const ( + StartEvent = "start" + DieEvent = "die" + endpoint = "unix:///var/run/docker.sock" +) + +// Vars exported for testing. 
+var ( + NewDockerClientStub = newDockerClient + NewContainerStub = NewContainer +) + +// Registry keeps track of running docker containers and their images +type Registry interface { + Stop() + LockedPIDLookup(f func(func(int) Container)) + WalkContainers(f func(Container)) + WalkImages(f func(*docker_client.APIImages)) +} + +type registry struct { + sync.RWMutex + quit chan chan struct{} + interval time.Duration + client Client + + containers map[string]Container + containersByPID map[int]Container + images map[string]*docker_client.APIImages +} + +// Client interface for mocking. +type Client interface { + ListContainers(docker_client.ListContainersOptions) ([]docker_client.APIContainers, error) + InspectContainer(string) (*docker_client.Container, error) + ListImages(docker_client.ListImagesOptions) ([]docker_client.APIImages, error) + AddEventListener(chan<- *docker_client.APIEvents) error + RemoveEventListener(chan *docker_client.APIEvents) error +} + +func newDockerClient(endpoint string) (Client, error) { + return docker_client.NewClient(endpoint) +} + +// NewRegistry returns a usable Registry. Don't forget to Stop it. +func NewRegistry(interval time.Duration) (Registry, error) { + client, err := NewDockerClientStub(endpoint) + if err != nil { + return nil, err + } + + r := ®istry{ + containers: map[string]Container{}, + containersByPID: map[int]Container{}, + images: map[string]*docker_client.APIImages{}, + + client: client, + interval: interval, + quit: make(chan chan struct{}), + } + + go r.loop() + return r, nil +} + +// Stop stops the Docker registry's event subscriber. +func (r *registry) Stop() { + ch := make(chan struct{}) + r.quit <- ch + <-ch +} + +func (r *registry) loop() { + for { + // NB listenForEvents blocks. + // Returning false means we should exit. 
+ if !r.listenForEvents() { + return + } + + // Sleep here so we don't hammer the + // logs if docker is down + time.Sleep(r.interval) + } +} + +func (r *registry) listenForEvents() bool { + // First we empty the store lists. + // This ensure any containers that went away inbetween calls to + // listenForEvents don't hang around. + r.reset() + + // Next, start listening for events. We do this before fetching + // the list of containers so we don't miss containers created + // after listing but before listening for events. + events := make(chan *docker_client.APIEvents) + if err := r.client.AddEventListener(events); err != nil { + log.Printf("docker registry: %s", err) + return true + } + defer func() { + if err := r.client.RemoveEventListener(events); err != nil { + log.Printf("docker registry: %s", err) + } + }() + + if err := r.updateContainers(); err != nil { + log.Printf("docker registry: %s", err) + return true + } + + if err := r.updateImages(); err != nil { + log.Printf("docker registry: %s", err) + return true + } + + otherUpdates := time.Tick(r.interval) + for { + select { + case event := <-events: + r.handleEvent(event) + + case <-otherUpdates: + if err := r.updateImages(); err != nil { + log.Printf("docker registry: %s", err) + return true + } + + case ch := <-r.quit: + r.Lock() + defer r.Unlock() + + for _, c := range r.containers { + c.StopGatheringStats() + } + close(ch) + return false + } + } +} + +func (r *registry) reset() { + r.Lock() + defer r.Unlock() + + for _, c := range r.containers { + c.StopGatheringStats() + } + + r.containers = map[string]Container{} + r.containersByPID = map[int]Container{} + r.images = map[string]*docker_client.APIImages{} +} + +func (r *registry) updateContainers() error { + apiContainers, err := r.client.ListContainers(docker_client.ListContainersOptions{All: true}) + if err != nil { + return err + } + + for _, apiContainer := range apiContainers { + if err := r.addContainer(apiContainer.ID); err != nil { + return err 
+ } + } + + return nil +} + +func (r *registry) updateImages() error { + images, err := r.client.ListImages(docker_client.ListImagesOptions{}) + if err != nil { + return err + } + + r.Lock() + defer r.Unlock() + + for i := range images { + image := &images[i] + r.images[image.ID] = image + } + + return nil +} + +func (r *registry) handleEvent(event *docker_client.APIEvents) { + switch event.Status { + case DieEvent: + containerID := event.ID + r.removeContainer(containerID) + + case StartEvent: + containerID := event.ID + if err := r.addContainer(containerID); err != nil { + log.Printf("docker registry: %s", err) + } + } +} + +func (r *registry) addContainer(containerID string) error { + dockerContainer, err := r.client.InspectContainer(containerID) + if err != nil { + // Don't spam the logs if the container was short lived + if _, ok := err.(*docker_client.NoSuchContainer); ok { + return nil + } + return err + } + + if !dockerContainer.State.Running { + // We get events late, and the containers sometimes have already + // stopped. Not an error, so don't return it. + return nil + } + + r.Lock() + defer r.Unlock() + + c := NewContainerStub(dockerContainer) + r.containers[containerID] = c + r.containersByPID[dockerContainer.State.Pid] = c + + return c.StartGatheringStats() +} + +func (r *registry) removeContainer(containerID string) { + r.Lock() + defer r.Unlock() + + container, ok := r.containers[containerID] + if !ok { + return + } + + delete(r.containers, containerID) + delete(r.containersByPID, container.PID()) + container.StopGatheringStats() +} + +// LockedPIDLookup runs f under a read lock, and gives f a function for +// use doing pid->container lookups. +func (r *registry) LockedPIDLookup(f func(func(int) Container)) { + r.RLock() + defer r.RUnlock() + + lookup := func(pid int) Container { + return r.containersByPID[pid] + } + + f(lookup) +} + +// WalkContainers runs f on every running containers the registry knows of. 
+func (r *registry) WalkContainers(f func(Container)) { + r.RLock() + defer r.RUnlock() + + for _, container := range r.containers { + f(container) + } +} + +// WalkImages runs f on every image of running containers the registry +// knows of. f may be run on the same image more than once. +func (r *registry) WalkImages(f func(*docker_client.APIImages)) { + r.RLock() + defer r.RUnlock() + + // Loop over containers so we only emit images for running containers. + for _, container := range r.containers { + image, ok := r.images[container.Image()] + if ok { + f(image) + } + } +} diff --git a/probe/docker/registry_test.go b/probe/docker/registry_test.go new file mode 100644 index 000000000..bd90b3e3b --- /dev/null +++ b/probe/docker/registry_test.go @@ -0,0 +1,241 @@ +package docker_test + +import ( + "reflect" + "runtime" + "sort" + "sync" + "testing" + "time" + + client "github.com/fsouza/go-dockerclient" + + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +type mockContainer struct { + c *client.Container +} + +func (c *mockContainer) ID() string { + return c.c.ID +} + +func (c *mockContainer) PID() int { + return c.c.State.Pid +} + +func (c *mockContainer) Image() string { + return c.c.Image +} + +func (c *mockContainer) StartGatheringStats() error { + return nil +} + +func (c *mockContainer) StopGatheringStats() {} + +func (c *mockContainer) GetNodeMetadata() report.NodeMetadata { + return report.NodeMetadata{ + docker.ContainerID: c.c.ID, + docker.ContainerName: c.c.Name, + docker.ImageID: c.c.Image, + } +} + +type mockDockerClient struct { + sync.Mutex + apiContainers []client.APIContainers + containers map[string]*client.Container + apiImages []client.APIImages + events []chan<- *client.APIEvents +} + +func (m *mockDockerClient) ListContainers(client.ListContainersOptions) ([]client.APIContainers, error) { + return m.apiContainers, nil +} + +func (m *mockDockerClient) InspectContainer(id 
string) (*client.Container, error) { + return m.containers[id], nil +} + +func (m *mockDockerClient) ListImages(client.ListImagesOptions) ([]client.APIImages, error) { + m.Lock() + defer m.Unlock() + return m.apiImages, nil +} + +func (m *mockDockerClient) AddEventListener(events chan<- *client.APIEvents) error { + m.Lock() + defer m.Unlock() + m.events = append(m.events, events) + return nil +} + +func (m *mockDockerClient) RemoveEventListener(events chan *client.APIEvents) error { + m.Lock() + defer m.Unlock() + for i, c := range m.events { + if c == events { + m.events = append(m.events[:i], m.events[i+1:]...) + } + } + return nil +} + +func (m *mockDockerClient) send(event *client.APIEvents) { + m.Lock() + defer m.Unlock() + for _, c := range m.events { + c <- event + } +} + +var ( + container1 = &client.Container{ + ID: "ping", + Name: "pong", + Image: "baz", + State: client.State{Pid: 1, Running: true}, + } + container2 = &client.Container{ + ID: "wiff", + Name: "waff", + Image: "baz", + State: client.State{Pid: 1, Running: true}, + } + apiContainer1 = client.APIContainers{ID: "ping"} + apiImage1 = client.APIImages{ID: "baz", RepoTags: []string{"bang", "not-chosen"}} + mockClient = mockDockerClient{ + apiContainers: []client.APIContainers{apiContainer1}, + containers: map[string]*client.Container{"ping": container1}, + apiImages: []client.APIImages{apiImage1}, + } +) + +func setupStubs(mdc *mockDockerClient, f func()) { + oldDockerClient, oldNewContainer := docker.NewDockerClientStub, docker.NewContainerStub + defer func() { docker.NewDockerClientStub, docker.NewContainerStub = oldDockerClient, oldNewContainer }() + + docker.NewDockerClientStub = func(endpoint string) (docker.Client, error) { + return mdc, nil + } + + docker.NewContainerStub = func(c *client.Container) docker.Container { + return &mockContainer{c} + } + + f() +} + +type containers []docker.Container + +func (c containers) Len() int { return len(c) } +func (c containers) Swap(i, j int) { c[i], 
c[j] = c[j], c[i] } +func (c containers) Less(i, j int) bool { return c[i].ID() < c[j].ID() } + +func allContainers(r docker.Registry) []docker.Container { + result := []docker.Container{} + r.WalkContainers(func(c docker.Container) { + result = append(result, c) + }) + sort.Sort(containers(result)) + return result +} + +func allImages(r docker.Registry) []*client.APIImages { + result := []*client.APIImages{} + r.WalkImages(func(i *client.APIImages) { + result = append(result, i) + }) + return result +} + +func TestRegistry(t *testing.T) { + mdc := mockClient // take a copy + setupStubs(&mdc, func() { + registry, _ := docker.NewRegistry(10 * time.Second) + defer registry.Stop() + runtime.Gosched() + + { + have := allContainers(registry) + want := []docker.Container{&mockContainer{container1}} + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + } + + { + have := allImages(registry) + want := []*client.APIImages{&apiImage1} + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + } + }) +} + +func TestRegistryEvents(t *testing.T) { + mdc := mockClient // take a copy + setupStubs(&mdc, func() { + registry, _ := docker.NewRegistry(10 * time.Second) + defer registry.Stop() + runtime.Gosched() + + { + mdc.Lock() + mdc.containers["wiff"] = container2 + mdc.Unlock() + mdc.send(&client.APIEvents{Status: docker.StartEvent, ID: "wiff"}) + runtime.Gosched() + + have := allContainers(registry) + want := []docker.Container{&mockContainer{container1}, &mockContainer{container2}} + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + } + + { + mdc.Lock() + delete(mdc.containers, "wiff") + mdc.Unlock() + mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "wiff"}) + runtime.Gosched() + + have := allContainers(registry) + want := []docker.Container{&mockContainer{container1}} + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + } + + { + mdc.Lock() + 
delete(mdc.containers, "ping") + mdc.Unlock() + mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "ping"}) + runtime.Gosched() + + have := allContainers(registry) + want := []docker.Container{} + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + } + + { + mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "doesntexist"}) + runtime.Gosched() + + have := allContainers(registry) + want := []docker.Container{} + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + } + }) +} diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go new file mode 100644 index 000000000..ac5ea2072 --- /dev/null +++ b/probe/docker/reporter.go @@ -0,0 +1,66 @@ +package docker + +import ( + docker_client "github.com/fsouza/go-dockerclient" + + "github.com/weaveworks/scope/report" +) + +// Keys for use in NodeMetadata +const ( + ContainerName = "docker_container_name" + ImageID = "docker_image_id" + ImageName = "docker_image_name" +) + +// Reporter generate Reports containing Container and ContainerImage topologies +type Reporter struct { + registry Registry + scope string +} + +// NewReporter makes a new Reporter +func NewReporter(registry Registry, scope string) *Reporter { + return &Reporter{ + registry: registry, + scope: scope, + } +} + +// Report generates a Report containing Container and ContainerImage topologies +func (r *Reporter) Report() report.Report { + result := report.MakeReport() + result.Container.Merge(r.containerTopology()) + result.ContainerImage.Merge(r.containerImageTopology()) + return result +} + +func (r *Reporter) containerTopology() report.Topology { + result := report.NewTopology() + + r.registry.WalkContainers(func(c Container) { + nodeID := report.MakeContainerNodeID(r.scope, c.ID()) + result.NodeMetadatas[nodeID] = c.GetNodeMetadata() + }) + + return result +} + +func (r *Reporter) containerImageTopology() report.Topology { + result := report.NewTopology() + + 
r.registry.WalkImages(func(image *docker_client.APIImages) { + nmd := report.NodeMetadata{ + ImageID: image.ID, + } + + if len(image.RepoTags) > 0 { + nmd[ImageName] = image.RepoTags[0] + } + + nodeID := report.MakeContainerNodeID(r.scope, image.ID) + result.NodeMetadatas[nodeID] = nmd + }) + + return result +} diff --git a/probe/docker/reporter_test.go b/probe/docker/reporter_test.go new file mode 100644 index 000000000..5dfac8d1a --- /dev/null +++ b/probe/docker/reporter_test.go @@ -0,0 +1,79 @@ +package docker_test + +import ( + "reflect" + "testing" + + client "github.com/fsouza/go-dockerclient" + + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +type mockRegistry struct { + containersByPID map[int]docker.Container + images map[string]*client.APIImages +} + +func (r *mockRegistry) Stop() {} + +func (r *mockRegistry) LockedPIDLookup(f func(func(int) docker.Container)) { + f(func(pid int) docker.Container { + return r.containersByPID[pid] + }) +} + +func (r *mockRegistry) WalkContainers(f func(docker.Container)) { + for _, c := range r.containersByPID { + f(c) + } +} + +func (r *mockRegistry) WalkImages(f func(*client.APIImages)) { + for _, i := range r.images { + f(i) + } +} + +var ( + mockRegistryInstance = &mockRegistry{ + containersByPID: map[int]docker.Container{ + 1: &mockContainer{container1}, + }, + images: map[string]*client.APIImages{ + "baz": &apiImage1, + }, + } +) + +func TestReporter(t *testing.T) { + want := report.MakeReport() + want.Container = report.Topology{ + Adjacency: report.Adjacency{}, + EdgeMetadatas: report.EdgeMetadatas{}, + NodeMetadatas: report.NodeMetadatas{ + report.MakeContainerNodeID("", "ping"): report.NodeMetadata{ + docker.ContainerID: "ping", + docker.ContainerName: "pong", + docker.ImageID: "baz", + }, + }, + } + want.ContainerImage = report.Topology{ + Adjacency: report.Adjacency{}, + EdgeMetadatas: report.EdgeMetadatas{}, + NodeMetadatas: 
report.NodeMetadatas{ + report.MakeContainerNodeID("", "baz"): report.NodeMetadata{ + docker.ImageID: "baz", + docker.ImageName: "bang", + }, + }, + } + + reporter := docker.NewReporter(mockRegistryInstance, "") + have := reporter.Report() + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } +} diff --git a/probe/docker/tagger.go b/probe/docker/tagger.go new file mode 100644 index 000000000..12411b349 --- /dev/null +++ b/probe/docker/tagger.go @@ -0,0 +1,87 @@ +package docker + +import ( + "strconv" + + "github.com/weaveworks/scope/probe/tag" + "github.com/weaveworks/scope/report" +) + +// These constants are keys used in node metadata +// TODO: use these constants in report/{mapping.go, detailed_node.go} - pending some circular references +const ( + ContainerID = "docker_container_id" +) + +// These vars are exported for testing. +var ( + NewPIDTreeStub = tag.NewPIDTree +) + +// Tagger is a tagger that tags Docker container information to process +// nodes that have a PID. +type Tagger struct { + procRoot string + registry Registry +} + +// NewTagger returns a usable Tagger. +func NewTagger(registry Registry, procRoot string) *Tagger { + return &Tagger{ + registry: registry, + procRoot: procRoot, + } +} + +// Tag implements Tagger. 
+func (t *Tagger) Tag(r report.Report) (report.Report, error) { + pidTree, err := NewPIDTreeStub(t.procRoot) + if err != nil { + return report.MakeReport(), err + } + t.tag(pidTree, &r.Process) + return r, nil +} + +func (t *Tagger) tag(pidTree tag.PIDTree, topology *report.Topology) { + for nodeID, nodeMetadata := range topology.NodeMetadatas { + pidStr, ok := nodeMetadata["pid"] + if !ok { + continue + } + + pid, err := strconv.ParseUint(pidStr, 10, 64) + if err != nil { + continue + } + + var ( + c Container + candidate = int(pid) + ) + + t.registry.LockedPIDLookup(func(lookup func(int) Container) { + for { + c = lookup(candidate) + if c != nil { + break + } + + candidate, err = pidTree.GetParent(candidate) + if err != nil { + break + } + } + }) + + if c == nil { + continue + } + + md := report.NodeMetadata{ + ContainerID: c.ID(), + } + + topology.NodeMetadatas[nodeID].Merge(md) + } +} diff --git a/probe/docker/tagger_test.go b/probe/docker/tagger_test.go new file mode 100644 index 000000000..dd5c91aac --- /dev/null +++ b/probe/docker/tagger_test.go @@ -0,0 +1,60 @@ +package docker_test + +import ( + "fmt" + "reflect" + "testing" + + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/probe/tag" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +type mockPIDTree struct { + parents map[int]int +} + +func (m *mockPIDTree) GetParent(pid int) (int, error) { + parent, ok := m.parents[pid] + if !ok { + return -1, fmt.Errorf("Not found %d", pid) + } + return parent, nil +} + +func (m *mockPIDTree) ProcessTopology(hostID string) report.Topology { + panic("") +} + +func TestTagger(t *testing.T) { + oldPIDTree := docker.NewPIDTreeStub + defer func() { docker.NewPIDTreeStub = oldPIDTree }() + + docker.NewPIDTreeStub = func(procRoot string) (tag.PIDTree, error) { + return &mockPIDTree{map[int]int{2: 1}}, nil + } + + var ( + pid1NodeID = report.MakeProcessNodeID("somehost.com", "1") + pid2NodeID = 
report.MakeProcessNodeID("somehost.com", "2") + wantNodeMetadata = report.NodeMetadata{docker.ContainerID: "ping"} + ) + + input := report.MakeReport() + input.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"} + input.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"} + + want := report.MakeReport() + want.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}.Merge(wantNodeMetadata) + want.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}.Merge(wantNodeMetadata) + + tagger := docker.NewTagger(mockRegistryInstance, "/irrelevant") + have, err := tagger.Tag(input) + if err != nil { + t.Errorf("%v", err) + } + if !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } +} diff --git a/probe/main.go b/probe/main.go index ea768383a..ca42f6fa2 100644 --- a/probe/main.go +++ b/probe/main.go @@ -14,6 +14,7 @@ import ( "time" "github.com/weaveworks/procspy" + "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/tag" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/xfer" @@ -73,18 +74,21 @@ func main() { ) var ( - dockerTagger *tag.DockerTagger - weaveTagger *tag.WeaveTagger + weaveTagger *tag.WeaveTagger ) + taggers := []tag.Tagger{tag.NewTopologyTagger(), tag.NewOriginHostTagger(hostID)} + reporters := []tag.Reporter{} + if *dockerEnabled && runtime.GOOS == linux { - var err error - dockerTagger, err = tag.NewDockerTagger(*procRoot, *dockerInterval) + dockerRegistry, err := docker.NewRegistry(*dockerInterval) if err != nil { - log.Fatalf("failed to start docker tagger: %v", err) + log.Fatalf("failed to start docker registry: %v", err) } - defer dockerTagger.Stop() - taggers = append(taggers, dockerTagger) + defer dockerRegistry.Stop() + + taggers = append(taggers, docker.NewTagger(dockerRegistry, *procRoot)) + reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID)) } if *weaveRouterAddr != "" { @@ -128,9 +132,8 @@ func main() { } 
} - if dockerTagger != nil { - r.Container.Merge(dockerTagger.ContainerTopology(hostID)) - r.ContainerImage.Merge(dockerTagger.ContainerImageTopology(hostID)) + for _, reporter := range reporters { + r.Merge(reporter.Report()) } if weaveTagger != nil { diff --git a/probe/tag/docker_tagger.go b/probe/tag/docker_tagger.go deleted file mode 100644 index 1e32e3b62..000000000 --- a/probe/tag/docker_tagger.go +++ /dev/null @@ -1,364 +0,0 @@ -package tag - -import ( - "fmt" - "log" - "strconv" - "strings" - "sync" - "time" - - docker "github.com/fsouza/go-dockerclient" - "github.com/weaveworks/scope/report" -) - -const ( - start = "start" - die = "die" - endpoint = "unix:///var/run/docker.sock" -) - -// These constants are keys used in node metadata -// TODO: use these constants in report/{mapping.go, detailed_node.go} - pending some circular references -const ( - ContainerID = "docker_container_id" - ContainerName = "docker_container_name" - ImageID = "docker_image_id" - ImageName = "docker_image_name" -) - -var ( - newDockerClientStub = newDockerClient - newPIDTreeStub = NewPIDTree -) - -// DockerTagger is a tagger that tags Docker container information to process -// nodes that have a PID. -type DockerTagger struct { - sync.RWMutex - quit chan struct{} - interval time.Duration - client dockerClient - - containers map[string]*dockerContainer - containersByPID map[int]*dockerContainer - images map[string]*docker.APIImages - - procRoot string - pidTree *PIDTree -} - -// Sub-interface for mocking. 
-type dockerClient interface { - ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) - InspectContainer(string) (*docker.Container, error) - ListImages(docker.ListImagesOptions) ([]docker.APIImages, error) - AddEventListener(chan<- *docker.APIEvents) error - RemoveEventListener(chan *docker.APIEvents) error -} - -func newDockerClient(endpoint string) (dockerClient, error) { - return docker.NewClient(endpoint) -} - -// NewDockerTagger returns a usable DockerTagger. Don't forget to Stop it. -func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, error) { - pidTree, err := newPIDTreeStub(procRoot) - if err != nil { - return nil, err - } - - t := DockerTagger{ - containers: map[string]*dockerContainer{}, - containersByPID: map[int]*dockerContainer{}, - images: map[string]*docker.APIImages{}, - - procRoot: procRoot, - pidTree: pidTree, - - interval: interval, - quit: make(chan struct{}), - } - - go t.loop() - return &t, nil -} - -// Stop stops the Docker tagger's event subscriber. 
-func (t *DockerTagger) Stop() { - close(t.quit) -} - -func (t *DockerTagger) loop() { - if !t.update() { - return - } - - ticker := time.Tick(t.interval) - for { - select { - case <-ticker: - if !t.update() { - return - } - - case <-t.quit: - return - } - } -} - -func (t *DockerTagger) update() bool { - client, err := newDockerClientStub(endpoint) - if err != nil { - log.Printf("docker mapper: %s", err) - return true - } - t.client = client - - events := make(chan *docker.APIEvents) - if err := client.AddEventListener(events); err != nil { - log.Printf("docker mapper: %s", err) - return true - } - defer func() { - if err := client.RemoveEventListener(events); err != nil { - log.Printf("docker mapper: %s", err) - } - }() - - if err := t.updateContainers(); err != nil { - log.Printf("docker mapper: %s", err) - return true - } - - if err := t.updateImages(); err != nil { - log.Printf("docker mapper: %s", err) - return true - } - - otherUpdates := time.Tick(t.interval) - for { - select { - case event := <-events: - t.handleEvent(event) - - case <-otherUpdates: - if err := t.updatePIDTree(); err != nil { - log.Printf("docker mapper: %s", err) - continue - } - - if err := t.updateImages(); err != nil { - log.Printf("docker mapper: %s", err) - continue - } - - case <-t.quit: - return false - } - } -} - -func (t *DockerTagger) updateContainers() error { - apiContainers, err := t.client.ListContainers(docker.ListContainersOptions{All: true}) - if err != nil { - return err - } - - for _, apiContainer := range apiContainers { - if err := t.addContainer(apiContainer.ID); err != nil { - log.Printf("docker mapper: %s", err) - } - } - - return nil -} - -func (t *DockerTagger) updateImages() error { - images, err := t.client.ListImages(docker.ListImagesOptions{}) - if err != nil { - return err - } - - t.Lock() - for i := range images { - image := &images[i] - t.images[image.ID] = image - } - t.Unlock() - - return nil -} - -func (t *DockerTagger) handleEvent(event 
*docker.APIEvents) { - switch event.Status { - case die: - containerID := event.ID - t.removeContainer(containerID) - - case start: - containerID := event.ID - if err := t.addContainer(containerID); err != nil { - log.Printf("docker mapper: %s", err) - } - } -} - -func (t *DockerTagger) updatePIDTree() error { - pidTree, err := newPIDTreeStub(t.procRoot) - if err != nil { - return err - } - - t.Lock() - t.pidTree = pidTree - t.Unlock() - return nil -} - -func (t *DockerTagger) addContainer(containerID string) error { - container, err := t.client.InspectContainer(containerID) - if err != nil { - // Don't spam the logs if the container was short lived - if _, ok := err.(*docker.NoSuchContainer); !ok { - return err - } - return nil - } - - if !container.State.Running { - return fmt.Errorf("docker mapper: container %s not running", containerID) - } - - t.Lock() - defer t.Unlock() - - dockerContainer := &dockerContainer{Container: container} - - t.containers[containerID] = dockerContainer - t.containersByPID[container.State.Pid] = dockerContainer - - return dockerContainer.startGatheringStats(containerID) -} - -func (t *DockerTagger) removeContainer(containerID string) { - t.Lock() - defer t.Unlock() - - container, ok := t.containers[containerID] - if !ok { - return - } - - delete(t.containers, containerID) - delete(t.containersByPID, container.State.Pid) - container.stopGatheringStats(containerID) -} - -// Containers returns the Containers the DockerTagger knows about. -func (t *DockerTagger) Containers() []*docker.Container { - containers := []*docker.Container{} - - t.RLock() - for _, container := range t.containers { - containers = append(containers, container.Container) - } - t.RUnlock() - - return containers -} - -// Tag implements Tagger. 
-func (t *DockerTagger) Tag(r report.Report) report.Report { - t.tag(&r.Process) - return r -} - -func (t *DockerTagger) tag(topology *report.Topology) { - for nodeID, nodeMetadata := range topology.NodeMetadatas { - pidStr, ok := nodeMetadata["pid"] - if !ok { - //log.Printf("dockerTagger: %q: no process node ID", id) - continue - } - pid, err := strconv.ParseUint(pidStr, 10, 64) - if err != nil { - //log.Printf("dockerTagger: %q: bad process node PID (%v)", id, err) - continue - } - - var ( - container *dockerContainer - candidate = int(pid) - ) - - t.RLock() - for { - container, ok = t.containersByPID[candidate] - if ok { - break - } - candidate, err = t.pidTree.getParent(candidate) - if err != nil { - break - } - } - t.RUnlock() - - if !ok { - continue - } - - md := report.NodeMetadata{ - ContainerID: container.ID, - } - - topology.NodeMetadatas[nodeID].Merge(md) - } -} - -// ContainerTopology produces a Toplogy of Containers -func (t *DockerTagger) ContainerTopology(scope string) report.Topology { - t.RLock() - defer t.RUnlock() - - result := report.NewTopology() - for _, container := range t.containers { - nmd := report.NodeMetadata{ - ContainerID: container.ID, - ContainerName: strings.TrimPrefix(container.Name, "/"), - ImageID: container.Image, - } - - nmd.Merge(container.getStats()) - - nodeID := report.MakeContainerNodeID(scope, container.ID) - result.NodeMetadatas[nodeID] = nmd - } - return result -} - -// ContainerImageTopology produces a Toplogy of Container Images -func (t *DockerTagger) ContainerImageTopology(scope string) report.Topology { - t.RLock() - defer t.RUnlock() - - result := report.NewTopology() - - // Loop over containers so we only emit images for running containers. 
- for _, container := range t.containers { - nmd := report.NodeMetadata{ - ImageID: container.Image, - } - - image, ok := t.images[container.Image] - if ok && len(image.RepoTags) > 0 { - nmd[ImageName] = image.RepoTags[0] - } - - nodeID := report.MakeContainerNodeID(scope, container.Image) - result.NodeMetadatas[nodeID] = nmd - } - return result -} diff --git a/probe/tag/docker_tagger_test.go b/probe/tag/docker_tagger_test.go deleted file mode 100644 index 85572701e..000000000 --- a/probe/tag/docker_tagger_test.go +++ /dev/null @@ -1,123 +0,0 @@ -package tag - -import ( - "reflect" - "runtime" - "testing" - "time" - - docker "github.com/fsouza/go-dockerclient" - "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/test" -) - -type mockDockerClient struct { - apiContainers []docker.APIContainers - containers map[string]*docker.Container - apiImages []docker.APIImages -} - -func (m mockDockerClient) ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) { - return m.apiContainers, nil -} - -func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) { - return m.containers[id], nil -} - -func (m mockDockerClient) ListImages(docker.ListImagesOptions) ([]docker.APIImages, error) { - return m.apiImages, nil -} - -func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error { - return nil -} - -func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error { - return nil -} - -func TestDockerTagger(t *testing.T) { - oldPIDTree, oldDockerClient := newPIDTreeStub, newDockerClientStub - defer func() { newPIDTreeStub, newDockerClientStub = oldPIDTree, oldDockerClient }() - - newPIDTreeStub = func(procRoot string) (*PIDTree, error) { - pid1 := &Process{PID: 1} - pid2 := &Process{PID: 2, PPID: 1, parent: pid1} - pid1.children = []*Process{pid2} - return &PIDTree{ - processes: map[int]*Process{ - 1: pid1, 2: pid2, - }, - }, nil - } - - newDockerClientStub = func(endpoint string) 
(dockerClient, error) { - return mockDockerClient{ - apiContainers: []docker.APIContainers{{ID: "foo"}}, - containers: map[string]*docker.Container{ - "foo": { - ID: "foo", - Name: "bar", - Image: "baz", - State: docker.State{Pid: 1, Running: true}, - }, - }, - apiImages: []docker.APIImages{{ID: "baz", RepoTags: []string{"bang", "not-chosen"}}}, - }, nil - } - - var ( - pid1NodeID = report.MakeProcessNodeID("somehost.com", "1") - pid2NodeID = report.MakeProcessNodeID("somehost.com", "2") - processNodeMetadata = report.NodeMetadata{ - ContainerID: "foo", - } - wantContainerTopology = report.Topology{ - Adjacency: report.Adjacency{}, - EdgeMetadatas: report.EdgeMetadatas{}, - NodeMetadatas: report.NodeMetadatas{ - report.MakeContainerNodeID("", "foo"): report.NodeMetadata{ - ContainerID: "foo", - ContainerName: "bar", - ImageID: "baz", - }, - }, - } - wantContainerImageTopology = report.Topology{ - Adjacency: report.Adjacency{}, - EdgeMetadatas: report.EdgeMetadatas{}, - NodeMetadatas: report.NodeMetadatas{ - report.MakeContainerNodeID("", "baz"): report.NodeMetadata{ - ImageID: "baz", - ImageName: "bang", - }, - }, - } - ) - - r := report.MakeReport() - r.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"} - r.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"} - - dockerTagger, _ := NewDockerTagger("/irrelevant", 10*time.Second) - runtime.Gosched() - for _, nodeID := range []string{pid1NodeID, pid2NodeID} { - want := processNodeMetadata.Copy() - have := dockerTagger.Tag(r).Process.NodeMetadatas[nodeID].Copy() - delete(have, "pid") - if !reflect.DeepEqual(want, have) { - t.Errorf("%q: want %+v, have %+v", nodeID, want, have) - } - } - - haveContainerTopology := dockerTagger.ContainerTopology("") - if !reflect.DeepEqual(wantContainerTopology, haveContainerTopology) { - t.Errorf("%s", test.Diff(wantContainerTopology, haveContainerTopology)) - } - - haveContainerImageTopology := dockerTagger.ContainerImageTopology("") - if 
!reflect.DeepEqual(wantContainerImageTopology, haveContainerImageTopology) { - t.Errorf("%s", test.Diff(wantContainerImageTopology, haveContainerImageTopology)) - } -} diff --git a/probe/tag/origin_host_tagger.go b/probe/tag/origin_host_tagger.go index 6d3f59b28..6034d752a 100644 --- a/probe/tag/origin_host_tagger.go +++ b/probe/tag/origin_host_tagger.go @@ -12,12 +12,12 @@ func NewOriginHostTagger(hostID string) Tagger { return &originHostTagger{hostNodeID: report.MakeHostNodeID(hostID)} } -func (t originHostTagger) Tag(r report.Report) report.Report { +func (t originHostTagger) Tag(r report.Report) (report.Report, error) { for _, topology := range r.Topologies() { md := report.NodeMetadata{report.HostNodeID: t.hostNodeID} for nodeID := range topology.NodeMetadatas { topology.NodeMetadatas[nodeID].Merge(md) } } - return r + return r, nil } diff --git a/probe/tag/origin_host_tagger_test.go b/probe/tag/origin_host_tagger_test.go index a53ec8a3c..2212814e6 100644 --- a/probe/tag/origin_host_tagger_test.go +++ b/probe/tag/origin_host_tagger_test.go @@ -18,7 +18,8 @@ func TestOriginHostTagger(t *testing.T) { r := report.MakeReport() r.Endpoint.NodeMetadatas[endpointNodeID] = nodeMetadata want := nodeMetadata.Merge(report.NodeMetadata{report.HostNodeID: report.MakeHostNodeID(hostID)}) - have := tag.NewOriginHostTagger(hostID).Tag(r).Endpoint.NodeMetadatas[endpointNodeID].Copy() + rpt, _ := tag.NewOriginHostTagger(hostID).Tag(r) + have := rpt.Endpoint.NodeMetadatas[endpointNodeID].Copy() if !reflect.DeepEqual(want, have) { t.Errorf("\nwant %+v\nhave %+v", want, have) } diff --git a/probe/tag/pidtree.go b/probe/tag/pidtree.go index 2ca59bbe8..69fceedb6 100644 --- a/probe/tag/pidtree.go +++ b/probe/tag/pidtree.go @@ -11,16 +11,21 @@ import ( ) // PIDTree represents all processes on the machine. 
-type PIDTree struct { - processes map[int]*Process +type PIDTree interface { + GetParent(pid int) (int, error) + ProcessTopology(hostID string) report.Topology +} + +type pidTree struct { + processes map[int]*process } // Process represents a single process. -type Process struct { - PID, PPID int - Comm string - parent *Process - children []*Process +type process struct { + pid, ppid int + comm string + parent *process + children []*process } // Hooks for mocking @@ -30,13 +35,13 @@ var ( ) // NewPIDTree returns a new PIDTree that can be polled. -func NewPIDTree(procRoot string) (*PIDTree, error) { +func NewPIDTree(procRoot string) (PIDTree, error) { dirEntries, err := readDir(procRoot) if err != nil { return nil, err } - pt := PIDTree{processes: map[int]*Process{}} + pt := pidTree{processes: map[int]*process{}} for _, dirEntry := range dirEntries { filename := dirEntry.Name() pid, err := strconv.Atoi(filename) @@ -59,15 +64,15 @@ func NewPIDTree(procRoot string) (*PIDTree, error) { comm = string(commBuf) } - pt.processes[pid] = &Process{ - PID: pid, - PPID: ppid, - Comm: comm, + pt.processes[pid] = &process{ + pid: pid, + ppid: ppid, + comm: comm, } } for _, child := range pt.processes { - parent, ok := pt.processes[child.PPID] + parent, ok := pt.processes[child.ppid] if !ok { // This can happen as listing proc is not a consistent snapshot continue @@ -79,48 +84,28 @@ func NewPIDTree(procRoot string) (*PIDTree, error) { return &pt, nil } -func (pt *PIDTree) getParent(pid int) (int, error) { +// GetParent returns the pid of the parent process for a given pid +func (pt *pidTree) GetParent(pid int) (int, error) { proc, ok := pt.processes[pid] if !ok { return -1, fmt.Errorf("PID %d not found", pid) } - return proc.PPID, nil -} - -// allChildren returns a flattened list of child pids including the given pid -func (pt *PIDTree) allChildren(pid int) ([]int, error) { - proc, ok := pt.processes[pid] - if !ok { - return []int{}, fmt.Errorf("PID %d not found", pid) - } - - 
var result []int - - var f func(*Process) - f = func(p *Process) { - result = append(result, p.PID) - for _, child := range p.children { - f(child) - } - } - - f(proc) - return result, nil + return proc.ppid, nil } // ProcessTopology returns a process topology based on the current state of the PIDTree. -func (pt *PIDTree) ProcessTopology(hostID string) report.Topology { +func (pt *pidTree) ProcessTopology(hostID string) report.Topology { t := report.NewTopology() for pid, proc := range pt.processes { pidstr := strconv.Itoa(pid) nodeID := report.MakeProcessNodeID(hostID, pidstr) t.NodeMetadatas[nodeID] = report.NodeMetadata{ "pid": pidstr, - "comm": proc.Comm, + "comm": proc.comm, } - if proc.PPID > 0 { - t.NodeMetadatas[nodeID]["ppid"] = strconv.Itoa(proc.PPID) + if proc.ppid > 0 { + t.NodeMetadatas[nodeID]["ppid"] = strconv.Itoa(proc.ppid) } } return t diff --git a/probe/tag/pidtree_test.go b/probe/tag/pidtree_test.go index fe4805f07..8f4d39c4a 100644 --- a/probe/tag/pidtree_test.go +++ b/probe/tag/pidtree_test.go @@ -48,16 +48,16 @@ func TestPIDTree(t *testing.T) { return []byte(fmt.Sprintf("%d na R %d", pid, parent)), nil } - pidtree, err := newPIDTreeStub("/proc") + pidtree, err := NewPIDTree("/proc") if err != nil { t.Fatalf("newPIDTree error: %v", err) } - for pid, want := range map[int][]int{ - 1: {1, 2, 3, 4}, - 2: {2, 3, 4}, + for pid, want := range map[int]int{ + 2: 1, + 3: 2, } { - have, err := pidtree.allChildren(pid) + have, err := pidtree.GetParent(pid) if err != nil || !reflect.DeepEqual(want, have) { t.Errorf("%d: want %#v, have %#v (%v)", pid, want, have, err) } diff --git a/probe/tag/tagger.go b/probe/tag/tagger.go index fb1afa4ed..bc7ed9130 100644 --- a/probe/tag/tagger.go +++ b/probe/tag/tagger.go @@ -1,16 +1,29 @@ package tag -import "github.com/weaveworks/scope/report" +import ( + "log" + + "github.com/weaveworks/scope/report" +) // Tagger tags nodes with value-add node metadata. 
type Tagger interface { - Tag(r report.Report) report.Report + Tag(r report.Report) (report.Report, error) +} + +// Reporter generates Reports. +type Reporter interface { + Report() report.Report } -// Apply tags the report with all the taggers. +// Apply runs each tagger in turn; a tagger that errors is logged and the report it returned is discarded. func Apply(r report.Report, taggers []Tagger) report.Report { for _, tagger := range taggers { - r = tagger.Tag(r) + if tagged, err := tagger.Tag(r); err != nil { + log.Printf("error applying tagger: %v", err) + } else { + r = tagged + } } return r } diff --git a/probe/tag/topology_tagger.go b/probe/tag/topology_tagger.go index a860ca55f..90b808c89 100644 --- a/probe/tag/topology_tagger.go +++ b/probe/tag/topology_tagger.go @@ -11,7 +11,7 @@ func NewTopologyTagger() Tagger { return &topologyTagger{} } -func (topologyTagger) Tag(r report.Report) report.Report { +func (topologyTagger) Tag(r report.Report) (report.Report, error) { for val, topology := range map[string]*report.Topology{ "endpoint": &(r.Endpoint), "address": &(r.Address), @@ -24,5 +24,5 @@ func (topologyTagger) Tag(r report.Report) report.Report { (*topology).NodeMetadatas[nodeID].Merge(md) } } - return r + return r, nil } diff --git a/probe/tag/topology_tagger_test.go b/probe/tag/topology_tagger_test.go index 28839d20d..a3a4ac9b0 100644 --- a/probe/tag/topology_tagger_test.go +++ b/probe/tag/topology_tagger_test.go @@ -12,7 +12,8 @@ func TestTagMissingID(t *testing.T) { const nodeID = "not-found" r := report.MakeReport() want := report.NodeMetadata{} - have := tag.NewTopologyTagger().Tag(r).Endpoint.NodeMetadatas[nodeID].Copy() + rpt, _ := tag.NewTopologyTagger().Tag(r) + have := rpt.Endpoint.NodeMetadatas[nodeID].Copy() if !reflect.DeepEqual(want, have) { t.Error("TopologyTagger erroneously tagged a missing node ID") } diff --git a/probe/tag/weave_tagger.go b/probe/tag/weave_tagger.go index 1865bc37e..bebd10660 100644 --- a/probe/tag/weave_tagger.go +++ b/probe/tag/weave_tagger.go @@ -41,10 +41,10 @@ func NewWeaveTagger(weaveRouterAddress string) (*WeaveTagger, error) { }
// Tag implements Tagger. -func (t WeaveTagger) Tag(r report.Report) report.Report { +func (t WeaveTagger) Tag(r report.Report) (report.Report, error) { // The status-json endpoint doesn't return any link information, so // there's nothing to tag, yet. - return r + return r, nil } // OverlayTopology produces an overlay topology from the Weave router.