Merge pull request #163 from tomwilkie/84-events

Use Docker events API instead of polling Docker daemon.
This commit is contained in:
Tom Wilkie
2015-06-02 17:58:26 +01:00
4 changed files with 196 additions and 50 deletions

View File

@@ -10,25 +10,48 @@ import (
docker "github.com/fsouza/go-dockerclient"
)
const (
stop = "stop"
start = "start"
)
type dockerMapper struct {
sync.RWMutex
containers map[int]*docker.Container
images map[string]*docker.APIImages
procRoot string
containers map[string]*docker.Container
containersByPID map[int]*docker.Container
images map[string]*docker.APIImages
procRoot string
pidTree *pidTree
interval time.Duration
}
func newDockerMapper(procRoot string, interval time.Duration) *dockerMapper {
m := dockerMapper{
procRoot: procRoot,
containers: map[int]*docker.Container{},
func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) {
pidTree, err := newPIDTreeStub(procRoot)
if err != nil {
return nil, err
}
m.update()
go m.loop(interval)
return &m
m := dockerMapper{
containers: map[string]*docker.Container{},
containersByPID: map[int]*docker.Container{},
images: map[string]*docker.APIImages{},
procRoot: procRoot,
pidTree: pidTree,
interval: interval,
}
go m.loop()
return &m, nil
}
func (m *dockerMapper) loop(d time.Duration) {
for range time.Tick(d) {
func (m *dockerMapper) loop() {
m.update()
for range time.Tick(m.interval) {
m.update()
}
}
@@ -38,6 +61,8 @@ type dockerClient interface {
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
InspectContainer(string) (*docker.Container, error)
ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
AddEventListener(chan<- *docker.APIEvents) error
RemoveEventListener(chan *docker.APIEvents) error
}
func newRealDockerClient(endpoint string) (dockerClient, error) {
@@ -50,12 +75,6 @@ var (
)
func (m *dockerMapper) update() {
pidTree, err := newPIDTreeStub(m.procRoot)
if err != nil {
log.Printf("docker mapper: %s", err)
return
}
endpoint := "unix:///var/run/docker.sock"
client, err := newDockerClient(endpoint)
if err != nil {
@@ -63,50 +82,137 @@ func (m *dockerMapper) update() {
return
}
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
events := make(chan *docker.APIEvents)
if err := client.AddEventListener(events); err != nil {
log.Printf("docker mapper: %s", err)
return
}
defer func() {
if err := client.RemoveEventListener(events); err != nil {
log.Printf("docker mapper: %s", err)
}
}()
if err := m.updateContainers(client); err != nil {
log.Printf("docker mapper: %s", err)
return
}
pmap := map[int]*docker.Container{}
for _, container := range containers {
info, err := client.InspectContainer(container.ID)
if err := m.updateImages(client); err != nil {
log.Printf("docker mapper: %s", err)
return
}
otherUpdates := time.Tick(m.interval)
for {
select {
case event := <-events:
m.handleEvent(event, client)
case <-otherUpdates:
if err := m.updatePIDTree(); err != nil {
log.Printf("docker mapper: %s", err)
continue
}
if err := m.updateImages(client); err != nil {
log.Printf("docker mapper: %s", err)
continue
}
}
}
}
func (m *dockerMapper) updateContainers(client dockerClient) error {
apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return err
}
containers := []*docker.Container{}
for _, apiContainer := range apiContainers {
container, err := client.InspectContainer(apiContainer.ID)
if err != nil {
log.Printf("docker mapper: %s", err)
continue
}
if !info.State.Running {
if !container.State.Running {
continue
}
pids, err := pidTree.allChildren(info.State.Pid)
if err != nil {
log.Printf("docker mapper: %s", err)
continue
}
for _, pid := range pids {
pmap[pid] = info
}
}
imageList, err := client.ListImages(docker.ListImagesOptions{})
if err != nil {
log.Printf("docker mapper: %s", err)
return
}
imageMap := map[string]*docker.APIImages{}
for i := range imageList {
image := &imageList[i]
imageMap[image.ID] = image
containers = append(containers, container)
}
m.Lock()
m.containers = pmap
m.images = imageMap
for _, container := range containers {
m.containers[container.ID] = container
m.containersByPID[container.State.Pid] = container
}
m.Unlock()
return nil
}
func (m *dockerMapper) updateImages(client dockerClient) error {
images, err := client.ListImages(docker.ListImagesOptions{})
if err != nil {
return err
}
m.Lock()
for i := range images {
image := &images[i]
m.images[image.ID] = image
}
m.Unlock()
return nil
}
func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) {
switch event.Status {
case stop:
containerID := event.ID
m.Lock()
if container, ok := m.containers[containerID]; ok {
delete(m.containers, containerID)
delete(m.containersByPID, container.State.Pid)
} else {
log.Printf("docker mapper: container %s not found", containerID)
}
m.Unlock()
case start:
containerID := event.ID
container, err := client.InspectContainer(containerID)
if err != nil {
log.Printf("docker mapper: %s", err)
return
}
if !container.State.Running {
log.Printf("docker mapper: container %s not running", containerID)
return
}
m.Lock()
m.containers[containerID] = container
m.containersByPID[container.State.Pid] = container
m.Unlock()
}
}
func (m *dockerMapper) updatePIDTree() error {
pidTree, err := newPIDTreeStub(m.procRoot)
if err != nil {
return err
}
m.Lock()
m.pidTree = pidTree
m.Unlock()
return nil
}
type dockerProcessMapper struct {
@@ -117,11 +223,27 @@ type dockerProcessMapper struct {
func (m *dockerProcessMapper) Key() string { return m.key }
func (m *dockerProcessMapper) Map(pid uint) (string, error) {
var (
container *docker.Container
ok bool
err error
candidate = int(pid)
)
m.RLock()
container, ok := m.containers[int(pid)]
for {
container, ok = m.containersByPID[candidate]
if ok {
break
}
candidate, err = m.pidTree.getParent(candidate)
if err != nil {
break
}
}
m.RUnlock()
if !ok {
if err != nil {
return "", fmt.Errorf("no container found for PID %d", pid)
}

View File

@@ -1,6 +1,7 @@
package main
import (
"runtime"
"testing"
"time"
@@ -25,6 +26,14 @@ func (m mockDockerClient) ListImages(options docker.ListImagesOptions) ([]docker
return m.apiImages, nil
}
func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error {
return nil
}
func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error {
return nil
}
func TestDockerProcessMapper(t *testing.T) {
oldPIDTreeStub, oldDockerClientStub := newPIDTreeStub, newDockerClient
defer func() {
@@ -59,12 +68,14 @@ func TestDockerProcessMapper(t *testing.T) {
}, nil
}
dockerMapper := newDockerMapper("/proc", 10*time.Second)
dockerMapper, _ := newDockerMapper("/proc", 10*time.Second)
dockerIDMapper := dockerMapper.idMapper()
dockerNameMapper := dockerMapper.nameMapper()
dockerImageIDMapper := dockerMapper.imageIDMapper()
dockerImageNameMapper := dockerMapper.imageNameMapper()
runtime.Gosched()
for pid, want := range map[uint]struct{ id, name, imageID, imageName string }{
1: {"foo", "bar", "baz", "tag"},
2: {"foo", "bar", "baz", "tag"},

View File

@@ -78,7 +78,11 @@ func main() {
}
if *dockerMapper {
docker := newDockerMapper(*procRoot, *dockerInterval)
docker, err := newDockerMapper(*procRoot, *dockerInterval)
if err != nil {
log.Fatal(err)
}
pms = append(pms,
docker.idMapper(),
docker.nameMapper(),

View File

@@ -64,6 +64,15 @@ func newPIDTree(procRoot string) (*pidTree, error) {
return &pt, nil
}
func (pt *pidTree) getParent(pid int) (int, error) {
proc, ok := pt.processes[pid]
if !ok {
return -1, fmt.Errorf("PID %d not found", pid)
}
return proc.ppid, nil
}
// allChildren returns a flattened list of child pids including the given pid
func (pt *pidTree) allChildren(pid int) ([]int, error) {
proc, ok := pt.processes[pid]