mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Merge pull request #257 from tomwilkie/237-docker-tests
More tests for docker integration
This commit is contained in:
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*
|
||||
|
||||
$(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go
|
||||
|
||||
$(PROBE_EXE): probe/*.go probe/tag/*.go report/*.go xfer/*.go
|
||||
$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go report/*.go xfer/*.go
|
||||
|
||||
$(APP_EXE) $(PROBE_EXE):
|
||||
go get -tags netgo ./$(@D)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package tag
|
||||
package docker
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -10,9 +11,12 @@ import (
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// These constants are keys used in node metadata
|
||||
@@ -39,51 +43,102 @@ const (
|
||||
CPUSystemCPUUsage = "cpu_system_cpu_usage"
|
||||
)
|
||||
|
||||
type dockerContainer struct {
|
||||
sync.RWMutex
|
||||
*docker.Container
|
||||
// Exported for testing
|
||||
var (
|
||||
DialStub = net.Dial
|
||||
NewClientConnStub = newClientConn
|
||||
)
|
||||
|
||||
statsConn *httputil.ClientConn
|
||||
func newClientConn(c net.Conn, r *bufio.Reader) ClientConn {
|
||||
return httputil.NewClientConn(c, r)
|
||||
}
|
||||
|
||||
// ClientConn is exported for testing
|
||||
type ClientConn interface {
|
||||
Do(req *http.Request) (resp *http.Response, err error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Container represents a docker container
|
||||
type Container interface {
|
||||
ID() string
|
||||
Image() string
|
||||
PID() int
|
||||
GetNodeMetadata() report.NodeMetadata
|
||||
|
||||
StartGatheringStats() error
|
||||
StopGatheringStats()
|
||||
}
|
||||
|
||||
type container struct {
|
||||
sync.RWMutex
|
||||
container *docker.Container
|
||||
statsConn ClientConn
|
||||
latestStats *docker.Stats
|
||||
}
|
||||
|
||||
// called whilst holding t.Lock() for writes
|
||||
func (c *dockerContainer) startGatheringStats(containerID string) error {
|
||||
// NewContainer creates a new Container
|
||||
func NewContainer(c *docker.Container) Container {
|
||||
return &container{container: c}
|
||||
}
|
||||
|
||||
func (c *container) ID() string {
|
||||
return c.container.ID
|
||||
}
|
||||
|
||||
func (c *container) Image() string {
|
||||
return c.container.Image
|
||||
}
|
||||
|
||||
func (c *container) PID() int {
|
||||
return c.container.State.Pid
|
||||
}
|
||||
|
||||
func (c *container) StartGatheringStats() error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if c.statsConn != nil {
|
||||
return fmt.Errorf("already gather stats for container %s", containerID)
|
||||
return fmt.Errorf("already gather stats for container %s", c.container.ID)
|
||||
}
|
||||
|
||||
log.Printf("docker mapper: collecting stats for %s", containerID)
|
||||
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", containerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("User-Agent", "weavescope")
|
||||
|
||||
url, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dial, err := net.Dial(url.Scheme, url.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn := httputil.NewClientConn(dial, nil)
|
||||
resp, err := conn.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.statsConn = conn
|
||||
|
||||
go func() {
|
||||
log.Printf("docker container: collecting stats for %s", c.container.ID)
|
||||
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil)
|
||||
if err != nil {
|
||||
log.Printf("docker container: %v", err)
|
||||
return
|
||||
}
|
||||
req.Header.Set("User-Agent", "weavescope")
|
||||
|
||||
url, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
log.Printf("docker container: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
dial, err := net.Dial(url.Scheme, url.Path)
|
||||
if err != nil {
|
||||
log.Printf("docker container: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
conn := NewClientConnStub(dial, nil)
|
||||
resp, err := conn.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("docker container: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.statsConn = conn
|
||||
c.Unlock()
|
||||
|
||||
defer func() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
log.Printf("docker mapper: stopped collecting stats for %s", containerID)
|
||||
log.Printf("docker container: stopped collecting stats for %s", c.container.ID)
|
||||
c.statsConn = nil
|
||||
c.latestStats = nil
|
||||
}()
|
||||
@@ -93,7 +148,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error {
|
||||
|
||||
for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) {
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: error reading event %v", err)
|
||||
log.Printf("docker container: error reading event %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -109,7 +164,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error {
|
||||
}
|
||||
|
||||
// called whilst holding t.Lock()
|
||||
func (c *dockerContainer) stopGatheringStats(containerID string) {
|
||||
func (c *container) StopGatheringStats() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
@@ -124,15 +179,21 @@ func (c *dockerContainer) stopGatheringStats(containerID string) {
|
||||
}
|
||||
|
||||
// called whilst holding t.RLock()
|
||||
func (c *dockerContainer) getStats() map[string]string {
|
||||
func (c *container) GetNodeMetadata() report.NodeMetadata {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
if c.latestStats == nil {
|
||||
return map[string]string{}
|
||||
result := report.NodeMetadata{
|
||||
ContainerID: c.ID(),
|
||||
ContainerName: strings.TrimPrefix(c.container.Name, "/"),
|
||||
ImageID: c.container.Image,
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
if c.latestStats == nil {
|
||||
return result
|
||||
}
|
||||
|
||||
result.Merge(report.NodeMetadata{
|
||||
NetworkRxDropped: strconv.FormatUint(c.latestStats.Network.RxDropped, 10),
|
||||
NetworkRxBytes: strconv.FormatUint(c.latestStats.Network.RxBytes, 10),
|
||||
NetworkRxErrors: strconv.FormatUint(c.latestStats.Network.RxErrors, 10),
|
||||
@@ -152,5 +213,6 @@ func (c *dockerContainer) getStats() map[string]string {
|
||||
CPUTotalUsage: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.TotalUsage, 10),
|
||||
CPUUsageInKernelmode: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.UsageInKernelmode, 10),
|
||||
CPUSystemCPUUsage: strconv.FormatUint(c.latestStats.CPUStats.SystemCPUUsage, 10),
|
||||
}
|
||||
})
|
||||
return result
|
||||
}
|
||||
67
probe/docker/container_test.go
Normal file
67
probe/docker/container_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package docker_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
client "github.com/fsouza/go-dockerclient"
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
)
|
||||
|
||||
type mockConnection struct {
|
||||
reader *io.PipeReader
|
||||
}
|
||||
|
||||
func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) {
|
||||
return &http.Response{
|
||||
Body: c.reader,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *mockConnection) Close() error {
|
||||
return c.reader.Close()
|
||||
}
|
||||
|
||||
func TestContainer(t *testing.T) {
|
||||
oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub
|
||||
defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }()
|
||||
|
||||
docker.DialStub = func(network, address string) (net.Conn, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
reader, writer := io.Pipe()
|
||||
connection := &mockConnection{reader}
|
||||
|
||||
docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn {
|
||||
return connection
|
||||
}
|
||||
|
||||
c := docker.NewContainer(container1)
|
||||
err := c.StartGatheringStats()
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
}
|
||||
defer c.StopGatheringStats()
|
||||
runtime.Gosched() // wait for StartGatheringStats goroutine to call connection.Do
|
||||
|
||||
// Send some stats to the docker container
|
||||
stats := &client.Stats{}
|
||||
stats.MemoryStats.Usage = 12345
|
||||
err = json.NewEncoder(writer).Encode(&stats)
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
}
|
||||
runtime.Gosched() // wait for StartGatheringStats goroutine to receive the stats
|
||||
|
||||
// Now see if we go them
|
||||
nmd := c.GetNodeMetadata()
|
||||
if nmd[docker.MemoryUsage] != "12345" {
|
||||
t.Errorf("want 12345, got %s", nmd[docker.MemoryUsage])
|
||||
}
|
||||
}
|
||||
288
probe/docker/registry.go
Normal file
288
probe/docker/registry.go
Normal file
@@ -0,0 +1,288 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
docker_client "github.com/fsouza/go-dockerclient"
|
||||
)
|
||||
|
||||
// Consts exported for testing.
|
||||
const (
|
||||
StartEvent = "start"
|
||||
DieEvent = "die"
|
||||
endpoint = "unix:///var/run/docker.sock"
|
||||
)
|
||||
|
||||
// Vars exported for testing.
|
||||
var (
|
||||
NewDockerClientStub = newDockerClient
|
||||
NewContainerStub = NewContainer
|
||||
)
|
||||
|
||||
// Registry keeps track of running docker containers and their images
|
||||
type Registry interface {
|
||||
Stop()
|
||||
LockedPIDLookup(f func(func(int) Container))
|
||||
WalkContainers(f func(Container))
|
||||
WalkImages(f func(*docker_client.APIImages))
|
||||
}
|
||||
|
||||
type registry struct {
|
||||
sync.RWMutex
|
||||
quit chan chan struct{}
|
||||
interval time.Duration
|
||||
client Client
|
||||
|
||||
containers map[string]Container
|
||||
containersByPID map[int]Container
|
||||
images map[string]*docker_client.APIImages
|
||||
}
|
||||
|
||||
// Client interface for mocking.
|
||||
type Client interface {
|
||||
ListContainers(docker_client.ListContainersOptions) ([]docker_client.APIContainers, error)
|
||||
InspectContainer(string) (*docker_client.Container, error)
|
||||
ListImages(docker_client.ListImagesOptions) ([]docker_client.APIImages, error)
|
||||
AddEventListener(chan<- *docker_client.APIEvents) error
|
||||
RemoveEventListener(chan *docker_client.APIEvents) error
|
||||
}
|
||||
|
||||
func newDockerClient(endpoint string) (Client, error) {
|
||||
return docker_client.NewClient(endpoint)
|
||||
}
|
||||
|
||||
// NewRegistry returns a usable Registry. Don't forget to Stop it.
|
||||
func NewRegistry(interval time.Duration) (Registry, error) {
|
||||
client, err := NewDockerClientStub(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := ®istry{
|
||||
containers: map[string]Container{},
|
||||
containersByPID: map[int]Container{},
|
||||
images: map[string]*docker_client.APIImages{},
|
||||
|
||||
client: client,
|
||||
interval: interval,
|
||||
quit: make(chan chan struct{}),
|
||||
}
|
||||
|
||||
go r.loop()
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Stop stops the Docker registry's event subscriber.
|
||||
func (r *registry) Stop() {
|
||||
ch := make(chan struct{})
|
||||
r.quit <- ch
|
||||
<-ch
|
||||
}
|
||||
|
||||
func (r *registry) loop() {
|
||||
for {
|
||||
// NB listenForEvents blocks.
|
||||
// Returning false means we should exit.
|
||||
if !r.listenForEvents() {
|
||||
return
|
||||
}
|
||||
|
||||
// Sleep here so we don't hammer the
|
||||
// logs if docker is down
|
||||
time.Sleep(r.interval)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *registry) listenForEvents() bool {
|
||||
// First we empty the store lists.
|
||||
// This ensure any containers that went away inbetween calls to
|
||||
// listenForEvents don't hang around.
|
||||
r.reset()
|
||||
|
||||
// Next, start listening for events. We do this before fetching
|
||||
// the list of containers so we don't miss containers created
|
||||
// after listing but before listening for events.
|
||||
events := make(chan *docker_client.APIEvents)
|
||||
if err := r.client.AddEventListener(events); err != nil {
|
||||
log.Printf("docker registry: %s", err)
|
||||
return true
|
||||
}
|
||||
defer func() {
|
||||
if err := r.client.RemoveEventListener(events); err != nil {
|
||||
log.Printf("docker registry: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := r.updateContainers(); err != nil {
|
||||
log.Printf("docker registry: %s", err)
|
||||
return true
|
||||
}
|
||||
|
||||
if err := r.updateImages(); err != nil {
|
||||
log.Printf("docker registry: %s", err)
|
||||
return true
|
||||
}
|
||||
|
||||
otherUpdates := time.Tick(r.interval)
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
r.handleEvent(event)
|
||||
|
||||
case <-otherUpdates:
|
||||
if err := r.updateImages(); err != nil {
|
||||
log.Printf("docker registry: %s", err)
|
||||
return true
|
||||
}
|
||||
|
||||
case ch := <-r.quit:
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
for _, c := range r.containers {
|
||||
c.StopGatheringStats()
|
||||
}
|
||||
close(ch)
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *registry) reset() {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
for _, c := range r.containers {
|
||||
c.StopGatheringStats()
|
||||
}
|
||||
|
||||
r.containers = map[string]Container{}
|
||||
r.containersByPID = map[int]Container{}
|
||||
r.images = map[string]*docker_client.APIImages{}
|
||||
}
|
||||
|
||||
func (r *registry) updateContainers() error {
|
||||
apiContainers, err := r.client.ListContainers(docker_client.ListContainersOptions{All: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, apiContainer := range apiContainers {
|
||||
if err := r.addContainer(apiContainer.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *registry) updateImages() error {
|
||||
images, err := r.client.ListImages(docker_client.ListImagesOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
for i := range images {
|
||||
image := &images[i]
|
||||
r.images[image.ID] = image
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *registry) handleEvent(event *docker_client.APIEvents) {
|
||||
switch event.Status {
|
||||
case DieEvent:
|
||||
containerID := event.ID
|
||||
r.removeContainer(containerID)
|
||||
|
||||
case StartEvent:
|
||||
containerID := event.ID
|
||||
if err := r.addContainer(containerID); err != nil {
|
||||
log.Printf("docker registry: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *registry) addContainer(containerID string) error {
|
||||
dockerContainer, err := r.client.InspectContainer(containerID)
|
||||
if err != nil {
|
||||
// Don't spam the logs if the container was short lived
|
||||
if _, ok := err.(*docker_client.NoSuchContainer); ok {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if !dockerContainer.State.Running {
|
||||
// We get events late, and the containers sometimes have already
|
||||
// stopped. Not an error, so don't return it.
|
||||
return nil
|
||||
}
|
||||
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
c := NewContainerStub(dockerContainer)
|
||||
r.containers[containerID] = c
|
||||
r.containersByPID[dockerContainer.State.Pid] = c
|
||||
|
||||
return c.StartGatheringStats()
|
||||
}
|
||||
|
||||
func (r *registry) removeContainer(containerID string) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
container, ok := r.containers[containerID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(r.containers, containerID)
|
||||
delete(r.containersByPID, container.PID())
|
||||
container.StopGatheringStats()
|
||||
}
|
||||
|
||||
// LockedPIDLookup runs f under a read lock, and gives f a function for
|
||||
// use doing pid->container lookups.
|
||||
func (r *registry) LockedPIDLookup(f func(func(int) Container)) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
lookup := func(pid int) Container {
|
||||
return r.containersByPID[pid]
|
||||
}
|
||||
|
||||
f(lookup)
|
||||
}
|
||||
|
||||
// WalkContainers runs f on every running containers the registry knows of.
|
||||
func (r *registry) WalkContainers(f func(Container)) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
for _, container := range r.containers {
|
||||
f(container)
|
||||
}
|
||||
}
|
||||
|
||||
// WalkImages runs f on every image of running containers the registry
|
||||
// knows of. f may be run on the same image more than once.
|
||||
func (r *registry) WalkImages(f func(*docker_client.APIImages)) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
// Loop over containers so we only emit images for running containers.
|
||||
for _, container := range r.containers {
|
||||
image, ok := r.images[container.Image()]
|
||||
if ok {
|
||||
f(image)
|
||||
}
|
||||
}
|
||||
}
|
||||
241
probe/docker/registry_test.go
Normal file
241
probe/docker/registry_test.go
Normal file
@@ -0,0 +1,241 @@
|
||||
package docker_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
client "github.com/fsouza/go-dockerclient"
|
||||
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
type mockContainer struct {
|
||||
c *client.Container
|
||||
}
|
||||
|
||||
func (c *mockContainer) ID() string {
|
||||
return c.c.ID
|
||||
}
|
||||
|
||||
func (c *mockContainer) PID() int {
|
||||
return c.c.State.Pid
|
||||
}
|
||||
|
||||
func (c *mockContainer) Image() string {
|
||||
return c.c.Image
|
||||
}
|
||||
|
||||
func (c *mockContainer) StartGatheringStats() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockContainer) StopGatheringStats() {}
|
||||
|
||||
func (c *mockContainer) GetNodeMetadata() report.NodeMetadata {
|
||||
return report.NodeMetadata{
|
||||
docker.ContainerID: c.c.ID,
|
||||
docker.ContainerName: c.c.Name,
|
||||
docker.ImageID: c.c.Image,
|
||||
}
|
||||
}
|
||||
|
||||
type mockDockerClient struct {
|
||||
sync.Mutex
|
||||
apiContainers []client.APIContainers
|
||||
containers map[string]*client.Container
|
||||
apiImages []client.APIImages
|
||||
events []chan<- *client.APIEvents
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) ListContainers(client.ListContainersOptions) ([]client.APIContainers, error) {
|
||||
return m.apiContainers, nil
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) InspectContainer(id string) (*client.Container, error) {
|
||||
return m.containers[id], nil
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) ListImages(client.ListImagesOptions) ([]client.APIImages, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.apiImages, nil
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) AddEventListener(events chan<- *client.APIEvents) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.events = append(m.events, events)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) RemoveEventListener(events chan *client.APIEvents) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
for i, c := range m.events {
|
||||
if c == events {
|
||||
m.events = append(m.events[:i], m.events[i+1:]...)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) send(event *client.APIEvents) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
for _, c := range m.events {
|
||||
c <- event
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
container1 = &client.Container{
|
||||
ID: "ping",
|
||||
Name: "pong",
|
||||
Image: "baz",
|
||||
State: client.State{Pid: 1, Running: true},
|
||||
}
|
||||
container2 = &client.Container{
|
||||
ID: "wiff",
|
||||
Name: "waff",
|
||||
Image: "baz",
|
||||
State: client.State{Pid: 1, Running: true},
|
||||
}
|
||||
apiContainer1 = client.APIContainers{ID: "ping"}
|
||||
apiImage1 = client.APIImages{ID: "baz", RepoTags: []string{"bang", "not-chosen"}}
|
||||
mockClient = mockDockerClient{
|
||||
apiContainers: []client.APIContainers{apiContainer1},
|
||||
containers: map[string]*client.Container{"ping": container1},
|
||||
apiImages: []client.APIImages{apiImage1},
|
||||
}
|
||||
)
|
||||
|
||||
func setupStubs(mdc *mockDockerClient, f func()) {
|
||||
oldDockerClient, oldNewContainer := docker.NewDockerClientStub, docker.NewContainerStub
|
||||
defer func() { docker.NewDockerClientStub, docker.NewContainerStub = oldDockerClient, oldNewContainer }()
|
||||
|
||||
docker.NewDockerClientStub = func(endpoint string) (docker.Client, error) {
|
||||
return mdc, nil
|
||||
}
|
||||
|
||||
docker.NewContainerStub = func(c *client.Container) docker.Container {
|
||||
return &mockContainer{c}
|
||||
}
|
||||
|
||||
f()
|
||||
}
|
||||
|
||||
type containers []docker.Container
|
||||
|
||||
func (c containers) Len() int { return len(c) }
|
||||
func (c containers) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c containers) Less(i, j int) bool { return c[i].ID() < c[j].ID() }
|
||||
|
||||
func allContainers(r docker.Registry) []docker.Container {
|
||||
result := []docker.Container{}
|
||||
r.WalkContainers(func(c docker.Container) {
|
||||
result = append(result, c)
|
||||
})
|
||||
sort.Sort(containers(result))
|
||||
return result
|
||||
}
|
||||
|
||||
func allImages(r docker.Registry) []*client.APIImages {
|
||||
result := []*client.APIImages{}
|
||||
r.WalkImages(func(i *client.APIImages) {
|
||||
result = append(result, i)
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
func TestRegistry(t *testing.T) {
|
||||
mdc := mockClient // take a copy
|
||||
setupStubs(&mdc, func() {
|
||||
registry, _ := docker.NewRegistry(10 * time.Second)
|
||||
defer registry.Stop()
|
||||
runtime.Gosched()
|
||||
|
||||
{
|
||||
have := allContainers(registry)
|
||||
want := []docker.Container{&mockContainer{container1}}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
have := allImages(registry)
|
||||
want := []*client.APIImages{&apiImage1}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRegistryEvents(t *testing.T) {
|
||||
mdc := mockClient // take a copy
|
||||
setupStubs(&mdc, func() {
|
||||
registry, _ := docker.NewRegistry(10 * time.Second)
|
||||
defer registry.Stop()
|
||||
runtime.Gosched()
|
||||
|
||||
{
|
||||
mdc.Lock()
|
||||
mdc.containers["wiff"] = container2
|
||||
mdc.Unlock()
|
||||
mdc.send(&client.APIEvents{Status: docker.StartEvent, ID: "wiff"})
|
||||
runtime.Gosched()
|
||||
|
||||
have := allContainers(registry)
|
||||
want := []docker.Container{&mockContainer{container1}, &mockContainer{container2}}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
mdc.Lock()
|
||||
delete(mdc.containers, "wiff")
|
||||
mdc.Unlock()
|
||||
mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "wiff"})
|
||||
runtime.Gosched()
|
||||
|
||||
have := allContainers(registry)
|
||||
want := []docker.Container{&mockContainer{container1}}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
mdc.Lock()
|
||||
delete(mdc.containers, "ping")
|
||||
mdc.Unlock()
|
||||
mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "ping"})
|
||||
runtime.Gosched()
|
||||
|
||||
have := allContainers(registry)
|
||||
want := []docker.Container{}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "doesntexist"})
|
||||
runtime.Gosched()
|
||||
|
||||
have := allContainers(registry)
|
||||
want := []docker.Container{}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
66
probe/docker/reporter.go
Normal file
66
probe/docker/reporter.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
docker_client "github.com/fsouza/go-dockerclient"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// Keys for use in NodeMetadata
|
||||
const (
|
||||
ContainerName = "docker_container_name"
|
||||
ImageID = "docker_image_id"
|
||||
ImageName = "docker_image_name"
|
||||
)
|
||||
|
||||
// Reporter generate Reports containing Container and ContainerImage topologies
|
||||
type Reporter struct {
|
||||
registry Registry
|
||||
scope string
|
||||
}
|
||||
|
||||
// NewReporter makes a new Reporter
|
||||
func NewReporter(registry Registry, scope string) *Reporter {
|
||||
return &Reporter{
|
||||
registry: registry,
|
||||
scope: scope,
|
||||
}
|
||||
}
|
||||
|
||||
// Report generates a Report containing Container and ContainerImage topologies
|
||||
func (r *Reporter) Report() report.Report {
|
||||
result := report.MakeReport()
|
||||
result.Container.Merge(r.containerTopology())
|
||||
result.ContainerImage.Merge(r.containerImageTopology())
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *Reporter) containerTopology() report.Topology {
|
||||
result := report.NewTopology()
|
||||
|
||||
r.registry.WalkContainers(func(c Container) {
|
||||
nodeID := report.MakeContainerNodeID(r.scope, c.ID())
|
||||
result.NodeMetadatas[nodeID] = c.GetNodeMetadata()
|
||||
})
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *Reporter) containerImageTopology() report.Topology {
|
||||
result := report.NewTopology()
|
||||
|
||||
r.registry.WalkImages(func(image *docker_client.APIImages) {
|
||||
nmd := report.NodeMetadata{
|
||||
ImageID: image.ID,
|
||||
}
|
||||
|
||||
if len(image.RepoTags) > 0 {
|
||||
nmd[ImageName] = image.RepoTags[0]
|
||||
}
|
||||
|
||||
nodeID := report.MakeContainerNodeID(r.scope, image.ID)
|
||||
result.NodeMetadatas[nodeID] = nmd
|
||||
})
|
||||
|
||||
return result
|
||||
}
|
||||
79
probe/docker/reporter_test.go
Normal file
79
probe/docker/reporter_test.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package docker_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
client "github.com/fsouza/go-dockerclient"
|
||||
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
type mockRegistry struct {
|
||||
containersByPID map[int]docker.Container
|
||||
images map[string]*client.APIImages
|
||||
}
|
||||
|
||||
func (r *mockRegistry) Stop() {}
|
||||
|
||||
func (r *mockRegistry) LockedPIDLookup(f func(func(int) docker.Container)) {
|
||||
f(func(pid int) docker.Container {
|
||||
return r.containersByPID[pid]
|
||||
})
|
||||
}
|
||||
|
||||
func (r *mockRegistry) WalkContainers(f func(docker.Container)) {
|
||||
for _, c := range r.containersByPID {
|
||||
f(c)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *mockRegistry) WalkImages(f func(*client.APIImages)) {
|
||||
for _, i := range r.images {
|
||||
f(i)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
mockRegistryInstance = &mockRegistry{
|
||||
containersByPID: map[int]docker.Container{
|
||||
1: &mockContainer{container1},
|
||||
},
|
||||
images: map[string]*client.APIImages{
|
||||
"baz": &apiImage1,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestReporter(t *testing.T) {
|
||||
want := report.MakeReport()
|
||||
want.Container = report.Topology{
|
||||
Adjacency: report.Adjacency{},
|
||||
EdgeMetadatas: report.EdgeMetadatas{},
|
||||
NodeMetadatas: report.NodeMetadatas{
|
||||
report.MakeContainerNodeID("", "ping"): report.NodeMetadata{
|
||||
docker.ContainerID: "ping",
|
||||
docker.ContainerName: "pong",
|
||||
docker.ImageID: "baz",
|
||||
},
|
||||
},
|
||||
}
|
||||
want.ContainerImage = report.Topology{
|
||||
Adjacency: report.Adjacency{},
|
||||
EdgeMetadatas: report.EdgeMetadatas{},
|
||||
NodeMetadatas: report.NodeMetadatas{
|
||||
report.MakeContainerNodeID("", "baz"): report.NodeMetadata{
|
||||
docker.ImageID: "baz",
|
||||
docker.ImageName: "bang",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
reporter := docker.NewReporter(mockRegistryInstance, "")
|
||||
have := reporter.Report()
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
87
probe/docker/tagger.go
Normal file
87
probe/docker/tagger.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/weaveworks/scope/probe/tag"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// These constants are keys used in node metadata
|
||||
// TODO: use these constants in report/{mapping.go, detailed_node.go} - pending some circular references
|
||||
const (
|
||||
ContainerID = "docker_container_id"
|
||||
)
|
||||
|
||||
// These vars are exported for testing.
|
||||
var (
|
||||
NewPIDTreeStub = tag.NewPIDTree
|
||||
)
|
||||
|
||||
// Tagger is a tagger that tags Docker container information to process
|
||||
// nodes that have a PID.
|
||||
type Tagger struct {
|
||||
procRoot string
|
||||
registry Registry
|
||||
}
|
||||
|
||||
// NewTagger returns a usable Tagger.
|
||||
func NewTagger(registry Registry, procRoot string) *Tagger {
|
||||
return &Tagger{
|
||||
registry: registry,
|
||||
procRoot: procRoot,
|
||||
}
|
||||
}
|
||||
|
||||
// Tag implements Tagger.
|
||||
func (t *Tagger) Tag(r report.Report) (report.Report, error) {
|
||||
pidTree, err := NewPIDTreeStub(t.procRoot)
|
||||
if err != nil {
|
||||
return report.MakeReport(), err
|
||||
}
|
||||
t.tag(pidTree, &r.Process)
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (t *Tagger) tag(pidTree tag.PIDTree, topology *report.Topology) {
|
||||
for nodeID, nodeMetadata := range topology.NodeMetadatas {
|
||||
pidStr, ok := nodeMetadata["pid"]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
pid, err := strconv.ParseUint(pidStr, 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var (
|
||||
c Container
|
||||
candidate = int(pid)
|
||||
)
|
||||
|
||||
t.registry.LockedPIDLookup(func(lookup func(int) Container) {
|
||||
for {
|
||||
c = lookup(candidate)
|
||||
if c != nil {
|
||||
break
|
||||
}
|
||||
|
||||
candidate, err = pidTree.GetParent(candidate)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if c == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
md := report.NodeMetadata{
|
||||
ContainerID: c.ID(),
|
||||
}
|
||||
|
||||
topology.NodeMetadatas[nodeID].Merge(md)
|
||||
}
|
||||
}
|
||||
60
probe/docker/tagger_test.go
Normal file
60
probe/docker/tagger_test.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package docker_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
"github.com/weaveworks/scope/probe/tag"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
type mockPIDTree struct {
|
||||
parents map[int]int
|
||||
}
|
||||
|
||||
func (m *mockPIDTree) GetParent(pid int) (int, error) {
|
||||
parent, ok := m.parents[pid]
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("Not found %d", pid)
|
||||
}
|
||||
return parent, nil
|
||||
}
|
||||
|
||||
func (m *mockPIDTree) ProcessTopology(hostID string) report.Topology {
|
||||
panic("")
|
||||
}
|
||||
|
||||
func TestTagger(t *testing.T) {
|
||||
oldPIDTree := docker.NewPIDTreeStub
|
||||
defer func() { docker.NewPIDTreeStub = oldPIDTree }()
|
||||
|
||||
docker.NewPIDTreeStub = func(procRoot string) (tag.PIDTree, error) {
|
||||
return &mockPIDTree{map[int]int{2: 1}}, nil
|
||||
}
|
||||
|
||||
var (
|
||||
pid1NodeID = report.MakeProcessNodeID("somehost.com", "1")
|
||||
pid2NodeID = report.MakeProcessNodeID("somehost.com", "2")
|
||||
wantNodeMetadata = report.NodeMetadata{docker.ContainerID: "ping"}
|
||||
)
|
||||
|
||||
input := report.MakeReport()
|
||||
input.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}
|
||||
input.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}
|
||||
|
||||
want := report.MakeReport()
|
||||
want.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}.Merge(wantNodeMetadata)
|
||||
want.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}.Merge(wantNodeMetadata)
|
||||
|
||||
tagger := docker.NewTagger(mockRegistryInstance, "/irrelevant")
|
||||
have, err := tagger.Tag(input)
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%s", test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/procspy"
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
"github.com/weaveworks/scope/probe/tag"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/xfer"
|
||||
@@ -73,18 +74,21 @@ func main() {
|
||||
)
|
||||
|
||||
var (
|
||||
dockerTagger *tag.DockerTagger
|
||||
weaveTagger *tag.WeaveTagger
|
||||
weaveTagger *tag.WeaveTagger
|
||||
)
|
||||
|
||||
taggers := []tag.Tagger{tag.NewTopologyTagger(), tag.NewOriginHostTagger(hostID)}
|
||||
reporters := []tag.Reporter{}
|
||||
|
||||
if *dockerEnabled && runtime.GOOS == linux {
|
||||
var err error
|
||||
dockerTagger, err = tag.NewDockerTagger(*procRoot, *dockerInterval)
|
||||
dockerRegistry, err := docker.NewRegistry(*dockerInterval)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start docker tagger: %v", err)
|
||||
log.Fatalf("failed to start docker registry: %v", err)
|
||||
}
|
||||
defer dockerTagger.Stop()
|
||||
taggers = append(taggers, dockerTagger)
|
||||
defer dockerRegistry.Stop()
|
||||
|
||||
taggers = append(taggers, docker.NewTagger(dockerRegistry, *procRoot))
|
||||
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
|
||||
}
|
||||
|
||||
if *weaveRouterAddr != "" {
|
||||
@@ -128,9 +132,8 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
if dockerTagger != nil {
|
||||
r.Container.Merge(dockerTagger.ContainerTopology(hostID))
|
||||
r.ContainerImage.Merge(dockerTagger.ContainerImageTopology(hostID))
|
||||
for _, reporter := range reporters {
|
||||
r.Merge(reporter.Report())
|
||||
}
|
||||
|
||||
if weaveTagger != nil {
|
||||
|
||||
@@ -1,364 +0,0 @@
|
||||
package tag
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
const (
|
||||
start = "start"
|
||||
die = "die"
|
||||
endpoint = "unix:///var/run/docker.sock"
|
||||
)
|
||||
|
||||
// These constants are keys used in node metadata
|
||||
// TODO: use these constants in report/{mapping.go, detailed_node.go} - pending some circular references
|
||||
const (
|
||||
ContainerID = "docker_container_id"
|
||||
ContainerName = "docker_container_name"
|
||||
ImageID = "docker_image_id"
|
||||
ImageName = "docker_image_name"
|
||||
)
|
||||
|
||||
var (
|
||||
newDockerClientStub = newDockerClient
|
||||
newPIDTreeStub = NewPIDTree
|
||||
)
|
||||
|
||||
// DockerTagger is a tagger that tags Docker container information to process
|
||||
// nodes that have a PID.
|
||||
type DockerTagger struct {
|
||||
sync.RWMutex
|
||||
quit chan struct{}
|
||||
interval time.Duration
|
||||
client dockerClient
|
||||
|
||||
containers map[string]*dockerContainer
|
||||
containersByPID map[int]*dockerContainer
|
||||
images map[string]*docker.APIImages
|
||||
|
||||
procRoot string
|
||||
pidTree *PIDTree
|
||||
}
|
||||
|
||||
// Sub-interface for mocking.
|
||||
type dockerClient interface {
|
||||
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
|
||||
InspectContainer(string) (*docker.Container, error)
|
||||
ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
|
||||
AddEventListener(chan<- *docker.APIEvents) error
|
||||
RemoveEventListener(chan *docker.APIEvents) error
|
||||
}
|
||||
|
||||
func newDockerClient(endpoint string) (dockerClient, error) {
|
||||
return docker.NewClient(endpoint)
|
||||
}
|
||||
|
||||
// NewDockerTagger returns a usable DockerTagger. Don't forget to Stop it.
|
||||
func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, error) {
|
||||
pidTree, err := newPIDTreeStub(procRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := DockerTagger{
|
||||
containers: map[string]*dockerContainer{},
|
||||
containersByPID: map[int]*dockerContainer{},
|
||||
images: map[string]*docker.APIImages{},
|
||||
|
||||
procRoot: procRoot,
|
||||
pidTree: pidTree,
|
||||
|
||||
interval: interval,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
go t.loop()
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
// Stop stops the Docker tagger's event subscriber.
|
||||
func (t *DockerTagger) Stop() {
|
||||
close(t.quit)
|
||||
}
|
||||
|
||||
func (t *DockerTagger) loop() {
|
||||
if !t.update() {
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.Tick(t.interval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker:
|
||||
if !t.update() {
|
||||
return
|
||||
}
|
||||
|
||||
case <-t.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *DockerTagger) update() bool {
|
||||
client, err := newDockerClientStub(endpoint)
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return true
|
||||
}
|
||||
t.client = client
|
||||
|
||||
events := make(chan *docker.APIEvents)
|
||||
if err := client.AddEventListener(events); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return true
|
||||
}
|
||||
defer func() {
|
||||
if err := client.RemoveEventListener(events); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := t.updateContainers(); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return true
|
||||
}
|
||||
|
||||
if err := t.updateImages(); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return true
|
||||
}
|
||||
|
||||
otherUpdates := time.Tick(t.interval)
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
t.handleEvent(event)
|
||||
|
||||
case <-otherUpdates:
|
||||
if err := t.updatePIDTree(); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := t.updateImages(); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
case <-t.quit:
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *DockerTagger) updateContainers() error {
|
||||
apiContainers, err := t.client.ListContainers(docker.ListContainersOptions{All: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, apiContainer := range apiContainers {
|
||||
if err := t.addContainer(apiContainer.ID); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DockerTagger) updateImages() error {
|
||||
images, err := t.client.ListImages(docker.ListImagesOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.Lock()
|
||||
for i := range images {
|
||||
image := &images[i]
|
||||
t.images[image.ID] = image
|
||||
}
|
||||
t.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DockerTagger) handleEvent(event *docker.APIEvents) {
|
||||
switch event.Status {
|
||||
case die:
|
||||
containerID := event.ID
|
||||
t.removeContainer(containerID)
|
||||
|
||||
case start:
|
||||
containerID := event.ID
|
||||
if err := t.addContainer(containerID); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *DockerTagger) updatePIDTree() error {
|
||||
pidTree, err := newPIDTreeStub(t.procRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.Lock()
|
||||
t.pidTree = pidTree
|
||||
t.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DockerTagger) addContainer(containerID string) error {
|
||||
container, err := t.client.InspectContainer(containerID)
|
||||
if err != nil {
|
||||
// Don't spam the logs if the container was short lived
|
||||
if _, ok := err.(*docker.NoSuchContainer); !ok {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !container.State.Running {
|
||||
return fmt.Errorf("docker mapper: container %s not running", containerID)
|
||||
}
|
||||
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
dockerContainer := &dockerContainer{Container: container}
|
||||
|
||||
t.containers[containerID] = dockerContainer
|
||||
t.containersByPID[container.State.Pid] = dockerContainer
|
||||
|
||||
return dockerContainer.startGatheringStats(containerID)
|
||||
}
|
||||
|
||||
func (t *DockerTagger) removeContainer(containerID string) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
container, ok := t.containers[containerID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(t.containers, containerID)
|
||||
delete(t.containersByPID, container.State.Pid)
|
||||
container.stopGatheringStats(containerID)
|
||||
}
|
||||
|
||||
// Containers returns the Containers the DockerTagger knows about.
|
||||
func (t *DockerTagger) Containers() []*docker.Container {
|
||||
containers := []*docker.Container{}
|
||||
|
||||
t.RLock()
|
||||
for _, container := range t.containers {
|
||||
containers = append(containers, container.Container)
|
||||
}
|
||||
t.RUnlock()
|
||||
|
||||
return containers
|
||||
}
|
||||
|
||||
// Tag implements Tagger.
|
||||
func (t *DockerTagger) Tag(r report.Report) report.Report {
|
||||
t.tag(&r.Process)
|
||||
return r
|
||||
}
|
||||
|
||||
func (t *DockerTagger) tag(topology *report.Topology) {
|
||||
for nodeID, nodeMetadata := range topology.NodeMetadatas {
|
||||
pidStr, ok := nodeMetadata["pid"]
|
||||
if !ok {
|
||||
//log.Printf("dockerTagger: %q: no process node ID", id)
|
||||
continue
|
||||
}
|
||||
pid, err := strconv.ParseUint(pidStr, 10, 64)
|
||||
if err != nil {
|
||||
//log.Printf("dockerTagger: %q: bad process node PID (%v)", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
var (
|
||||
container *dockerContainer
|
||||
candidate = int(pid)
|
||||
)
|
||||
|
||||
t.RLock()
|
||||
for {
|
||||
container, ok = t.containersByPID[candidate]
|
||||
if ok {
|
||||
break
|
||||
}
|
||||
candidate, err = t.pidTree.getParent(candidate)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
t.RUnlock()
|
||||
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
md := report.NodeMetadata{
|
||||
ContainerID: container.ID,
|
||||
}
|
||||
|
||||
topology.NodeMetadatas[nodeID].Merge(md)
|
||||
}
|
||||
}
|
||||
|
||||
// ContainerTopology produces a Toplogy of Containers
|
||||
func (t *DockerTagger) ContainerTopology(scope string) report.Topology {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
result := report.NewTopology()
|
||||
for _, container := range t.containers {
|
||||
nmd := report.NodeMetadata{
|
||||
ContainerID: container.ID,
|
||||
ContainerName: strings.TrimPrefix(container.Name, "/"),
|
||||
ImageID: container.Image,
|
||||
}
|
||||
|
||||
nmd.Merge(container.getStats())
|
||||
|
||||
nodeID := report.MakeContainerNodeID(scope, container.ID)
|
||||
result.NodeMetadatas[nodeID] = nmd
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ContainerImageTopology produces a Toplogy of Container Images
|
||||
func (t *DockerTagger) ContainerImageTopology(scope string) report.Topology {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
result := report.NewTopology()
|
||||
|
||||
// Loop over containers so we only emit images for running containers.
|
||||
for _, container := range t.containers {
|
||||
nmd := report.NodeMetadata{
|
||||
ImageID: container.Image,
|
||||
}
|
||||
|
||||
image, ok := t.images[container.Image]
|
||||
if ok && len(image.RepoTags) > 0 {
|
||||
nmd[ImageName] = image.RepoTags[0]
|
||||
}
|
||||
|
||||
nodeID := report.MakeContainerNodeID(scope, container.Image)
|
||||
result.NodeMetadatas[nodeID] = nmd
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -1,123 +0,0 @@
|
||||
package tag
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
type mockDockerClient struct {
|
||||
apiContainers []docker.APIContainers
|
||||
containers map[string]*docker.Container
|
||||
apiImages []docker.APIImages
|
||||
}
|
||||
|
||||
func (m mockDockerClient) ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) {
|
||||
return m.apiContainers, nil
|
||||
}
|
||||
|
||||
func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) {
|
||||
return m.containers[id], nil
|
||||
}
|
||||
|
||||
func (m mockDockerClient) ListImages(docker.ListImagesOptions) ([]docker.APIImages, error) {
|
||||
return m.apiImages, nil
|
||||
}
|
||||
|
||||
func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDockerTagger(t *testing.T) {
|
||||
oldPIDTree, oldDockerClient := newPIDTreeStub, newDockerClientStub
|
||||
defer func() { newPIDTreeStub, newDockerClientStub = oldPIDTree, oldDockerClient }()
|
||||
|
||||
newPIDTreeStub = func(procRoot string) (*PIDTree, error) {
|
||||
pid1 := &Process{PID: 1}
|
||||
pid2 := &Process{PID: 2, PPID: 1, parent: pid1}
|
||||
pid1.children = []*Process{pid2}
|
||||
return &PIDTree{
|
||||
processes: map[int]*Process{
|
||||
1: pid1, 2: pid2,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
newDockerClientStub = func(endpoint string) (dockerClient, error) {
|
||||
return mockDockerClient{
|
||||
apiContainers: []docker.APIContainers{{ID: "foo"}},
|
||||
containers: map[string]*docker.Container{
|
||||
"foo": {
|
||||
ID: "foo",
|
||||
Name: "bar",
|
||||
Image: "baz",
|
||||
State: docker.State{Pid: 1, Running: true},
|
||||
},
|
||||
},
|
||||
apiImages: []docker.APIImages{{ID: "baz", RepoTags: []string{"bang", "not-chosen"}}},
|
||||
}, nil
|
||||
}
|
||||
|
||||
var (
|
||||
pid1NodeID = report.MakeProcessNodeID("somehost.com", "1")
|
||||
pid2NodeID = report.MakeProcessNodeID("somehost.com", "2")
|
||||
processNodeMetadata = report.NodeMetadata{
|
||||
ContainerID: "foo",
|
||||
}
|
||||
wantContainerTopology = report.Topology{
|
||||
Adjacency: report.Adjacency{},
|
||||
EdgeMetadatas: report.EdgeMetadatas{},
|
||||
NodeMetadatas: report.NodeMetadatas{
|
||||
report.MakeContainerNodeID("", "foo"): report.NodeMetadata{
|
||||
ContainerID: "foo",
|
||||
ContainerName: "bar",
|
||||
ImageID: "baz",
|
||||
},
|
||||
},
|
||||
}
|
||||
wantContainerImageTopology = report.Topology{
|
||||
Adjacency: report.Adjacency{},
|
||||
EdgeMetadatas: report.EdgeMetadatas{},
|
||||
NodeMetadatas: report.NodeMetadatas{
|
||||
report.MakeContainerNodeID("", "baz"): report.NodeMetadata{
|
||||
ImageID: "baz",
|
||||
ImageName: "bang",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
r := report.MakeReport()
|
||||
r.Process.NodeMetadatas[pid1NodeID] = report.NodeMetadata{"pid": "1"}
|
||||
r.Process.NodeMetadatas[pid2NodeID] = report.NodeMetadata{"pid": "2"}
|
||||
|
||||
dockerTagger, _ := NewDockerTagger("/irrelevant", 10*time.Second)
|
||||
runtime.Gosched()
|
||||
for _, nodeID := range []string{pid1NodeID, pid2NodeID} {
|
||||
want := processNodeMetadata.Copy()
|
||||
have := dockerTagger.Tag(r).Process.NodeMetadatas[nodeID].Copy()
|
||||
delete(have, "pid")
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%q: want %+v, have %+v", nodeID, want, have)
|
||||
}
|
||||
}
|
||||
|
||||
haveContainerTopology := dockerTagger.ContainerTopology("")
|
||||
if !reflect.DeepEqual(wantContainerTopology, haveContainerTopology) {
|
||||
t.Errorf("%s", test.Diff(wantContainerTopology, haveContainerTopology))
|
||||
}
|
||||
|
||||
haveContainerImageTopology := dockerTagger.ContainerImageTopology("")
|
||||
if !reflect.DeepEqual(wantContainerImageTopology, haveContainerImageTopology) {
|
||||
t.Errorf("%s", test.Diff(wantContainerImageTopology, haveContainerImageTopology))
|
||||
}
|
||||
}
|
||||
@@ -12,12 +12,12 @@ func NewOriginHostTagger(hostID string) Tagger {
|
||||
return &originHostTagger{hostNodeID: report.MakeHostNodeID(hostID)}
|
||||
}
|
||||
|
||||
func (t originHostTagger) Tag(r report.Report) report.Report {
|
||||
func (t originHostTagger) Tag(r report.Report) (report.Report, error) {
|
||||
for _, topology := range r.Topologies() {
|
||||
md := report.NodeMetadata{report.HostNodeID: t.hostNodeID}
|
||||
for nodeID := range topology.NodeMetadatas {
|
||||
topology.NodeMetadatas[nodeID].Merge(md)
|
||||
}
|
||||
}
|
||||
return r
|
||||
return r, nil
|
||||
}
|
||||
|
||||
@@ -18,7 +18,8 @@ func TestOriginHostTagger(t *testing.T) {
|
||||
r := report.MakeReport()
|
||||
r.Endpoint.NodeMetadatas[endpointNodeID] = nodeMetadata
|
||||
want := nodeMetadata.Merge(report.NodeMetadata{report.HostNodeID: report.MakeHostNodeID(hostID)})
|
||||
have := tag.NewOriginHostTagger(hostID).Tag(r).Endpoint.NodeMetadatas[endpointNodeID].Copy()
|
||||
rpt, _ := tag.NewOriginHostTagger(hostID).Tag(r)
|
||||
have := rpt.Endpoint.NodeMetadatas[endpointNodeID].Copy()
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("\nwant %+v\nhave %+v", want, have)
|
||||
}
|
||||
|
||||
@@ -11,16 +11,21 @@ import (
|
||||
)
|
||||
|
||||
// PIDTree represents all processes on the machine.
|
||||
type PIDTree struct {
|
||||
processes map[int]*Process
|
||||
type PIDTree interface {
|
||||
GetParent(pid int) (int, error)
|
||||
ProcessTopology(hostID string) report.Topology
|
||||
}
|
||||
|
||||
type pidTree struct {
|
||||
processes map[int]*process
|
||||
}
|
||||
|
||||
// Process represents a single process.
|
||||
type Process struct {
|
||||
PID, PPID int
|
||||
Comm string
|
||||
parent *Process
|
||||
children []*Process
|
||||
type process struct {
|
||||
pid, ppid int
|
||||
comm string
|
||||
parent *process
|
||||
children []*process
|
||||
}
|
||||
|
||||
// Hooks for mocking
|
||||
@@ -30,13 +35,13 @@ var (
|
||||
)
|
||||
|
||||
// NewPIDTree returns a new PIDTree that can be polled.
|
||||
func NewPIDTree(procRoot string) (*PIDTree, error) {
|
||||
func NewPIDTree(procRoot string) (PIDTree, error) {
|
||||
dirEntries, err := readDir(procRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pt := PIDTree{processes: map[int]*Process{}}
|
||||
pt := pidTree{processes: map[int]*process{}}
|
||||
for _, dirEntry := range dirEntries {
|
||||
filename := dirEntry.Name()
|
||||
pid, err := strconv.Atoi(filename)
|
||||
@@ -59,15 +64,15 @@ func NewPIDTree(procRoot string) (*PIDTree, error) {
|
||||
comm = string(commBuf)
|
||||
}
|
||||
|
||||
pt.processes[pid] = &Process{
|
||||
PID: pid,
|
||||
PPID: ppid,
|
||||
Comm: comm,
|
||||
pt.processes[pid] = &process{
|
||||
pid: pid,
|
||||
ppid: ppid,
|
||||
comm: comm,
|
||||
}
|
||||
}
|
||||
|
||||
for _, child := range pt.processes {
|
||||
parent, ok := pt.processes[child.PPID]
|
||||
parent, ok := pt.processes[child.ppid]
|
||||
if !ok {
|
||||
// This can happen as listing proc is not a consistent snapshot
|
||||
continue
|
||||
@@ -79,48 +84,28 @@ func NewPIDTree(procRoot string) (*PIDTree, error) {
|
||||
return &pt, nil
|
||||
}
|
||||
|
||||
func (pt *PIDTree) getParent(pid int) (int, error) {
|
||||
// GetParent returns the pid of the parent process for a given pid
|
||||
func (pt *pidTree) GetParent(pid int) (int, error) {
|
||||
proc, ok := pt.processes[pid]
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("PID %d not found", pid)
|
||||
}
|
||||
|
||||
return proc.PPID, nil
|
||||
}
|
||||
|
||||
// allChildren returns a flattened list of child pids including the given pid
|
||||
func (pt *PIDTree) allChildren(pid int) ([]int, error) {
|
||||
proc, ok := pt.processes[pid]
|
||||
if !ok {
|
||||
return []int{}, fmt.Errorf("PID %d not found", pid)
|
||||
}
|
||||
|
||||
var result []int
|
||||
|
||||
var f func(*Process)
|
||||
f = func(p *Process) {
|
||||
result = append(result, p.PID)
|
||||
for _, child := range p.children {
|
||||
f(child)
|
||||
}
|
||||
}
|
||||
|
||||
f(proc)
|
||||
return result, nil
|
||||
return proc.ppid, nil
|
||||
}
|
||||
|
||||
// ProcessTopology returns a process topology based on the current state of the PIDTree.
|
||||
func (pt *PIDTree) ProcessTopology(hostID string) report.Topology {
|
||||
func (pt *pidTree) ProcessTopology(hostID string) report.Topology {
|
||||
t := report.NewTopology()
|
||||
for pid, proc := range pt.processes {
|
||||
pidstr := strconv.Itoa(pid)
|
||||
nodeID := report.MakeProcessNodeID(hostID, pidstr)
|
||||
t.NodeMetadatas[nodeID] = report.NodeMetadata{
|
||||
"pid": pidstr,
|
||||
"comm": proc.Comm,
|
||||
"comm": proc.comm,
|
||||
}
|
||||
if proc.PPID > 0 {
|
||||
t.NodeMetadatas[nodeID]["ppid"] = strconv.Itoa(proc.PPID)
|
||||
if proc.ppid > 0 {
|
||||
t.NodeMetadatas[nodeID]["ppid"] = strconv.Itoa(proc.ppid)
|
||||
}
|
||||
}
|
||||
return t
|
||||
|
||||
@@ -48,16 +48,16 @@ func TestPIDTree(t *testing.T) {
|
||||
return []byte(fmt.Sprintf("%d na R %d", pid, parent)), nil
|
||||
}
|
||||
|
||||
pidtree, err := newPIDTreeStub("/proc")
|
||||
pidtree, err := NewPIDTree("/proc")
|
||||
if err != nil {
|
||||
t.Fatalf("newPIDTree error: %v", err)
|
||||
}
|
||||
|
||||
for pid, want := range map[int][]int{
|
||||
1: {1, 2, 3, 4},
|
||||
2: {2, 3, 4},
|
||||
for pid, want := range map[int]int{
|
||||
2: 1,
|
||||
3: 2,
|
||||
} {
|
||||
have, err := pidtree.allChildren(pid)
|
||||
have, err := pidtree.GetParent(pid)
|
||||
if err != nil || !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%d: want %#v, have %#v (%v)", pid, want, have, err)
|
||||
}
|
||||
|
||||
@@ -1,16 +1,29 @@
|
||||
package tag
|
||||
|
||||
import "github.com/weaveworks/scope/report"
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// Tagger tags nodes with value-add node metadata.
|
||||
type Tagger interface {
|
||||
Tag(r report.Report) report.Report
|
||||
Tag(r report.Report) (report.Report, error)
|
||||
}
|
||||
|
||||
// Reporter generates Reports.
|
||||
type Reporter interface {
|
||||
Report() report.Report
|
||||
}
|
||||
|
||||
// Apply tags the report with all the taggers.
|
||||
func Apply(r report.Report, taggers []Tagger) report.Report {
|
||||
var err error
|
||||
for _, tagger := range taggers {
|
||||
r = tagger.Tag(r)
|
||||
r, err = tagger.Tag(r)
|
||||
if err != nil {
|
||||
log.Printf("error applying tagger: %v", err)
|
||||
}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ func NewTopologyTagger() Tagger {
|
||||
return &topologyTagger{}
|
||||
}
|
||||
|
||||
func (topologyTagger) Tag(r report.Report) report.Report {
|
||||
func (topologyTagger) Tag(r report.Report) (report.Report, error) {
|
||||
for val, topology := range map[string]*report.Topology{
|
||||
"endpoint": &(r.Endpoint),
|
||||
"address": &(r.Address),
|
||||
@@ -24,5 +24,5 @@ func (topologyTagger) Tag(r report.Report) report.Report {
|
||||
(*topology).NodeMetadatas[nodeID].Merge(md)
|
||||
}
|
||||
}
|
||||
return r
|
||||
return r, nil
|
||||
}
|
||||
|
||||
@@ -12,7 +12,8 @@ func TestTagMissingID(t *testing.T) {
|
||||
const nodeID = "not-found"
|
||||
r := report.MakeReport()
|
||||
want := report.NodeMetadata{}
|
||||
have := tag.NewTopologyTagger().Tag(r).Endpoint.NodeMetadatas[nodeID].Copy()
|
||||
rpt, _ := tag.NewTopologyTagger().Tag(r)
|
||||
have := rpt.Endpoint.NodeMetadatas[nodeID].Copy()
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Error("TopologyTagger erroneously tagged a missing node ID")
|
||||
}
|
||||
|
||||
@@ -41,10 +41,10 @@ func NewWeaveTagger(weaveRouterAddress string) (*WeaveTagger, error) {
|
||||
}
|
||||
|
||||
// Tag implements Tagger.
|
||||
func (t WeaveTagger) Tag(r report.Report) report.Report {
|
||||
func (t WeaveTagger) Tag(r report.Report) (report.Report, error) {
|
||||
// The status-json endpoint doesn't return any link information, so
|
||||
// there's nothing to tag, yet.
|
||||
return r
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// OverlayTopology produces an overlay topology from the Weave router.
|
||||
|
||||
Reference in New Issue
Block a user