Files
weave-scope/probe/docker/container.go
Paul Bellamy d2bf204181 Merge pull request #960 from weaveworks/no-ips-for-no-network
Don't show blank IPs metadata row for containers with no IP
2016-02-19 17:39:01 +00:00

393 lines
10 KiB
Go

package docker
import (
"bufio"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
docker "github.com/fsouza/go-dockerclient"
"github.com/ugorji/go/codec"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/report"
)
// Keys under which container metadata is recorded on report nodes.
const (
	ContainerName          = "docker_container_name"
	ContainerCommand       = "docker_container_command"
	ContainerPorts         = "docker_container_ports"
	ContainerCreated       = "docker_container_created"
	ContainerIPs           = "docker_container_ips"
	ContainerHostname      = "docker_container_hostname"
	ContainerIPsWithScopes = "docker_container_ips_with_scopes"
	ContainerState         = "docker_container_state"
	ContainerUptime        = "docker_container_uptime"
	ContainerRestartCount  = "docker_container_restart_count"
	ContainerNetworkMode   = "docker_container_network_mode"
)

// Keys for network counters (not referenced in this file; presumably used
// by stats reporting elsewhere in the package).
const (
	NetworkRxBytes   = "network_rx_bytes"
	NetworkRxPackets = "network_rx_packets"
	NetworkRxDropped = "network_rx_dropped"
	NetworkRxErrors  = "network_rx_errors"
	NetworkTxBytes   = "network_tx_bytes"
	NetworkTxPackets = "network_tx_packets"
	NetworkTxDropped = "network_tx_dropped"
	NetworkTxErrors  = "network_tx_errors"
)

// Keys for memory and CPU metrics. MemoryUsage and CPUTotalUsage are the
// keys metrics() fills in.
const (
	MemoryMaxUsage = "docker_memory_max_usage"
	MemoryUsage    = "docker_memory_usage"
	MemoryFailcnt  = "docker_memory_failcnt"
	MemoryLimit    = "docker_memory_limit"

	CPUPercpuUsage       = "docker_cpu_per_cpu_usage"
	CPUUsageInUsermode   = "docker_cpu_usage_in_usermode"
	CPUTotalUsage        = "docker_cpu_total_usage"
	CPUUsageInKernelmode = "docker_cpu_usage_in_kernelmode"
	CPUSystemCPUUsage    = "docker_cpu_system_cpu_usage"
)

// Values of ContainerState, as returned by container.State().
const (
	StateRunning = "running"
	StateStopped = "stopped"
	StatePaused  = "paused"
)

// Miscellaneous settings.
const (
	// NetworkModeHost is the Docker network mode name for host networking.
	NetworkModeHost = "host"

	// stopTimeout is not used in this file; presumably the timeout (seconds?)
	// passed when stopping a container — TODO confirm at the call site.
	stopTimeout = 10
)
// ClientConn is exported for testing: it is the subset of
// *httputil.ClientConn used to stream container stats, so tests can
// substitute a fake through NewClientConnStub.
type ClientConn interface {
	Do(req *http.Request) (resp *http.Response, err error)
	Close() error
}

// newClientConn is the production implementation behind NewClientConnStub:
// it wraps conn in an httputil.ClientConn, reading through r (httputil
// allocates its own reader when r is nil).
func newClientConn(conn net.Conn, r *bufio.Reader) ClientConn {
	var wrapped ClientConn = httputil.NewClientConn(conn, r)
	return wrapped
}

