Add docker stats to the Container Topology

This commit is contained in:
Tom Wilkie
2015-06-11 09:56:41 +00:00
parent 033b4572ae
commit a2adaa2566
2 changed files with 234 additions and 69 deletions

View File

@@ -0,0 +1,156 @@
package tag
import (
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"sync"
docker "github.com/fsouza/go-dockerclient"
)
// These constants are keys used in node metadata
// TODO: use these constants in report/{mapping.go, detailed_node.go} - pending some circular references
const (
NetworkRxDropped = "network_rx_dropped"
NetworkRxBytes = "network_rx_bytes"
NetworkRxErrors = "network_rx_errors"
NetworkTxPackets = "network_tx_packets"
NetworkTxDropped = "network_tx_dropped"
NetworkRxPackets = "network_rx_packets"
NetworkTxErrors = "network_tx_errors"
NetworkTxBytes = "network_tx_bytes"
MemoryMaxUsage = "memory_max_usage"
MemoryUsage = "memory_usage"
MemoryFailcnt = "memory_failcnt"
MemoryLimit = "memory_limit"
CPUPercpuUsage = "cpu_per_cpu_usage"
CPUUsageInUsermode = "cpu_usage_in_usermode"
CPUTotalUsage = "cpu_total_usage"
CPUUsageInKernelmode = "cpu_usage_in_kernelmode"
CPUSystemCPUUsage = "cpu_system_cpu_usage"
)
type dockerContainer struct {
sync.RWMutex
*docker.Container
statsConn *httputil.ClientConn
latestStats *docker.Stats
}
// called whilst holding t.Lock() for writes
func (c *dockerContainer) startGatheringStats(containerID string) error {
if c.statsConn != nil {
return fmt.Errorf("already gather stats for container %s", containerID)
}
log.Printf("docker mapper: collecting stats for %s", containerID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", containerID), nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "weavescope")
url, err := url.Parse(endpoint)
if err != nil {
return err
}
dial, err := net.Dial(url.Scheme, url.Path)
if err != nil {
return err
}
conn := httputil.NewClientConn(dial, nil)
resp, err := conn.Do(req)
if err != nil {
return err
}
c.statsConn = conn
go func() {
defer func() {
c.Lock()
defer c.Unlock()
log.Printf("docker mapper: stopped collecting stats for %s", containerID)
c.statsConn = nil
c.latestStats = nil
}()
stats := &docker.Stats{}
decoder := json.NewDecoder(resp.Body)
for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) {
if err != nil {
log.Printf("docker mapper: error reading event %v", err)
return
}
c.Lock()
c.latestStats = stats
c.Unlock()
stats = &docker.Stats{}
}
}()
return nil
}
// called whilst holding t.Lock()
func (c *dockerContainer) stopGatheringStats(containerID string) {
c.Lock()
defer c.Unlock()
if c.statsConn == nil {
return
}
c.statsConn.Close()
c.statsConn = nil
c.latestStats = nil
return
}
// called whilst holding t.RLock()
func (c *dockerContainer) getStats() map[string]string {
c.RLock()
defer c.RUnlock()
if c.latestStats == nil {
return map[string]string{}
}
return map[string]string{
NetworkRxDropped: strconv.FormatUint(c.latestStats.Network.RxDropped, 10),
NetworkRxBytes: strconv.FormatUint(c.latestStats.Network.RxBytes, 10),
NetworkRxErrors: strconv.FormatUint(c.latestStats.Network.RxErrors, 10),
NetworkTxPackets: strconv.FormatUint(c.latestStats.Network.TxPackets, 10),
NetworkTxDropped: strconv.FormatUint(c.latestStats.Network.TxDropped, 10),
NetworkRxPackets: strconv.FormatUint(c.latestStats.Network.RxPackets, 10),
NetworkTxErrors: strconv.FormatUint(c.latestStats.Network.TxErrors, 10),
NetworkTxBytes: strconv.FormatUint(c.latestStats.Network.TxBytes, 10),
MemoryMaxUsage: strconv.FormatUint(c.latestStats.MemoryStats.MaxUsage, 10),
MemoryUsage: strconv.FormatUint(c.latestStats.MemoryStats.Usage, 10),
MemoryFailcnt: strconv.FormatUint(c.latestStats.MemoryStats.Failcnt, 10),
MemoryLimit: strconv.FormatUint(c.latestStats.MemoryStats.Limit, 10),
// CPUPercpuUsage: strconv.FormatUint(stats.CPUStats.CPUUsage.PercpuUsage, 10),
CPUUsageInUsermode: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.UsageInUsermode, 10),
CPUTotalUsage: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.TotalUsage, 10),
CPUUsageInKernelmode: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.UsageInKernelmode, 10),
CPUSystemCPUUsage: strconv.FormatUint(c.latestStats.CPUStats.SystemCPUUsage, 10),
}
}

