From a2adaa25663a130f801f5fa9669bc9369c5b90ce Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Thu, 11 Jun 2015 09:56:41 +0000
Subject: [PATCH] Add docker stats to the Container Topology

---
Review amendments on top of the original revision of this patch (the
"index" blob ids below are carried over from that revision):

 - startGatheringStats: close the connection when the request fails or is
   answered with a non-200 status; take c.Lock(); say "already gathering".
 - stats goroutine: close the connection when the stream ends on its own
   and ignore samples that arrive after stopGatheringStats.
 - addContainer: release the previous stats stream when a container is
   added twice; drop the duplicated "docker mapper:" log prefix.

 probe/tag/docker_container.go | 194 ++++++++++++++++++++++++++++++++++
 probe/tag/docker_tagger.go    | 154 +++++++++++++++------------
 2 files changed, 279 insertions(+), 69 deletions(-)
 create mode 100644 probe/tag/docker_container.go

diff --git a/probe/tag/docker_container.go b/probe/tag/docker_container.go
new file mode 100644
index 000000000..de3116954
--- /dev/null
+++ b/probe/tag/docker_container.go
@@ -0,0 +1,194 @@
+package tag
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"net/http/httputil"
+	"net/url"
+	"strconv"
+	"sync"
+
+	docker "github.com/fsouza/go-dockerclient"
+)
+
+// These constants are keys used in node metadata
+// TODO: use these constants in report/{mapping.go, detailed_node.go} - pending some circular references
+const (
+	NetworkRxDropped = "network_rx_dropped"
+	NetworkRxBytes   = "network_rx_bytes"
+	NetworkRxErrors  = "network_rx_errors"
+	NetworkTxPackets = "network_tx_packets"
+	NetworkTxDropped = "network_tx_dropped"
+	NetworkRxPackets = "network_rx_packets"
+	NetworkTxErrors  = "network_tx_errors"
+	NetworkTxBytes   = "network_tx_bytes"
+
+	MemoryMaxUsage = "memory_max_usage"
+	MemoryUsage    = "memory_usage"
+	MemoryFailcnt  = "memory_failcnt"
+	MemoryLimit    = "memory_limit"
+
+	CPUPercpuUsage       = "cpu_per_cpu_usage"
+	CPUUsageInUsermode   = "cpu_usage_in_usermode"
+	CPUTotalUsage        = "cpu_total_usage"
+	CPUUsageInKernelmode = "cpu_usage_in_kernelmode"
+	CPUSystemCPUUsage    = "cpu_system_cpu_usage"
+)
+
+// dockerContainer pairs a docker.Container with the connection used to
+// stream its stats and the most recent sample read from that stream. The
+// embedded RWMutex guards statsConn and latestStats.
+type dockerContainer struct {
+	sync.RWMutex
+	*docker.Container
+
+	statsConn   *httputil.ClientConn
+	latestStats *docker.Stats
+}
+
+// startGatheringStats opens a dedicated connection to the Docker endpoint,
+// requests the stats stream for containerID and starts a goroutine that
+// stores each decoded sample in c.latestStats until the stream ends. It
+// returns an error if stats are already being gathered or the stream
+// cannot be opened.
+//
+// called whilst holding t.Lock() for writes
+func (c *dockerContainer) startGatheringStats(containerID string) error {
+	c.Lock()
+	defer c.Unlock()
+
+	if c.statsConn != nil {
+		return fmt.Errorf("already gathering stats for container %s", containerID)
+	}
+
+	log.Printf("docker mapper: collecting stats for %s", containerID)
+	req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", containerID), nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("User-Agent", "weavescope")
+
+	// Named u so that the net/url package is not shadowed.
+	u, err := url.Parse(endpoint)
+	if err != nil {
+		return err
+	}
+
+	dial, err := net.Dial(u.Scheme, u.Path)
+	if err != nil {
+		return err
+	}
+
+	conn := httputil.NewClientConn(dial, nil)
+	resp, err := conn.Do(req)
+	if err != nil {
+		// Closing conn also closes the dialled socket, which would
+		// otherwise be leaked.
+		conn.Close()
+		return err
+	}
+	if resp.StatusCode != http.StatusOK {
+		conn.Close()
+		return fmt.Errorf("stats request for container %s: %s", containerID, resp.Status)
+	}
+
+	c.statsConn = conn
+
+	// resp.Body is never closed on its own: it is a live stream, and it is
+	// closing conn that releases it.
+	go func() {
+		defer func() {
+			c.Lock()
+			defer c.Unlock()
+
+			log.Printf("docker mapper: stopped collecting stats for %s", containerID)
+
+			// If the stream ended by itself nobody has closed conn yet.
+			// When stopGatheringStats got here first there is nothing
+			// left to release.
+			if c.statsConn == conn {
+				conn.Close()
+				c.statsConn = nil
+				c.latestStats = nil
+			}
+		}()
+
+		decoder := json.NewDecoder(resp.Body)
+		for {
+			stats := &docker.Stats{}
+			if err := decoder.Decode(stats); err != nil {
+				if err != io.EOF {
+					log.Printf("docker mapper: error reading stats for %s: %v", containerID, err)
+				}
+				return
+			}
+
+			c.Lock()
+			// Drop the sample if gathering was stopped in the meantime.
+			if c.statsConn == conn {
+				c.latestStats = stats
+			}
+			c.Unlock()
+		}
+	}()
+
+	return nil
+}
+
+// stopGatheringStats closes the stats connection, if one is open, and
+// discards the last sample. It does nothing when stats are not being
+// gathered.
+//
+// called whilst holding t.Lock()
+func (c *dockerContainer) stopGatheringStats(containerID string) {
+	c.Lock()
+	defer c.Unlock()
+
+	if c.statsConn == nil {
+		return
+	}
+
+	c.statsConn.Close()
+	c.statsConn = nil
+	c.latestStats = nil
+}
+
+// getStats returns the most recent stats sample as node metadata keyed by
+// the constants above, or an empty map when no sample is held.
+//
+// called whilst holding t.RLock()
+func (c *dockerContainer) getStats() map[string]string {
+	c.RLock()
+	defer c.RUnlock()
+
+	stats := c.latestStats
+	if stats == nil {
+		return map[string]string{}
+	}
+
+	return map[string]string{
+		NetworkRxDropped: strconv.FormatUint(stats.Network.RxDropped, 10),
+		NetworkRxBytes:   strconv.FormatUint(stats.Network.RxBytes, 10),
+		NetworkRxErrors:  strconv.FormatUint(stats.Network.RxErrors, 10),
+		NetworkTxPackets: strconv.FormatUint(stats.Network.TxPackets, 10),
+		NetworkTxDropped: strconv.FormatUint(stats.Network.TxDropped, 10),
+		NetworkRxPackets: strconv.FormatUint(stats.Network.RxPackets, 10),
+		NetworkTxErrors:  strconv.FormatUint(stats.Network.TxErrors, 10),
+		NetworkTxBytes:   strconv.FormatUint(stats.Network.TxBytes, 10),
+
+		MemoryMaxUsage: strconv.FormatUint(stats.MemoryStats.MaxUsage, 10),
+		MemoryUsage:    strconv.FormatUint(stats.MemoryStats.Usage, 10),
+		MemoryFailcnt:  strconv.FormatUint(stats.MemoryStats.Failcnt, 10),
+		MemoryLimit:    strconv.FormatUint(stats.MemoryStats.Limit, 10),
+
+		// CPUPercpuUsage: strconv.FormatUint(stats.CPUStats.CPUUsage.PercpuUsage, 10),
+		CPUUsageInUsermode:   strconv.FormatUint(stats.CPUStats.CPUUsage.UsageInUsermode, 10),
+		CPUTotalUsage:        strconv.FormatUint(stats.CPUStats.CPUUsage.TotalUsage, 10),
+		CPUUsageInKernelmode: strconv.FormatUint(stats.CPUStats.CPUUsage.UsageInKernelmode, 10),
+		CPUSystemCPUUsage:    strconv.FormatUint(stats.CPUStats.SystemCPUUsage, 10),
+	}
+}
diff --git a/probe/tag/docker_tagger.go b/probe/tag/docker_tagger.go
index 508cde374..7fd3059bc 100644
--- a/probe/tag/docker_tagger.go
+++ b/probe/tag/docker_tagger.go
@@ -1,6 +1,7 @@
 package tag
 
 import (
+	"fmt"
 	"log"
 	"strconv"
 	"strings"
@@ -12,8 +13,9 @@ import (
 )
 
 const (
-	die   = "die"
-	start = "start"
+	start    = "start"
+	die      = "die"
+	endpoint = "unix:///var/run/docker.sock"
 )
 
 // These constants are keys used in node metadata
@@ -36,15 +38,29 @@ type DockerTagger struct {
 	sync.RWMutex
 	quit     chan struct{}
 	interval time.Duration
+	client   dockerClient
 
-	containers      map[string]*docker.Container
-	containersByPID map[int]*docker.Container
+	containers      map[string]*dockerContainer
+	containersByPID map[int]*dockerContainer
 	images          map[string]*docker.APIImages
 
 	procRoot string
 	pidTree  *PIDTree
 }
 
+// Sub-interface for mocking.
+type dockerClient interface {
+	ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
+	InspectContainer(string) (*docker.Container, error)
+	ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
+	AddEventListener(chan<- *docker.APIEvents) error
+	RemoveEventListener(chan *docker.APIEvents) error
+}
+
+func newDockerClient(endpoint string) (dockerClient, error) {
+	return docker.NewClient(endpoint)
+}
+
 // NewDockerTagger returns a usable DockerTagger. Don't forget to Stop it.
 func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, error) {
 	pidTree, err := newPIDTreeStub(procRoot)
@@ -53,8 +69,8 @@ func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, er
 	}
 
 	t := DockerTagger{
-		containers:      map[string]*docker.Container{},
-		containersByPID: map[int]*docker.Container{},
+		containers:      map[string]*dockerContainer{},
+		containersByPID: map[int]*dockerContainer{},
 		images:          map[string]*docker.APIImages{},
 
 		procRoot: procRoot,
@@ -92,26 +108,13 @@ func (t *DockerTagger) loop() {
 	}
 }
 
