diff --git a/probe/docker_process_mapper.go b/probe/docker_process_mapper.go index 941616efb..faffbcb4d 100644 --- a/probe/docker_process_mapper.go +++ b/probe/docker_process_mapper.go @@ -10,25 +10,48 @@ import ( docker "github.com/fsouza/go-dockerclient" ) +const ( + stop = "stop" + start = "start" +) + type dockerMapper struct { sync.RWMutex - containers map[int]*docker.Container - images map[string]*docker.APIImages - procRoot string + + containers map[string]*docker.Container + containersByPID map[int]*docker.Container + images map[string]*docker.APIImages + + procRoot string + pidTree *pidTree + + interval time.Duration } -func newDockerMapper(procRoot string, interval time.Duration) *dockerMapper { - m := dockerMapper{ - procRoot: procRoot, - containers: map[int]*docker.Container{}, +func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) { + pidTree, err := newPIDTreeStub(procRoot) + if err != nil { + return nil, err } - m.update() - go m.loop(interval) - return &m + + m := dockerMapper{ + containers: map[string]*docker.Container{}, + containersByPID: map[int]*docker.Container{}, + images: map[string]*docker.APIImages{}, + + procRoot: procRoot, + pidTree: pidTree, + + interval: interval, + } + + go m.loop() + return &m, nil } -func (m *dockerMapper) loop(d time.Duration) { - for range time.Tick(d) { +func (m *dockerMapper) loop() { + m.update() + for range time.Tick(m.interval) { m.update() } } @@ -38,6 +61,8 @@ type dockerClient interface { ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) InspectContainer(string) (*docker.Container, error) ListImages(docker.ListImagesOptions) ([]docker.APIImages, error) + AddEventListener(chan<- *docker.APIEvents) error + RemoveEventListener(chan *docker.APIEvents) error } func newRealDockerClient(endpoint string) (dockerClient, error) { @@ -50,12 +75,6 @@ var ( ) func (m *dockerMapper) update() { - pidTree, err := newPIDTreeStub(m.procRoot) - if err != nil { - log.Printf("docker mapper: %s", err) - return - } - endpoint := "unix:///var/run/docker.sock" client, err := newDockerClient(endpoint) if err != nil { @@ -63,50 +82,137 @@ func (m *dockerMapper) update() { return } - containers, err := client.ListContainers(docker.ListContainersOptions{All: true}) - if err != nil { + events := make(chan *docker.APIEvents) + if err := client.AddEventListener(events); err != nil { + log.Printf("docker mapper: %s", err) + return + } + defer func() { + if err := client.RemoveEventListener(events); err != nil { + log.Printf("docker mapper: %s", err) + } + }() + + if err := m.updateContainers(client); err != nil { log.Printf("docker mapper: %s", err) return } - pmap := map[int]*docker.Container{} - for _, container := range containers { - info, err := client.InspectContainer(container.ID) + if err := m.updateImages(client); err != nil { + log.Printf("docker mapper: %s", err) + return + } + + otherUpdates := time.Tick(m.interval) + for { + select { + case event := <-events: + m.handleEvent(event, client) + + case <-otherUpdates: + if err := m.updatePIDTree(); err != nil { + log.Printf("docker mapper: %s", err) + continue + } + + if err := m.updateImages(client); err != nil { + log.Printf("docker mapper: %s", err) + continue + } + } + } +} + +func (m *dockerMapper) updateContainers(client dockerClient) error { + apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true}) + if err != nil { + return err + } + + containers := []*docker.Container{} + for _, apiContainer := range apiContainers { + container, err := client.InspectContainer(apiContainer.ID) if err != nil { log.Printf("docker mapper: %s", err) continue } - if !info.State.Running { + if !container.State.Running { continue } - pids, err := pidTree.allChildren(info.State.Pid) - if err != nil { - log.Printf("docker mapper: %s", err) - continue - } - for _, pid := range pids { - pmap[pid] = info - } - } - - imageList, err := client.ListImages(docker.ListImagesOptions{}) - if err != nil { - log.Printf("docker mapper: %s", err) - return - } - - imageMap := map[string]*docker.APIImages{} - for i := range imageList { - image := &imageList[i] - imageMap[image.ID] = image + containers = append(containers, container) } m.Lock() - m.containers = pmap - m.images = imageMap + for _, container := range containers { + m.containers[container.ID] = container + m.containersByPID[container.State.Pid] = container + } m.Unlock() + + return nil +} + +func (m *dockerMapper) updateImages(client dockerClient) error { + images, err := client.ListImages(docker.ListImagesOptions{}) + if err != nil { + return err + } + + m.Lock() + for i := range images { + image := &images[i] + m.images[image.ID] = image + } + m.Unlock() + + return nil +} + +func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) { + switch event.Status { + case stop: + containerID := event.ID + m.Lock() + if container, ok := m.containers[containerID]; ok { + delete(m.containers, containerID) + delete(m.containersByPID, container.State.Pid) + } else { + log.Printf("docker mapper: container %s not found", containerID) + } + m.Unlock() + + case start: + containerID := event.ID + container, err := client.InspectContainer(containerID) + if err != nil { + log.Printf("docker mapper: %s", err) + return + } + + if !container.State.Running { + log.Printf("docker mapper: container %s not running", containerID) + return + } + + m.Lock() + m.containers[containerID] = container + m.containersByPID[container.State.Pid] = container + m.Unlock() + } +} + +func (m *dockerMapper) updatePIDTree() error { + pidTree, err := newPIDTreeStub(m.procRoot) + if err != nil { + return err + } + + m.Lock() + m.pidTree = pidTree + m.Unlock() + return nil } type dockerProcessMapper struct { @@ -117,11 +223,27 @@ type dockerProcessMapper struct { func (m *dockerProcessMapper) Key() string { return m.key } func (m *dockerProcessMapper) Map(pid uint) (string, error) { + var ( + container *docker.Container + ok bool + err error + candidate = int(pid) + ) + m.RLock() - container, ok := m.containers[int(pid)] + for { + container, ok = m.containersByPID[candidate] + if ok { + break + } + candidate, err = m.pidTree.getParent(candidate) + if err != nil { + break + } + } m.RUnlock() - if !ok { + if err != nil { return "", fmt.Errorf("no container found for PID %d", pid) } diff --git a/probe/docker_process_mapper_test.go b/probe/docker_process_mapper_test.go index 5ca17bf54..aaba6f8d6 100644 --- a/probe/docker_process_mapper_test.go +++ b/probe/docker_process_mapper_test.go @@ -1,6 +1,7 @@ package main import ( + "runtime" "testing" "time" @@ -25,6 +26,14 @@ func (m mockDockerClient) ListImages(options docker.ListImagesOptions) ([]docker return m.apiImages, nil } +func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error { + return nil +} + +func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error { + return nil +} + func TestDockerProcessMapper(t *testing.T) { oldPIDTreeStub, oldDockerClientStub := newPIDTreeStub, newDockerClient defer func() { @@ -59,12 +68,14 @@ func TestDockerProcessMapper(t *testing.T) { }, nil } - dockerMapper := newDockerMapper("/proc", 10*time.Second) + dockerMapper, _ := newDockerMapper("/proc", 10*time.Second) dockerIDMapper := dockerMapper.idMapper() dockerNameMapper := dockerMapper.nameMapper() dockerImageIDMapper := dockerMapper.imageIDMapper() dockerImageNameMapper := dockerMapper.imageNameMapper() + runtime.Gosched() + for pid, want := range map[uint]struct{ id, name, imageID, imageName string }{ 1: {"foo", "bar", "baz", "tag"}, 2: {"foo", "bar", "baz", "tag"}, diff --git a/probe/main.go b/probe/main.go index dacc1db43..79d1ee3d7 100644 --- a/probe/main.go +++ b/probe/main.go @@ -78,7 +78,11 @@ func main() { } if *dockerMapper { - docker := newDockerMapper(*procRoot, *dockerInterval) + docker, err := newDockerMapper(*procRoot, *dockerInterval) + if err != nil { + log.Fatal(err) + } + pms = append(pms, docker.idMapper(), docker.nameMapper(), diff --git a/probe/pidtree.go b/probe/pidtree.go index c9a62b6c2..65d0f2d61 100644 --- a/probe/pidtree.go +++ b/probe/pidtree.go @@ -64,6 +64,15 @@ func newPIDTree(procRoot string) (*pidTree, error) { return &pt, nil } +func (pt *pidTree) getParent(pid int) (int, error) { + proc, ok := pt.processes[pid] + if !ok { + return -1, fmt.Errorf("PID %d not found", pid) + } + + return proc.ppid, nil +} + // allChildren returns a flattened list of child pids including the given pid func (pt *pidTree) allChildren(pid int) ([]int, error) { proc, ok := pt.processes[pid]