diff --git a/probe/docker/container.go b/probe/docker/container.go index 61ce97719..ac16d0377 100644 --- a/probe/docker/container.go +++ b/probe/docker/container.go @@ -56,13 +56,6 @@ const ( CPUUsageInKernelmode = "docker_cpu_usage_in_kernelmode" CPUSystemCPUUsage = "docker_cpu_system_cpu_usage" - StateCreated = "created" - StateDead = "dead" - StateExited = "exited" - StatePaused = "paused" - StateRestarting = "restarting" - StateRunning = "running" - NetworkModeHost = "host" LabelPrefix = "docker_label_" @@ -71,6 +64,18 @@ const ( stopTimeout = 10 ) +// These 'constants' are used for node states. +// We need to take pointers to them, so they are vars... +var ( + StateCreated = "created" + StateDead = "dead" + StateExited = "exited" + StatePaused = "paused" + StateRestarting = "restarting" + StateRunning = "running" + StateDeleted = "deleted" +) + // Exported for testing var ( DialStub = net.Dial @@ -95,7 +100,7 @@ type Container interface { Image() string PID() int Hostname() string - GetNode(string, []net.IP) report.Node + GetNode([]net.IP) report.Node State() string StateString() string HasTTY() bool @@ -111,12 +116,14 @@ type container struct { latestStats docker.Stats pendingStats [20]docker.Stats numPending int + hostID string } // NewContainer creates a new Container -func NewContainer(c *docker.Container) Container { +func NewContainer(c *docker.Container, hostID string) Container { return &container{ container: c, + hostID: hostID, } } @@ -338,10 +345,9 @@ func (c *container) env() map[string]string { return result } -func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node { +func (c *container) GetNode(localAddrs []net.IP) report.Node { c.RLock() defer c.RUnlock() - ips := c.container.NetworkSettings.SecondaryIPAddresses if c.container.NetworkSettings.IPAddress != "" { ips = append(ips, c.container.NetworkSettings.IPAddress) @@ -349,7 +355,7 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node { // Treat all Docker IPs as local scoped. ipsWithScopes := []string{} for _, ip := range ips { - ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(hostID, ip)) + ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(c.hostID, ip)) } result := report.MakeNodeWith(report.MakeContainerNodeID(c.ID()), map[string]string{ diff --git a/probe/docker/container_test.go b/probe/docker/container_test.go index ec4d427ed..3c9f88b4f 100644 --- a/probe/docker/container_test.go +++ b/probe/docker/container_test.go @@ -51,7 +51,8 @@ func TestContainer(t *testing.T) { return connection } - c := docker.NewContainer(container1) + const hostID = "scope" + c := docker.NewContainer(container1, hostID) err := c.StartGatheringStats() if err != nil { t.Errorf("%v", err) @@ -101,7 +102,7 @@ func TestContainer(t *testing.T) { ) test.Poll(t, 100*time.Millisecond, want, func() interface{} { - node := c.GetNode("scope", []net.IP{}) + node := c.GetNode([]net.IP{}) node.Latest.ForEach(func(k, v string) { if v == "0" || v == "" { node.Latest = node.Latest.Delete(k) @@ -116,7 +117,7 @@ func TestContainer(t *testing.T) { if c.PID() != 2 { t.Errorf("%d != 2", c.PID()) } - if have := docker.ExtractContainerIPs(c.GetNode("", []net.IP{})); !reflect.DeepEqual(have, []string{"1.2.3.4"}) { + if have := docker.ExtractContainerIPs(c.GetNode([]net.IP{})); !reflect.DeepEqual(have, []string{"1.2.3.4"}) { t.Errorf("%v != %v", have, []string{"1.2.3.4"}) } } diff --git a/probe/docker/controls_test.go b/probe/docker/controls_test.go index aca28c117..97e3832b3 100644 --- a/probe/docker/controls_test.go +++ b/probe/docker/controls_test.go @@ -16,7 +16,7 @@ import ( func TestControls(t *testing.T) { mdc := newMockClient() setupStubs(mdc, func() { - registry, _ := docker.NewRegistry(10*time.Second, nil, false) + registry, _ := docker.NewRegistry(10*time.Second, nil, false, "") defer registry.Stop() for _, tc := range []struct{ command, result string }{ @@ -56,7 +56,7 @@ func TestPipes(t *testing.T) { mdc := newMockClient() setupStubs(mdc, func() { - registry, _ := docker.NewRegistry(10*time.Second, nil, false) + registry, _ := docker.NewRegistry(10*time.Second, nil, false, "") defer registry.Stop() test.Poll(t, 100*time.Millisecond, true, func() interface{} { diff --git a/probe/docker/registry.go b/probe/docker/registry.go index b839e86e5..45df65968 100644 --- a/probe/docker/registry.go +++ b/probe/docker/registry.go @@ -8,6 +8,7 @@ import ( docker_client "github.com/fsouza/go-dockerclient" "github.com/weaveworks/scope/probe/controls" + "github.com/weaveworks/scope/report" ) // Consts exported for testing. @@ -39,7 +40,7 @@ type Registry interface { } // ContainerUpdateWatcher is the type of functions that get called when containers are updated. -type ContainerUpdateWatcher func(c Container) +type ContainerUpdateWatcher func(report.Node) type registry struct { sync.RWMutex @@ -48,6 +49,7 @@ type registry struct { collectStats bool client Client pipes controls.PipeClient + hostID string watchers []ContainerUpdateWatcher containers map[string]Container @@ -78,7 +80,7 @@ func newDockerClient(endpoint string) (Client, error) { } // NewRegistry returns a usable Registry. Don't forget to Stop it. -func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool) (Registry, error) { +func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool, hostID string) (Registry, error) { client, err := NewDockerClientStub(endpoint) if err != nil { return nil, err @@ -93,6 +95,7 @@ func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats pipes: pipes, interval: interval, collectStats: collectStats, + hostID: hostID, quit: make(chan chan struct{}), } @@ -214,7 +217,7 @@ func (r *registry) updateContainers() error { } for _, apiContainer := range apiContainers { - r.updateContainerState(apiContainer.ID) + r.updateContainerState(apiContainer.ID, nil) } return nil @@ -240,11 +243,20 @@ func (r *registry) updateImages() error { func (r *registry) handleEvent(event *docker_client.APIEvents) { switch event.Status { case CreateEvent, RenameEvent, StartEvent, DieEvent, DestroyEvent, PauseEvent, UnpauseEvent: - r.updateContainerState(event.ID) + r.updateContainerState(event.ID, stateAfterEvent(event.Status)) } } -func (r *registry) updateContainerState(containerID string) { +func stateAfterEvent(event string) *string { + switch event { + case DestroyEvent: + return &StateDeleted + default: + return nil + } +} + +func (r *registry) updateContainerState(containerID string, intendedState *string) { r.Lock() defer r.Unlock() @@ -267,13 +279,24 @@ func (r *registry) updateContainerState(containerID string) { if r.collectStats { container.StopGatheringStats() } + + if intendedState != nil { + node := report.MakeNodeWith(report.MakeContainerNodeID(containerID), map[string]string{ + ContainerID: containerID, + ContainerState: *intendedState, + }) + // Trigger anyone watching for updates + for _, f := range r.watchers { + f(node) + } + } return } // Container exists, ensure we have it c, ok := r.containers[containerID] if !ok { - c = NewContainerStub(dockerContainer) + c = NewContainerStub(dockerContainer, r.hostID) r.containers[containerID] = c } else { // potentially remove existing pid mapping. @@ -287,8 +310,12 @@ func (r *registry) updateContainerState(containerID string) { } // Trigger anyone watching for updates - for _, f := range r.watchers { - f(c) + localAddrs, err := report.LocalAddresses() + if err != nil { + node := c.GetNode(localAddrs) + for _, f := range r.watchers { + f(node) + } } // And finally, ensure we gather stats for it diff --git a/probe/docker/registry_test.go b/probe/docker/registry_test.go index 2658489c2..4b5609f20 100644 --- a/probe/docker/registry_test.go +++ b/probe/docker/registry_test.go @@ -11,9 +11,11 @@ import ( client "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/scope/common/mtime" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/reflect" ) type mockContainer struct { @@ -52,7 +54,7 @@ func (c *mockContainer) StartGatheringStats() error { func (c *mockContainer) StopGatheringStats() {} -func (c *mockContainer) GetNode(_ string, _ []net.IP) report.Node { +func (c *mockContainer) GetNode(_ []net.IP) report.Node { return report.MakeNodeWith(report.MakeContainerNodeID(c.c.ID), map[string]string{ docker.ContainerID: c.c.ID, docker.ContainerName: c.c.Name, @@ -237,7 +239,7 @@ func setupStubs(mdc *mockDockerClient, f func()) { return mdc, nil } - docker.NewContainerStub = func(c *client.Container) docker.Container { + docker.NewContainerStub = func(c *client.Container, _ string) docker.Container { return &mockContainer{c} } @@ -270,7 +272,7 @@ func allImages(r docker.Registry) []*client.APIImages { func TestRegistry(t *testing.T) { mdc := newMockClient() setupStubs(mdc, func() { - registry, _ := docker.NewRegistry(10*time.Second, nil, true) + registry, _ := docker.NewRegistry(10*time.Second, nil, true, "") defer registry.Stop() runtime.Gosched() @@ -293,7 +295,7 @@ func TestRegistry(t *testing.T) { func TestLookupByPID(t *testing.T) { mdc := newMockClient() setupStubs(mdc, func() { - registry, _ := docker.NewRegistry(10*time.Second, nil, true) + registry, _ := docker.NewRegistry(10*time.Second, nil, true, "") defer registry.Stop() want := docker.Container(&mockContainer{container1}) @@ -310,7 +312,7 @@ func TestLookupByPID(t *testing.T) { func TestRegistryEvents(t *testing.T) { mdc := newMockClient() setupStubs(mdc, func() { - registry, _ := docker.NewRegistry(10*time.Second, nil, true) + registry, _ := docker.NewRegistry(10*time.Second, nil, true, "") defer registry.Stop() runtime.Gosched() @@ -377,3 +379,57 @@ func TestRegistryEvents(t *testing.T) { } }) } + +func TestRegistryDelete(t *testing.T) { + mtime.NowForce(mtime.Now()) + defer mtime.NowReset() + + mdc := newMockClient() + setupStubs(mdc, func() { + registry, _ := docker.NewRegistry(10*time.Second, nil, true, "") + defer registry.Stop() + runtime.Gosched() + + // Collect all the events. + mtx := sync.Mutex{} + nodes := []report.Node{} + registry.WatchContainerUpdates(func(n report.Node) { + mtx.Lock() + defer mtx.Unlock() + nodes = append(nodes, n) + }) + + check := func(want []docker.Container) { + test.Poll(t, 100*time.Millisecond, want, func() interface{} { + return allContainers(registry) + }) + } + + want := []docker.Container{&mockContainer{container1}} + check(want) + + { + mdc.Lock() + mdc.apiContainers = []client.APIContainers{} + delete(mdc.containers, "ping") + mdc.Unlock() + mdc.send(&client.APIEvents{Status: docker.DestroyEvent, ID: "ping"}) + runtime.Gosched() + + check([]docker.Container{}) + + mtx.Lock() + want := []report.Node{ + report.MakeNodeWith(report.MakeContainerNodeID("ping"), map[string]string{ + docker.ContainerID: "ping", + docker.ContainerState: "deleted", + }), + } + if !reflect.DeepEqual(want, nodes) { + t.Errorf("Didn't get right container updates: %v", test.Diff(want, nodes)) + } + nodes = []report.Node{} + mtx.Unlock() + } + }) +} diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index 0b4ea8ff2..a0c22e774 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -4,7 +4,6 @@ import ( "net" "strings" - log "github.com/Sirupsen/logrus" docker_client "github.com/fsouza/go-dockerclient" "github.com/weaveworks/scope/probe" @@ -75,17 +74,11 @@ func NewReporter(registry Registry, hostID string, probeID string, probe *probe. func (Reporter) Name() string { return "Docker" } // ContainerUpdated should be called whenever a container is updated. -func (r *Reporter) ContainerUpdated(c Container) { - localAddrs, err := report.LocalAddresses() - if err != nil { - log.Errorf("Error getting local address: %v", err) - return - } - +func (r *Reporter) ContainerUpdated(n report.Node) { // Publish a 'short cut' report container just this container rpt := report.MakeReport() rpt.Shortcut = true - rpt.Container.AddNode(c.GetNode(r.hostID, localAddrs)) + rpt.Container.AddNode(n) r.probe.Publish(rpt) } @@ -146,7 +139,7 @@ func (r *Reporter) containerTopology(localAddrs []net.IP) report.Topology { metadata := map[string]string{report.ControlProbeID: r.probeID} r.registry.WalkContainers(func(c Container) { - result.AddNode(c.GetNode(r.hostID, localAddrs).WithLatests(metadata)) + result.AddNode(c.GetNode(localAddrs).WithLatests(metadata)) }) return result diff --git a/prog/probe.go b/prog/probe.go index fa6b082dc..dc43ba12f 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -132,7 +132,7 @@ func probeMain(flags probeFlags) { if err := report.AddLocalBridge(flags.dockerBridge); err != nil { log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err) } - if registry, err := docker.NewRegistry(flags.dockerInterval, clients, true); err == nil { + if registry, err := docker.NewRegistry(flags.dockerInterval, clients, true, hostID); err == nil { defer registry.Stop() p.AddTagger(docker.NewTagger(registry, processCache)) p.AddReporter(docker.NewReporter(registry, hostID, probeID, p)) diff --git a/render/topologies.go b/render/topologies.go index 904f2cde8..9211d65a8 100644 --- a/render/topologies.go +++ b/render/topologies.go @@ -68,38 +68,45 @@ var ProcessNameRenderer = MakeMap( // NB We only want processes in container _or_ processes with network connections // but we need to be careful to ensure we only include each edge once, by only // including the ProcessRenderer once. -var ContainerRenderer = MakeReduce( - MakeSilentFilter( - func(n report.Node) bool { - // Drop unconnected pseudo nodes (could appear due to filtering) - _, isConnected := n.Latest.Lookup(IsConnected) - return n.Topology != Pseudo || isConnected - }, - MakeMap( - MapProcess2Container, - ColorConnected(ProcessRenderer), +var ContainerRenderer = MakeSilentFilter( + func(n report.Node) bool { + // Drop deleted containers + state, ok := n.Latest.Lookup(docker.ContainerState) + return !ok || state != docker.StateDeleted + }, + MakeReduce( + MakeSilentFilter( + func(n report.Node) bool { + // Drop unconnected pseudo nodes (could appear due to filtering) + _, isConnected := n.Latest.Lookup(IsConnected) + return n.Topology != Pseudo || isConnected + }, + MakeMap( + MapProcess2Container, + ColorConnected(ProcessRenderer), + ), ), + + // This mapper brings in short lived connections by joining with container IPs. + // We need to be careful to ensure we only include each edge once. Edges brought in + // by the above renders will have a pid, so its enough to filter out any nodes with + // pids. + SilentFilterUnconnected(MakeMap( + MapIP2Container, + MakeReduce( + MakeMap( + MapContainer2IP, + SelectContainer, + ), + MakeMap( + MapEndpoint2IP, + SelectEndpoint, + ), + ), + )), + + SelectContainer, ), - - // This mapper brings in short lived connections by joining with container IPs. - // We need to be careful to ensure we only include each edge once. Edges brought in - // by the above renders will have a pid, so its enough to filter out any nodes with - // pids. - SilentFilterUnconnected(MakeMap( - MapIP2Container, - MakeReduce( - MakeMap( - MapContainer2IP, - SelectContainer, - ), - MakeMap( - MapEndpoint2IP, - SelectEndpoint, - ), - ), - )), - - SelectContainer, ) type containerWithHostIPsRenderer struct {