View File

@@ -1,6 +1,7 @@
package tag
import (
"fmt"
"log"
"strconv"
"strings"
@@ -12,8 +13,9 @@ import (
)
const (
die = "die"
start = "start"
start = "start"
die = "die"
endpoint = "unix:///var/run/docker.sock"
)
// These constants are keys used in node metadata
@@ -36,15 +38,29 @@ type DockerTagger struct {
sync.RWMutex
quit chan struct{}
interval time.Duration
client dockerClient
containers map[string]*docker.Container
containersByPID map[int]*docker.Container
containers map[string]*dockerContainer
containersByPID map[int]*dockerContainer
images map[string]*docker.APIImages
procRoot string
pidTree *PIDTree
}
// Sub-interface for mocking.
type dockerClient interface {
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
InspectContainer(string) (*docker.Container, error)
ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
AddEventListener(chan<- *docker.APIEvents) error
RemoveEventListener(chan *docker.APIEvents) error
}
func newDockerClient(endpoint string) (dockerClient, error) {
return docker.NewClient(endpoint)
}
// NewDockerTagger returns a usable DockerTagger. Don't forget to Stop it.
func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, error) {
pidTree, err := newPIDTreeStub(procRoot)
@@ -53,8 +69,8 @@ func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, er
}
t := DockerTagger{
containers: map[string]*docker.Container{},
containersByPID: map[int]*docker.Container{},
containers: map[string]*dockerContainer{},
containersByPID: map[int]*dockerContainer{},
images: map[string]*docker.APIImages{},
procRoot: procRoot,
@@ -92,26 +108,13 @@ func (t *DockerTagger) loop() {
}
}
// Sub-interface for mocking.
type dockerClient interface {
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
InspectContainer(string) (*docker.Container, error)
ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
AddEventListener(chan<- *docker.APIEvents) error
RemoveEventListener(chan *docker.APIEvents) error
}
func newDockerClient(endpoint string) (dockerClient, error) {
return docker.NewClient(endpoint)
}
func (t *DockerTagger) update() bool {
endpoint := "unix:///var/run/docker.sock"
client, err := newDockerClientStub(endpoint)
if err != nil {
log.Printf("docker mapper: %s", err)
return true
}
t.client = client
events := make(chan *docker.APIEvents)
if err := client.AddEventListener(events); err != nil {
@@ -124,12 +127,12 @@ func (t *DockerTagger) update() bool {
}
}()
if err := t.updateContainers(client); err != nil {
if err := t.updateContainers(); err != nil {
log.Printf("docker mapper: %s", err)
return true
}
if err := t.updateImages(client); err != nil {
if err := t.updateImages(); err != nil {
log.Printf("docker mapper: %s", err)
return true
}
@@ -138,7 +141,7 @@ func (t *DockerTagger) update() bool {
for {
select {
case event := <-events:
t.handleEvent(event, client)
t.handleEvent(event)
case <-otherUpdates:
if err := t.updatePIDTree(); err != nil {
@@ -146,7 +149,7 @@ func (t *DockerTagger) update() bool {
continue
}
if err := t.updateImages(client); err != nil {
if err := t.updateImages(); err != nil {
log.Printf("docker mapper: %s", err)
continue
}
@@ -157,39 +160,23 @@ func (t *DockerTagger) update() bool {
}
}
func (t *DockerTagger) updateContainers(client dockerClient) error {
apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
func (t *DockerTagger) updateContainers() error {
apiContainers, err := t.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return err
}
containers := []*docker.Container{}
for _, apiContainer := range apiContainers {
container, err := client.InspectContainer(apiContainer.ID)
if err != nil {
if err := t.addContainer(apiContainer.ID); err != nil {
log.Printf("docker mapper: %s", err)
continue
}
if !container.State.Running {
continue
}
containers = append(containers, container)
}
t.Lock()
for _, container := range containers {
t.containers[container.ID] = container
t.containersByPID[container.State.Pid] = container
}
t.Unlock()
return nil
}
func (t *DockerTagger) updateImages(client dockerClient) error {
images, err := client.ListImages(docker.ListImagesOptions{})
func (t *DockerTagger) updateImages() error {
images, err := t.client.ListImages(docker.ListImagesOptions{})
if err != nil {
return err
}
@@ -204,36 +191,17 @@ func (t *DockerTagger) updateImages(client dockerClient) error {
return nil
}
func (t *DockerTagger) handleEvent(event *docker.APIEvents, client dockerClient) {
func (t *DockerTagger) handleEvent(event *docker.APIEvents) {
switch event.Status {
case die:
containerID := event.ID
t.Lock()
if container, ok := t.containers[containerID]; ok {
delete(t.containers, containerID)
delete(t.containersByPID, container.State.Pid)
} else {
log.Printf("docker mapper: container %s not found", containerID)
}
t.Unlock()
t.removeContainer(containerID)
case start:
containerID := event.ID
container, err := client.InspectContainer(containerID)
if err != nil {
if err := t.addContainer(containerID); err != nil {
log.Printf("docker mapper: %s", err)
return
}
if !container.State.Running {
log.Printf("docker mapper: container %s not running", containerID)
return
}
t.Lock()
t.containers[containerID] = container
t.containersByPID[container.State.Pid] = container
t.Unlock()
}
}
@@ -249,13 +217,52 @@ func (t *DockerTagger) updatePIDTree() error {
return nil
}
func (t *DockerTagger) addContainer(containerID string) error {
container, err := t.client.InspectContainer(containerID)
if err != nil {
// Don't spam the logs if the container was short lived
if _, ok := err.(*docker.NoSuchContainer); !ok {
return err
}
return nil
}
if !container.State.Running {
return fmt.Errorf("docker mapper: container %s not running", containerID)
}
t.Lock()
defer t.Unlock()
dockerContainer := &dockerContainer{Container: container}
t.containers[containerID] = dockerContainer
t.containersByPID[container.State.Pid] = dockerContainer
return dockerContainer.startGatheringStats(containerID)
}
func (t *DockerTagger) removeContainer(containerID string) {
t.Lock()
defer t.Unlock()
container, ok := t.containers[containerID]
if !ok {
return
}
delete(t.containers, containerID)
delete(t.containersByPID, container.State.Pid)
container.stopGatheringStats(containerID)
}
// Containers returns the Containers the DockerTagger knows about.
func (t *DockerTagger) Containers() []*docker.Container {
containers := []*docker.Container{}
t.RLock()
for _, container := range t.containers {
containers = append(containers, container)
containers = append(containers, container.Container)
}
t.RUnlock()
@@ -277,7 +284,7 @@ func (t *DockerTagger) Tag(r report.Report) report.Report {
}
var (
container *docker.Container
container *dockerContainer
candidate = int(pid)
)
@@ -335,6 +342,8 @@ func (t *DockerTagger) ContainerTopology(scope string) report.Topology {
nmd[ImageName] = image.RepoTags[0]
}
nmd.Merge(container.getStats())
nodeID := report.MakeContainerNodeID(scope, container.ID)
result.NodeMetadatas[nodeID] = nmd
}