Files
weave-scope/probe/docker_process_mapper.go
Tom Wilkie 121c86a52d Use Docker events API instead of polling Docker daemon.
Unfortunately we still have to poll pidtree and docker images, but I think we can get rid of the docker image polling as we only care about images on running containers.
2015-06-02 16:41:32 +00:00

284 lines
5.7 KiB
Go

package main
import (
"fmt"
"log"
"strings"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
)
// Values of docker.APIEvents.Status that handleEvent reacts to.
const (
	// start is the status of the event that adds a container to the mapper.
	start = "start"
	// stop is the status of the event that removes a container from the mapper.
	stop = "stop"
)
// dockerMapper holds the cached view of containers, images and the process
// tree that the process mappers query. The embedded RWMutex guards the maps
// and pidTree: the update methods write under Lock, lookups read under RLock.
type dockerMapper struct {
sync.RWMutex
// containers indexes the known containers by container ID.
containers map[string]*docker.Container
// containersByPID indexes the same containers by their State.Pid.
containersByPID map[int]*docker.Container
// images indexes the known images by image ID.
images map[string]*docker.APIImages
// procRoot is handed to newPIDTreeStub whenever the PID tree is rebuilt.
procRoot string
// pidTree is what Map uses to walk from a PID up through its parents.
pidTree *pidTree
// interval is the period of the tickers created in loop and update.
interval time.Duration
}
// newDockerMapper builds a dockerMapper with empty container and image
// caches and an initial PID tree read from procRoot, then starts the
// background goroutine (loop) that keeps it up to date.
//
// It returns an error only when the initial PID tree cannot be built; in that
// case no goroutine is started.
func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) {
	tree, err := newPIDTreeStub(procRoot)
	if err != nil {
		return nil, err
	}

	mapper := &dockerMapper{
		containers:      make(map[string]*docker.Container),
		containersByPID: make(map[int]*docker.Container),
		images:          make(map[string]*docker.APIImages),
		procRoot:        procRoot,
		pidTree:         tree,
		interval:        interval,
	}

	go mapper.loop()
	return mapper, nil
}
// loop runs update immediately and then once more for every tick of
// m.interval. It never returns and is meant to run in its own goroutine.
func (m *dockerMapper) loop() {
	m.update()

	// The ticker is only created once the first update has returned.
	ticks := time.Tick(m.interval)
	for {
		<-ticks
		m.update()
	}
}
// dockerClient is the set of Docker client methods the mapper relies on.
// It is declared as an interface so that a mock can be substituted via
// newDockerClient.
type dockerClient interface {
	// Event subscription.
	AddEventListener(listener chan<- *docker.APIEvents) error
	RemoveEventListener(listener chan *docker.APIEvents) error

	// Container queries.
	ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error)
	InspectContainer(id string) (*docker.Container, error)

	// Image queries.
	ListImages(opts docker.ListImagesOptions) ([]docker.APIImages, error)
}
// newRealDockerClient connects to the Docker daemon at endpoint and returns
// the client as a dockerClient.
//
// On failure it returns a literal nil interface together with the error,
// rather than an interface value wrapping a nil *docker.Client, so that a
// `client == nil` check by a caller behaves as expected.
func newRealDockerClient(endpoint string) (dockerClient, error) {
	client, err := docker.NewClient(endpoint)
	if err != nil {
		return nil, err
	}
	return client, nil
}
var (
newDockerClient = newRealDockerClient
newPIDTreeStub = newPIDTree
)
// update connects to the local Docker daemon, subscribes to its event
// stream, performs a full listing of containers and images, and then stays
// in an event loop:
//
//   - every Docker event is passed to handleEvent;
//   - every m.interval the PID tree and the image list are refreshed.
//
// It returns (after logging) when connecting, subscribing or the initial
// listing fails, or when the event channel is closed. The caller (loop) is
// expected to call it again later to resynchronise.
func (m *dockerMapper) update() {
	const endpoint = "unix:///var/run/docker.sock"

	client, err := newDockerClient(endpoint)
	if err != nil {
		log.Printf("docker mapper: %s", err)
		return
	}

	// Subscribe before listing so that changes happening during the initial
	// listing are delivered as events rather than missed.
	events := make(chan *docker.APIEvents)
	if err := client.AddEventListener(events); err != nil {
		log.Printf("docker mapper: %s", err)
		return
	}
	defer func() {
		if err := client.RemoveEventListener(events); err != nil {
			log.Printf("docker mapper: %s", err)
		}
	}()

	if err := m.updateContainers(client); err != nil {
		log.Printf("docker mapper: %s", err)
		return
	}
	if err := m.updateImages(client); err != nil {
		log.Printf("docker mapper: %s", err)
		return
	}

	// Use a stoppable ticker so nothing is left behind when this function
	// returns. A non-positive interval keeps the channel nil (never fires),
	// which is what time.Tick did, instead of panicking in NewTicker.
	var otherUpdates <-chan time.Time
	if m.interval > 0 {
		ticker := time.NewTicker(m.interval)
		defer ticker.Stop()
		otherUpdates = ticker.C
	}

	for {
		select {
		case event, ok := <-events:
			if !ok {
				// A closed channel would deliver nil events forever.
				// NOTE(review): presumably the client library closes
				// listener channels when its event stream ends - confirm.
				log.Printf("docker mapper: event listener unexpectedly disconnected")
				return
			}
			m.handleEvent(event, client)

		case <-otherUpdates:
			if err := m.updatePIDTree(); err != nil {
				log.Printf("docker mapper: %s", err)
				continue
			}
			if err := m.updateImages(client); err != nil {
				log.Printf("docker mapper: %s", err)
				continue
			}
		}
	}
}
// updateContainers rebuilds the container caches from a full listing.
//
// Every listed container is inspected; containers that fail inspection are
// logged and skipped, and containers that are not running are ignored. The
// resulting maps replace m.containers and m.containersByPID, so entries for
// containers that are no longer running do not survive a resynchronisation.
//
// It returns an error only when the listing itself fails, in which case the
// existing caches are left untouched.
func (m *dockerMapper) updateContainers(client dockerClient) error {
	apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
	if err != nil {
		return err
	}

	byID := make(map[string]*docker.Container, len(apiContainers))
	byPID := make(map[int]*docker.Container, len(apiContainers))
	for _, apiContainer := range apiContainers {
		container, err := client.InspectContainer(apiContainer.ID)
		if err != nil {
			log.Printf("docker mapper: %s", err)
			continue
		}
		if !container.State.Running {
			continue
		}
		byID[container.ID] = container
		byPID[container.State.Pid] = container
	}

	// Swap both maps in one critical section so readers never observe a
	// half-updated view.
	m.Lock()
	m.containers = byID
	m.containersByPID = byPID
	m.Unlock()
	return nil
}
// updateImages lists the images known to the daemon and records each of them
// in m.images under its image ID, overwriting any existing entry for that ID.
// Entries for images missing from the listing are left in place.
//
// It returns the listing error, if any, without touching the cache.
func (m *dockerMapper) updateImages(client dockerClient) error {
	listed, err := client.ListImages(docker.ListImagesOptions{})
	if err != nil {
		return err
	}

	m.Lock()
	defer m.Unlock()
	for idx := range listed {
		// Store a pointer to the slice element itself, not to a loop copy.
		m.images[listed[idx].ID] = &listed[idx]
	}
	return nil
}
// handleEvent applies a single Docker event to the container caches.
//
//   - "die" and "stop" remove the container from both maps. "die" is handled
//     because it is the status Docker reports when a container's process
//     exits, including exits that were not caused by `docker stop`.
//   - "start" inspects the container and, if it is running, adds it to both
//     maps.
//
// All other statuses, and nil events, are ignored.
func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) {
	// die is the event status reported when a container's process exits.
	const die = "die"

	// A receive from a closed event channel yields nil; never dereference it.
	if event == nil {
		return
	}

	containerID := event.ID
	switch event.Status {
	case die, stop:
		m.Lock()
		container, ok := m.containers[containerID]
		if ok {
			delete(m.containers, containerID)
			delete(m.containersByPID, container.State.Pid)
		}
		m.Unlock()

		// Only "die" reports an unknown container: a "stop" that follows a
		// "die" for the same container would otherwise log on every stop.
		// NOTE(review): assumes Docker emits die before stop - confirm.
		if !ok && event.Status == die {
			log.Printf("docker mapper: container %s not found", containerID)
		}

	case start:
		// Inspect outside the lock; it is a round trip to the daemon.
		container, err := client.InspectContainer(containerID)
		if err != nil {
			log.Printf("docker mapper: %s", err)
			return
		}
		if !container.State.Running {
			log.Printf("docker mapper: container %s not running", containerID)
			return
		}

		m.Lock()
		m.containers[containerID] = container
		m.containersByPID[container.State.Pid] = container
		m.Unlock()
	}
}
// updatePIDTree builds a fresh PID tree from m.procRoot and installs it as
// m.pidTree. When building fails the error is returned and the previous tree
// stays in place.
func (m *dockerMapper) updatePIDTree() error {
	fresh, err := newPIDTreeStub(m.procRoot)
	if err != nil {
		return err
	}

	m.Lock()
	defer m.Unlock()
	m.pidTree = fresh
	return nil
}
// dockerProcessMapper pairs a dockerMapper with a key and an extraction
// function: Map finds the container for a PID and returns f applied to it.
type dockerProcessMapper struct {
*dockerMapper
// key is the value returned by Key.
key string
// f derives the mapped string from the container found for a PID.
f func(*docker.Container) string
}
// Key returns the key this mapper was constructed with.
func (m *dockerProcessMapper) Key() string {
	return m.key
}
// Map returns f applied to the container that owns pid.
//
// The owning container is found by looking up pid in containersByPID and, on
// a miss, repeating the lookup with the parent PID taken from the PID tree.
// The walk ends when a container is found or when the tree cannot supply a
// parent, in which case a "no container found" error is returned.
func (m *dockerProcessMapper) Map(pid uint) (string, error) {
	// The search runs in a closure so the read lock is released before f is
	// called: f may take the read lock itself (the image-name mapper does).
	owner, lookupErr := func() (*docker.Container, error) {
		m.RLock()
		defer m.RUnlock()

		current := int(pid)
		for {
			if c, hit := m.containersByPID[current]; hit {
				return c, nil
			}
			parent, err := m.pidTree.getParent(current)
			if err != nil {
				return nil, err
			}
			current = parent
		}
	}()

	if lookupErr != nil {
		return "", fmt.Errorf("no container found for PID %d", pid)
	}
	return m.f(owner), nil
}
// idMapper returns a processMapper with key "docker_id" that maps a PID to
// the ID of its container.
func (m *dockerMapper) idMapper() processMapper {
	containerID := func(c *docker.Container) string {
		return c.ID
	}
	return &dockerProcessMapper{
		dockerMapper: m,
		key:          "docker_id",
		f:            containerID,
	}
}
// nameMapper returns a processMapper with key "docker_name" that maps a PID
// to the name of its container, with one leading "/" removed if present.
func (m *dockerMapper) nameMapper() processMapper {
	containerName := func(c *docker.Container) string {
		return strings.TrimPrefix(c.Name, "/")
	}
	return &dockerProcessMapper{
		dockerMapper: m,
		key:          "docker_name",
		f:            containerName,
	}
}
// imageIDMapper returns a processMapper with key "docker_image_id" that maps
// a PID to the Image field of its container.
func (m *dockerMapper) imageIDMapper() processMapper {
	imageID := func(c *docker.Container) string {
		return c.Image
	}
	return &dockerProcessMapper{
		dockerMapper: m,
		key:          "docker_image_id",
		f:            imageID,
	}
}
// imageNameMapper returns a processMapper with key "docker_image_name" that
// maps a PID to the first repo tag of its container's image. The result is
// the empty string when the image is not in m.images or has no repo tags.
func (m *dockerMapper) imageNameMapper() processMapper {
	imageName := func(c *docker.Container) string {
		// Hold the read lock only for the map lookup itself.
		m.RLock()
		image, known := m.images[c.Image]
		m.RUnlock()

		if known && len(image.RepoTags) > 0 {
			return image.RepoTags[0]
		}
		return ""
	}
	return &dockerProcessMapper{
		dockerMapper: m,
		key:          "docker_image_name",
		f:            imageName,
	}
}