package docker import ( "bufio" "fmt" "io" "net" "net/http" "net/http/httputil" "net/url" "strconv" "strings" "sync" "time" log "github.com/Sirupsen/logrus" docker "github.com/fsouza/go-dockerclient" "github.com/ugorji/go/codec" "github.com/weaveworks/scope/common/mtime" "github.com/weaveworks/scope/report" ) // These constants are keys used in node metadata const ( ContainerName = "docker_container_name" ContainerCommand = "docker_container_command" ContainerPorts = "docker_container_ports" ContainerCreated = "docker_container_created" ContainerIPs = "docker_container_ips" ContainerHostname = "docker_container_hostname" ContainerIPsWithScopes = "docker_container_ips_with_scopes" ContainerState = "docker_container_state" ContainerUptime = "docker_container_uptime" ContainerRestartCount = "docker_container_restart_count" ContainerNetworkMode = "docker_container_network_mode" NetworkRxDropped = "network_rx_dropped" NetworkRxBytes = "network_rx_bytes" NetworkRxErrors = "network_rx_errors" NetworkTxPackets = "network_tx_packets" NetworkTxDropped = "network_tx_dropped" NetworkRxPackets = "network_rx_packets" NetworkTxErrors = "network_tx_errors" NetworkTxBytes = "network_tx_bytes" MemoryMaxUsage = "docker_memory_max_usage" MemoryUsage = "docker_memory_usage" MemoryFailcnt = "docker_memory_failcnt" MemoryLimit = "docker_memory_limit" CPUPercpuUsage = "docker_cpu_per_cpu_usage" CPUUsageInUsermode = "docker_cpu_usage_in_usermode" CPUTotalUsage = "docker_cpu_total_usage" CPUUsageInKernelmode = "docker_cpu_usage_in_kernelmode" CPUSystemCPUUsage = "docker_cpu_system_cpu_usage" StateRunning = "running" StateStopped = "stopped" StatePaused = "paused" NetworkModeHost = "host" stopTimeout = 10 ) // Exported for testing var ( DialStub = net.Dial NewClientConnStub = newClientConn ) func newClientConn(c net.Conn, r *bufio.Reader) ClientConn { return httputil.NewClientConn(c, r) } // ClientConn is exported for testing type ClientConn interface { Do(req *http.Request) (resp *http.Response, err error) Close() error } // Container represents a Docker container type Container interface { UpdateState(*docker.Container) ID() string Image() string PID() int Hostname() string GetNode(string, []net.IP) report.Node State() string HasTTY() bool Container() *docker.Container StartGatheringStats() error StopGatheringStats() } type container struct { sync.RWMutex container *docker.Container statsConn ClientConn latestStats docker.Stats pendingStats [20]docker.Stats numPending int } // NewContainer creates a new Container func NewContainer(c *docker.Container) Container { return &container{ container: c, } } func (c *container) UpdateState(container *docker.Container) { c.Lock() defer c.Unlock() c.container = container } func (c *container) ID() string { return c.container.ID } func (c *container) Image() string { return c.container.Image } func (c *container) PID() int { return c.container.State.Pid } func (c *container) Hostname() string { if c.container.Config.Domainname == "" { return c.container.Config.Hostname } return fmt.Sprintf("%s.%s", c.container.Config.Hostname, c.container.Config.Domainname) } func (c *container) HasTTY() bool { return c.container.Config.Tty } func (c *container) State() string { if c.container.State.Paused { return StatePaused } else if c.container.State.Running { return StateRunning } return StateStopped } func (c *container) Container() *docker.Container { return c.container } func (c *container) StartGatheringStats() error { c.Lock() defer c.Unlock() if c.statsConn != nil { return fmt.Errorf("already gather stats for container %s", c.container.ID) } go func() { log.Infof("docker container: collecting stats for %s", c.container.ID) req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil) if err != nil { log.Errorf("docker container: %v", err) return } req.Header.Set("User-Agent", "weavescope") url, err := url.Parse(endpoint) if err != nil { log.Errorf("docker container: %v", err) return } dial, err := DialStub(url.Scheme, url.Path) if err != nil { log.Errorf("docker container: %v", err) return } conn := NewClientConnStub(dial, nil) resp, err := conn.Do(req) if err != nil { log.Errorf("docker container: %v", err) return } defer resp.Body.Close() c.Lock() c.statsConn = conn c.Unlock() defer func() { c.Lock() defer c.Unlock() log.Infof("docker container: stopped collecting stats for %s", c.container.ID) c.statsConn = nil }() var stats docker.Stats // Use a buffer since the codec library doesn't implicitly do it bufReader := bufio.NewReader(resp.Body) decoder := codec.NewDecoder(bufReader, &codec.JsonHandle{}) for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) { if err != nil { log.Errorf("docker container: error reading event, did container stop? %v", err) return } c.Lock() if c.numPending >= len(c.pendingStats) { log.Warnf("docker container: dropping stats.") } else { c.latestStats = stats c.pendingStats[c.numPending] = stats c.numPending++ } c.Unlock() stats = docker.Stats{} } }() return nil } func (c *container) StopGatheringStats() { c.Lock() defer c.Unlock() if c.statsConn == nil { return } c.statsConn.Close() c.statsConn = nil return } func (c *container) ports(localAddrs []net.IP) report.StringSet { if c.container.NetworkSettings == nil { return report.MakeStringSet() } ports := []string{} for port, bindings := range c.container.NetworkSettings.Ports { if len(bindings) == 0 { ports = append(ports, fmt.Sprintf("%s", port)) continue } for _, b := range bindings { if b.HostIP == "0.0.0.0" { for _, ip := range localAddrs { ports = append(ports, fmt.Sprintf("%s:%s->%s", ip, b.HostPort, port)) } } else { ports = append(ports, fmt.Sprintf("%s:%s->%s", b.HostIP, b.HostPort, port)) } } } return report.MakeStringSet(ports...) } func (c *container) memoryUsageMetric(stats []docker.Stats) report.Metric { result := report.MakeMetric() for _, s := range stats { result = result.Add(s.Read, float64(s.MemoryStats.Usage)) } return result } func (c *container) cpuPercentMetric(stats []docker.Stats) report.Metric { result := report.MakeMetric() if len(stats) < 2 { return result } previous := stats[0] for _, s := range stats[1:] { // Copies from docker/api/client/stats.go#L205 cpuDelta := float64(s.CPUStats.CPUUsage.TotalUsage - previous.CPUStats.CPUUsage.TotalUsage) systemDelta := float64(s.CPUStats.SystemCPUUsage - previous.CPUStats.SystemCPUUsage) cpuPercent := 0.0 if systemDelta > 0.0 && cpuDelta > 0.0 { cpuPercent = (cpuDelta / systemDelta) * float64(len(s.CPUStats.CPUUsage.PercpuUsage)) * 100.0 } result = result.Add(s.Read, cpuPercent) available := float64(len(s.CPUStats.CPUUsage.PercpuUsage)) * 100.0 if available >= result.Max { result.Max = available } previous = s } return result } func (c *container) metrics() report.Metrics { if c.numPending == 0 { return report.Metrics{} } pendingStats := c.pendingStats[:c.numPending] result := report.Metrics{ MemoryUsage: c.memoryUsageMetric(pendingStats), CPUTotalUsage: c.cpuPercentMetric(pendingStats), } // leave one stat to help with relative metrics c.pendingStats[0] = c.pendingStats[c.numPending-1] c.numPending = 1 return result } func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node { c.RLock() defer c.RUnlock() ips := c.container.NetworkSettings.SecondaryIPAddresses if c.container.NetworkSettings.IPAddress != "" { ips = append(ips, c.container.NetworkSettings.IPAddress) } // Treat all Docker IPs as local scoped. ipsWithScopes := []string{} for _, ip := range ips { ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(hostID, ip)) } state := c.State() result := report.MakeNodeWith(map[string]string{ ContainerID: c.ID(), ContainerName: strings.TrimPrefix(c.container.Name, "/"), ContainerCreated: c.container.Created.Format(time.RFC822), ContainerCommand: c.container.Path + " " + strings.Join(c.container.Args, " "), ImageID: c.container.Image, ContainerHostname: c.Hostname(), ContainerState: state, }).WithSets(report.EmptySets. Add(ContainerPorts, c.ports(localAddrs)). Add(ContainerIPs, report.MakeStringSet(ips...)). Add(ContainerIPsWithScopes, report.MakeStringSet(ipsWithScopes...)), ).WithMetrics( c.metrics(), ).WithParents(report.EmptySets. Add(report.ContainerImage, report.MakeStringSet(report.MakeContainerImageNodeID(c.container.Image))), ) if c.container.State.Paused { result = result.WithControls(UnpauseContainer) } else if c.container.State.Running { uptime := (mtime.Now().Sub(c.container.State.StartedAt) / time.Second) * time.Second networkMode := "" if c.container.HostConfig != nil { networkMode = c.container.HostConfig.NetworkMode } result = result.WithLatests(map[string]string{ ContainerUptime: uptime.String(), ContainerRestartCount: strconv.Itoa(c.container.RestartCount), ContainerNetworkMode: networkMode, }) result = result.WithControls( RestartContainer, StopContainer, PauseContainer, AttachContainer, ExecContainer, ) } else { result = result.WithControls(StartContainer) } result = AddLabels(result, c.container.Config.Labels) result = result.WithMetrics(c.metrics()) return result } // ExtractContainerIPs returns the list of container IPs given a Node from the Container topology. func ExtractContainerIPs(nmd report.Node) []string { v, _ := nmd.Sets.Lookup(ContainerIPs) return []string(v) } // ExtractContainerIPsWithScopes returns the list of container IPs, prepended // with scopes, given a Node from the Container topology. func ExtractContainerIPsWithScopes(nmd report.Node) []string { v, _ := nmd.Sets.Lookup(ContainerIPsWithScopes) return []string(v) }