// Exported for testing: seams used by StartGatheringStats to open the
// Docker connection, replaceable by tests.
var (
	DialStub           = net.Dial
	NewClientConnStub  = newClientConn
)
// Container represents a Docker container tracked by the probe.
type Container interface {
	// Accessors over the cached *docker.Container description.
	ID() string
	Image() string
	PID() int
	Hostname() string
	HasTTY() bool
	State() string
	Container() *docker.Container

	// UpdateState replaces the cached *docker.Container description.
	UpdateState(*docker.Container)

	// StartGatheringStats and StopGatheringStats control the background
	// stats stream whose samples become the node's metrics.
	StartGatheringStats() error
	StopGatheringStats()

	// GetNode renders the container as a report.Node; the arguments are
	// the host ID and the host's local addresses.
	GetNode(string, []net.IP) report.Node
}
// container is the Container implementation returned by NewContainer. It
// wraps a cached *docker.Container plus the state of its stats stream.
//
// The embedded RWMutex is held by UpdateState, the stats goroutine started
// by StartGatheringStats, StopGatheringStats and GetNode. The small
// accessors (ID, Image, PID, Hostname, State, ...) read c.container without
// locking.
type container struct {
sync.RWMutex
// container is the description passed to NewContainer or the latest UpdateState.
container *docker.Container
// statsConn is the stats-stream connection registered by the stats
// goroutine once its request succeeds; nil otherwise.
statsConn ClientConn
// latestStats is the most recent sample accepted into pendingStats.
latestStats docker.Stats
// pendingStats[:numPending] holds samples not yet turned into metrics.
// Samples arriving while all 20 slots are full are dropped, and metrics()
// keeps the last sample as the baseline for the next CPU delta.
pendingStats [20]docker.Stats
numPending int
}
// NewContainer wraps the given Docker container description in a Container.
// Stats collection is not started; call StartGatheringStats for that.
func NewContainer(c *docker.Container) Container {
	wrapped := new(container)
	wrapped.container = c
	return wrapped
}
// UpdateState replaces the cached Docker description of the container. It
// holds the write lock so the swap does not interleave with locked readers
// such as GetNode or the stats goroutine.
func (c *container) UpdateState(latest *docker.Container) {
	c.Lock()
	defer c.Unlock()

	c.container = latest
}
// ID returns the Docker container ID.
//
// It must not take c's lock: GetNode calls it while already holding it.
func (c *container) ID() string {
	id := c.container.ID
	return id
}
// Image returns the Image field of the cached Docker container description.
func (c *container) Image() string {
	image := c.container.Image
	return image
}
// PID returns the Pid recorded in the container's Docker State.
func (c *container) PID() int {
	pid := c.container.State.Pid
	return pid
}
// Hostname returns the container's configured hostname, qualified as
// "hostname.domainname" when a domain name is set.
func (c *container) Hostname() string {
	cfg := c.container.Config
	if cfg.Domainname != "" {
		return cfg.Hostname + "." + cfg.Domainname
	}
	return cfg.Hostname
}
// HasTTY reports whether the container was configured with a TTY (Config.Tty).
func (c *container) HasTTY() bool {
	tty := c.container.Config.Tty
	return tty
}
// State summarises the Docker state as StatePaused, StateRunning or
// StateStopped. Paused takes precedence over running.
func (c *container) State() string {
	st := c.container.State
	switch {
	case st.Paused:
		return StatePaused
	case st.Running:
		return StateRunning
	default:
		return StateStopped
	}
}
// Container returns the cached Docker description itself (the same
// pointer, not a copy).
func (c *container) Container() *docker.Container {
	cached := c.container
	return cached
}
// StartGatheringStats starts a goroutine that streams
// /containers/<id>/stats from the Docker endpoint and buffers each decoded
// sample in pendingStats (and latestStats) for metrics() to consume.
//
// It returns an error only when a stats connection is already registered.
// Failures inside the goroutine (building the request, parsing the endpoint,
// dialling, the HTTP call, decoding) are logged rather than returned.
//
// NOTE(review): statsConn is registered only after the HTTP request
// succeeds. Two back-to-back calls, or a StopGatheringStats that runs before
// registration, are therefore not detected here.
func (c *container) StartGatheringStats() error {
	c.Lock()
	defer c.Unlock()

	if c.statsConn != nil {
		return fmt.Errorf("already gather stats for container %s", c.container.ID)
	}

	// Capture the ID while holding the lock: UpdateState may replace
	// c.container concurrently with the goroutine below.
	id := c.container.ID

	go func() {
		log.Infof("docker container: collecting stats for %s", id)
		req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", id), nil)
		if err != nil {
			log.Errorf("docker container: %v", err)
			return
		}
		req.Header.Set("User-Agent", "weavescope")

		endpointURL, err := url.Parse(endpoint)
		if err != nil {
			log.Errorf("docker container: %v", err)
			return
		}

		dial, err := DialStub(endpointURL.Scheme, endpointURL.Path)
		if err != nil {
			log.Errorf("docker container: %v", err)
			return
		}

		conn := NewClientConnStub(dial, nil)
		// Release the connection on every exit path. Before, only
		// StopGatheringStats closed it, so a failed request or a stream that
		// ended on its own (e.g. the container stopped) leaked the socket.
		// A second Close after StopGatheringStats is harmless for
		// httputil.ClientConn.
		defer conn.Close()

		resp, err := conn.Do(req)
		if err != nil {
			log.Errorf("docker container: %v", err)
			return
		}
		defer resp.Body.Close()

		c.Lock()
		c.statsConn = conn
		c.Unlock()

		defer func() {
			c.Lock()
			defer c.Unlock()
			log.Infof("docker container: stopped collecting stats for %s", id)
			// Only clear our own registration. After a Stop/Start cycle the
			// field may already hold a newer goroutine's connection, and
			// clearing it would make that stream impossible to stop.
			if c.statsConn == conn {
				c.statsConn = nil
			}
		}()

		var stats docker.Stats
		// Use a buffer since the codec library doesn't implicitly do it
		decoder := codec.NewDecoder(bufio.NewReader(resp.Body), &codec.JsonHandle{})
		for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) {
			if err != nil {
				log.Errorf("docker container: error reading event, did container stop? %v", err)
				return
			}
			c.Lock()
			if c.numPending >= len(c.pendingStats) {
				log.Warnf("docker container: dropping stats.")
			} else {
				c.latestStats = stats
				c.pendingStats[c.numPending] = stats
				c.numPending++
			}
			c.Unlock()
			// Reset before the next decode, presumably so fields missing
			// from the next message don't inherit this sample's values.
			stats = docker.Stats{}
		}
	}()
	return nil
}
// StopGatheringStats closes the registered stats connection, if any, and
// clears the registration. Closing it presumably makes the stats
// goroutine's next read fail so that it exits. This is a no-op when no
// connection is registered.
func (c *container) StopGatheringStats() {
	c.Lock()
	defer c.Unlock()

	if conn := c.statsConn; conn != nil {
		conn.Close()
		c.statsConn = nil
	}
}
// ports formats the container's port mappings for display.
//
// A port with no host bindings is reported as the bare port key (e.g.
// "80/tcp"). Each binding becomes "hostIP:hostPort->port". A binding on
// 0.0.0.0 is expanded into one entry per address in localAddrs. The result
// is empty when the container has no NetworkSettings.
func (c *container) ports(localAddrs []net.IP) report.StringSet {
	if c.container.NetworkSettings == nil {
		return report.MakeStringSet()
	}

	ports := []string{}
	for port, bindings := range c.container.NetworkSettings.Ports {
		if len(bindings) == 0 {
			// port already has an underlying string type, so convert it
			// directly instead of formatting with fmt.Sprintf("%s", ...).
			ports = append(ports, string(port))
			continue
		}
		for _, b := range bindings {
			if b.HostIP != "0.0.0.0" {
				ports = append(ports, fmt.Sprintf("%s:%s->%s", b.HostIP, b.HostPort, port))
				continue
			}
			// Wildcard binding: the port is reachable on every local address.
			for _, ip := range localAddrs {
				ports = append(ports, fmt.Sprintf("%s:%s->%s", ip, b.HostPort, port))
			}
		}
	}
	return report.MakeStringSet(ports...)
}
// memoryUsageMetric builds a metric with one MemoryStats.Usage sample per
// stats entry, each timestamped with that entry's Read time.
func (c *container) memoryUsageMetric(stats []docker.Stats) report.Metric {
	metric := report.MakeMetric()
	for i := range stats {
		metric = metric.Add(stats[i].Read, float64(stats[i].MemoryStats.Usage))
	}
	return metric
}
// cpuPercentMetric converts consecutive stats samples into a CPU-usage
// percentage metric, one point per sample after the first.
//
// It follows the calculation in Docker's docker/api/client/stats.go:
// (container CPU delta / system CPU delta) * number of CPUs * 100. The
// metric's Max is raised to number-of-CPUs * 100 when that is larger. Fewer
// than two samples yield an empty metric, because a delta needs a baseline.
func (c *container) cpuPercentMetric(stats []docker.Stats) report.Metric {
	result := report.MakeMetric()
	if len(stats) < 2 {
		return result
	}

	previous := stats[0]
	for _, s := range stats[1:] {
		// The counters are unsigned. Subtracting directly wraps a counter
		// that went backwards (e.g. a reset or zero-valued sample) into a
		// huge positive delta that slips past the "> 0" guard. Treat such
		// deltas as zero, as Docker's float-based version effectively does.
		cpuDelta := 0.0
		if cur, prev := s.CPUStats.CPUUsage.TotalUsage, previous.CPUStats.CPUUsage.TotalUsage; cur > prev {
			cpuDelta = float64(cur - prev)
		}
		systemDelta := 0.0
		if cur, prev := s.CPUStats.SystemCPUUsage, previous.CPUStats.SystemCPUUsage; cur > prev {
			systemDelta = float64(cur - prev)
		}
		numCPUs := float64(len(s.CPUStats.CPUUsage.PercpuUsage))

		cpuPercent := 0.0
		if systemDelta > 0.0 && cpuDelta > 0.0 {
			cpuPercent = (cpuDelta / systemDelta) * numCPUs * 100.0
		}
		result = result.Add(s.Read, cpuPercent)

		// Full use of every CPU is numCPUs*100%; make sure Max covers it.
		if available := numCPUs * 100.0; available >= result.Max {
			result.Max = available
		}
		previous = s
	}
	return result
}
// metrics turns the buffered stats samples into MemoryUsage and
// CPUTotalUsage metrics, then empties the buffer except for the newest
// sample, which stays behind as the baseline for the next CPU delta.
//
// It mutates pendingStats and numPending, which the stats goroutine writes
// under c.Lock(). Callers must therefore hold the write lock.
func (c *container) metrics() report.Metrics {
	n := c.numPending
	if n == 0 {
		return report.Metrics{}
	}

	samples := c.pendingStats[:n]
	result := report.Metrics{
		CPUTotalUsage: c.cpuPercentMetric(samples),
		MemoryUsage:   c.memoryUsageMetric(samples),
	}

	// leave one stat to help with relative metrics
	newest := c.pendingStats[n-1]
	c.pendingStats[0] = newest
	c.numPending = 1
	return result
}
// GetNode renders the container as a report.Node for the host hostID.
//
// The node contains:
//   - the container's ID, name, creation time, command, image, hostname and state;
//   - its ports (0.0.0.0 bindings expanded over localAddrs);
//   - its IPs, both plain and scoped to hostID;
//   - any buffered memory/CPU metrics;
//   - its image as a parent;
//   - the controls valid for its current state;
//   - its Docker labels (via AddLabels).
//
// Running containers also get uptime, restart count and network mode.
func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node {
	// Take the write lock, not RLock. metrics() consumes the pending stats
	// buffer, so two GetNode calls sharing a read lock would race on it.
	// ID, State and Hostname do not lock, so there is no self-deadlock.
	c.Lock()
	defer c.Unlock()

	// NetworkSettings may be nil; ports() guards against that too. Copy the
	// secondary addresses rather than appending to the docker.Container's
	// own slice, whose spare capacity must not be written into.
	var ips []string
	if settings := c.container.NetworkSettings; settings != nil {
		ips = append(ips, settings.SecondaryIPAddresses...)
		if settings.IPAddress != "" {
			ips = append(ips, settings.IPAddress)
		}
	}

	// Treat all Docker IPs as local scoped.
	ipsWithScopes := make([]string, 0, len(ips))
	for _, ip := range ips {
		ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(hostID, ip))
	}

	state := c.State()
	result := report.MakeNodeWith(map[string]string{
		ContainerID:       c.ID(),
		ContainerName:     strings.TrimPrefix(c.container.Name, "/"),
		ContainerCreated:  c.container.Created.Format(time.RFC822),
		ContainerCommand:  c.container.Path + " " + strings.Join(c.container.Args, " "),
		ImageID:           c.container.Image,
		ContainerHostname: c.Hostname(),
		ContainerState:    state,
	}).WithSets(report.EmptySets.
		Add(ContainerPorts, c.ports(localAddrs)).
		Add(ContainerIPs, report.MakeStringSet(ips...)).
		Add(ContainerIPsWithScopes, report.MakeStringSet(ipsWithScopes...)),
	).WithMetrics(
		// metrics() drains the buffer, so it is called exactly once. A
		// second call would only re-report the carried-over baseline sample.
		c.metrics(),
	).WithParents(report.EmptySets.
		Add(report.ContainerImage, report.MakeStringSet(report.MakeContainerImageNodeID(c.container.Image))),
	)

	switch {
	case c.container.State.Paused:
		result = result.WithControls(UnpauseContainer)
	case c.container.State.Running:
		uptime := (mtime.Now().Sub(c.container.State.StartedAt) / time.Second) * time.Second
		networkMode := ""
		if c.container.HostConfig != nil {
			networkMode = c.container.HostConfig.NetworkMode
		}
		result = result.WithLatests(map[string]string{
			ContainerUptime:       uptime.String(),
			ContainerRestartCount: strconv.Itoa(c.container.RestartCount),
			ContainerNetworkMode:  networkMode,
		})
		result = result.WithControls(
			RestartContainer, StopContainer, PauseContainer, AttachContainer, ExecContainer,
		)
	default:
		result = result.WithControls(StartContainer)
	}

	return AddLabels(result, c.container.Config.Labels)
}
// ExtractContainerIPs returns the container IPs recorded under ContainerIPs
// on a Container-topology node. The lookup's found flag is ignored; a
// missing key yields whatever value Sets.Lookup returns for a miss.
func ExtractContainerIPs(nmd report.Node) []string {
	ipSet, _ := nmd.Sets.Lookup(ContainerIPs)
	ips := []string(ipSet)
	return ips
}
// ExtractContainerIPsWithScopes returns the scoped container IPs (each
// prefixed with its scope) recorded under ContainerIPsWithScopes on a
// Container-topology node. As with ExtractContainerIPs, the lookup's found
// flag is ignored.
func ExtractContainerIPsWithScopes(nmd report.Node) []string {
	scopedSet, _ := nmd.Sets.Lookup(ContainerIPsWithScopes)
	scoped := []string(scopedSet)
	return scoped
}