-// Sub-interface for mocking.
-type dockerClient interface {
-	ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
-	InspectContainer(string) (*docker.Container, error)
-	ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
-	AddEventListener(chan<- *docker.APIEvents) error
-	RemoveEventListener(chan *docker.APIEvents) error
-}
-
-func newDockerClient(endpoint string) (dockerClient, error) {
-	return docker.NewClient(endpoint)
-}
-
 func (t *DockerTagger) update() bool {
-	endpoint := "unix:///var/run/docker.sock"
 	client, err := newDockerClientStub(endpoint)
 	if err != nil {
 		log.Printf("docker mapper: %s", err)
 		return true
 	}
+	t.client = client
 
 	events := make(chan *docker.APIEvents)
 	if err := client.AddEventListener(events); err != nil {
@@ -124,12 +127,12 @@ func (t *DockerTagger) update() bool {
 		}
 	}()
 
-	if err := t.updateContainers(client); err != nil {
+	if err := t.updateContainers(); err != nil {
 		log.Printf("docker mapper: %s", err)
 		return true
 	}
 
-	if err := t.updateImages(client); err != nil {
+	if err := t.updateImages(); err != nil {
 		log.Printf("docker mapper: %s", err)
 		return true
 	}
@@ -138,7 +141,7 @@ func (t *DockerTagger) update() bool {
 	for {
 		select {
 		case event := <-events:
-			t.handleEvent(event, client)
+			t.handleEvent(event)
 
 		case <-otherUpdates:
 			if err := t.updatePIDTree(); err != nil {
@@ -146,7 +149,7 @@ func (t *DockerTagger) update() bool {
 				continue
 			}
 
-			if err := t.updateImages(client); err != nil {
+			if err := t.updateImages(); err != nil {
 				log.Printf("docker mapper: %s", err)
 				continue
 			}
@@ -157,39 +160,23 @@ func (t *DockerTagger) update() bool {
 	}
 }
 
-func (t *DockerTagger) updateContainers(client dockerClient) error {
-	apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
+func (t *DockerTagger) updateContainers() error {
+	apiContainers, err := t.client.ListContainers(docker.ListContainersOptions{All: true})
 	if err != nil {
 		return err
 	}
 
-	containers := []*docker.Container{}
 	for _, apiContainer := range apiContainers {
-		container, err := client.InspectContainer(apiContainer.ID)
-		if err != nil {
+		if err := t.addContainer(apiContainer.ID); err != nil {
 			log.Printf("docker mapper: %s", err)
-			continue
 		}
-
-		if !container.State.Running {
-			continue
-		}
-
-		containers = append(containers, container)
 	}
 
-	t.Lock()
-	for _, container := range containers {
-		t.containers[container.ID] = container
-		t.containersByPID[container.State.Pid] = container
-	}
-	t.Unlock()
-
 	return nil
 }
 
-func (t *DockerTagger) updateImages(client dockerClient) error {
-	images, err := client.ListImages(docker.ListImagesOptions{})
+func (t *DockerTagger) updateImages() error {
+	images, err := t.client.ListImages(docker.ListImagesOptions{})
 	if err != nil {
 		return err
 	}
@@ -204,36 +191,17 @@ func (t *DockerTagger) updateImages(client dockerClient) error {
 	return nil
 }
 
-func (t *DockerTagger) handleEvent(event *docker.APIEvents, client dockerClient) {
+func (t *DockerTagger) handleEvent(event *docker.APIEvents) {
 	switch event.Status {
 	case die:
 		containerID := event.ID
-		t.Lock()
-		if container, ok := t.containers[containerID]; ok {
-			delete(t.containers, containerID)
-			delete(t.containersByPID, container.State.Pid)
-		} else {
-			log.Printf("docker mapper: container %s not found", containerID)
-		}
-		t.Unlock()
+		t.removeContainer(containerID)
 
 	case start:
 		containerID := event.ID
-		container, err := client.InspectContainer(containerID)
-		if err != nil {
+		if err := t.addContainer(containerID); err != nil {
 			log.Printf("docker mapper: %s", err)
-			return
 		}
-
-		if !container.State.Running {
-			log.Printf("docker mapper: container %s not running", containerID)
-			return
-		}
-
-		t.Lock()
-		t.containers[containerID] = container
-		t.containersByPID[container.State.Pid] = container
-		t.Unlock()
 	}
 }
 
@@ -249,13 +217,59 @@ func (t *DockerTagger) updatePIDTree() error {
 	return nil
 }
 
+func (t *DockerTagger) addContainer(containerID string) error {
+	container, err := t.client.InspectContainer(containerID)
+	if err != nil {
+		// Don't spam the logs if the container was short lived
+		if _, ok := err.(*docker.NoSuchContainer); !ok {
+			return err
+		}
+		return nil
+	}
+
+	if !container.State.Running {
+		return fmt.Errorf("container %s not running", containerID)
+	}
+
+	t.Lock()
+	defer t.Unlock()
+
+	// The container may already be tracked, as update() lists every
+	// container again each time it runs; release its old stats stream.
+	if old, ok := t.containers[containerID]; ok {
+		delete(t.containersByPID, old.State.Pid)
+		old.stopGatheringStats(containerID)
+	}
+
+	c := &dockerContainer{Container: container}
+
+	t.containers[containerID] = c
+	t.containersByPID[container.State.Pid] = c
+
+	return c.startGatheringStats(containerID)
+}
+
+func (t *DockerTagger) removeContainer(containerID string) {
+	t.Lock()
+	defer t.Unlock()
+
+	container, ok := t.containers[containerID]
+	if !ok {
+		return
+	}
+
+	delete(t.containers, containerID)
+	delete(t.containersByPID, container.State.Pid)
+	container.stopGatheringStats(containerID)
+}
+
 // Containers returns the Containers the DockerTagger knows about.
 func (t *DockerTagger) Containers() []*docker.Container {
 	containers := []*docker.Container{}
 
 	t.RLock()
 	for _, container := range t.containers {
-		containers = append(containers, container)
+		containers = append(containers, container.Container)
 	}
 	t.RUnlock()
 
@@ -277,7 +291,7 @@ func (t *DockerTagger) Tag(r report.Report) report.Report {
 		}
 
 		var (
-			container *docker.Container
+			container *dockerContainer
 			candidate = int(pid)
 		)
 
@@ -335,6 +349,8 @@ func (t *DockerTagger) ContainerTopology(scope string) report.Topology {
 			nmd[ImageName] = image.RepoTags[0]
 		}
 
+		nmd.Merge(container.getStats())
+
 		nodeID := report.MakeContainerNodeID(scope, container.ID)
 		result.NodeMetadatas[nodeID] = nmd
 	}