mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-04 02:30:45 +00:00
Use Docker events API instead of polling Docker daemon.
Unfortunately we still have to poll pidtree and docker images, but I think we can get rid of the docker image polling as we only care about images on running containers.
This commit is contained in:
@@ -10,25 +10,48 @@ import (
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
)
|
||||
|
||||
const (
|
||||
stop = "stop"
|
||||
start = "start"
|
||||
)
|
||||
|
||||
type dockerMapper struct {
|
||||
sync.RWMutex
|
||||
containers map[int]*docker.Container
|
||||
images map[string]*docker.APIImages
|
||||
procRoot string
|
||||
|
||||
containers map[string]*docker.Container
|
||||
containersByPID map[int]*docker.Container
|
||||
images map[string]*docker.APIImages
|
||||
|
||||
procRoot string
|
||||
pidTree *pidTree
|
||||
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func newDockerMapper(procRoot string, interval time.Duration) *dockerMapper {
|
||||
m := dockerMapper{
|
||||
procRoot: procRoot,
|
||||
containers: map[int]*docker.Container{},
|
||||
func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) {
|
||||
pidTree, err := newPIDTreeStub(procRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.update()
|
||||
go m.loop(interval)
|
||||
return &m
|
||||
|
||||
m := dockerMapper{
|
||||
containers: map[string]*docker.Container{},
|
||||
containersByPID: map[int]*docker.Container{},
|
||||
images: map[string]*docker.APIImages{},
|
||||
|
||||
procRoot: procRoot,
|
||||
pidTree: pidTree,
|
||||
|
||||
interval: interval,
|
||||
}
|
||||
|
||||
go m.loop()
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func (m *dockerMapper) loop(d time.Duration) {
|
||||
for range time.Tick(d) {
|
||||
func (m *dockerMapper) loop() {
|
||||
m.update()
|
||||
for range time.Tick(m.interval) {
|
||||
m.update()
|
||||
}
|
||||
}
|
||||
@@ -38,6 +61,8 @@ type dockerClient interface {
|
||||
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
|
||||
InspectContainer(string) (*docker.Container, error)
|
||||
ListImages(docker.ListImagesOptions) ([]docker.APIImages, error)
|
||||
AddEventListener(chan<- *docker.APIEvents) error
|
||||
RemoveEventListener(chan *docker.APIEvents) error
|
||||
}
|
||||
|
||||
func newRealDockerClient(endpoint string) (dockerClient, error) {
|
||||
@@ -50,12 +75,6 @@ var (
|
||||
)
|
||||
|
||||
func (m *dockerMapper) update() {
|
||||
pidTree, err := newPIDTreeStub(m.procRoot)
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
endpoint := "unix:///var/run/docker.sock"
|
||||
client, err := newDockerClient(endpoint)
|
||||
if err != nil {
|
||||
@@ -63,50 +82,137 @@ func (m *dockerMapper) update() {
|
||||
return
|
||||
}
|
||||
|
||||
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
|
||||
if err != nil {
|
||||
events := make(chan *docker.APIEvents)
|
||||
if err := client.AddEventListener(events); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := client.RemoveEventListener(events); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := m.updateContainers(client); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
pmap := map[int]*docker.Container{}
|
||||
for _, container := range containers {
|
||||
info, err := client.InspectContainer(container.ID)
|
||||
if err := m.updateImages(client); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
otherUpdates := time.Tick(m.interval)
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
m.handleEvent(event, client)
|
||||
|
||||
case <-otherUpdates:
|
||||
if err := m.updatePIDTree(); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := m.updateImages(client); err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *dockerMapper) updateContainers(client dockerClient) error {
|
||||
apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
containers := []*docker.Container{}
|
||||
for _, apiContainer := range apiContainers {
|
||||
container, err := client.InspectContainer(apiContainer.ID)
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !info.State.Running {
|
||||
if !container.State.Running {
|
||||
continue
|
||||
}
|
||||
|
||||
pids, err := pidTree.allChildren(info.State.Pid)
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
continue
|
||||
}
|
||||
for _, pid := range pids {
|
||||
pmap[pid] = info
|
||||
}
|
||||
}
|
||||
|
||||
imageList, err := client.ListImages(docker.ListImagesOptions{})
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
imageMap := map[string]*docker.APIImages{}
|
||||
for i := range imageList {
|
||||
image := &imageList[i]
|
||||
imageMap[image.ID] = image
|
||||
containers = append(containers, container)
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.containers = pmap
|
||||
m.images = imageMap
|
||||
for _, container := range containers {
|
||||
m.containers[container.ID] = container
|
||||
m.containersByPID[container.State.Pid] = container
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *dockerMapper) updateImages(client dockerClient) error {
|
||||
images, err := client.ListImages(docker.ListImagesOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
for i := range images {
|
||||
image := &images[i]
|
||||
m.images[image.ID] = image
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) {
|
||||
switch event.Status {
|
||||
case stop:
|
||||
containerID := event.ID
|
||||
m.Lock()
|
||||
if container, ok := m.containers[containerID]; ok {
|
||||
delete(m.containers, containerID)
|
||||
delete(m.containersByPID, container.State.Pid)
|
||||
} else {
|
||||
log.Printf("docker mapper: container %s not found", containerID)
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
case start:
|
||||
containerID := event.ID
|
||||
container, err := client.InspectContainer(containerID)
|
||||
if err != nil {
|
||||
log.Printf("docker mapper: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !container.State.Running {
|
||||
log.Printf("docker mapper: container %s not running", containerID)
|
||||
return
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.containers[containerID] = container
|
||||
m.containersByPID[container.State.Pid] = container
|
||||
m.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *dockerMapper) updatePIDTree() error {
|
||||
pidTree, err := newPIDTreeStub(m.procRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.pidTree = pidTree
|
||||
m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
type dockerProcessMapper struct {
|
||||
@@ -117,11 +223,27 @@ type dockerProcessMapper struct {
|
||||
|
||||
func (m *dockerProcessMapper) Key() string { return m.key }
|
||||
func (m *dockerProcessMapper) Map(pid uint) (string, error) {
|
||||
var (
|
||||
container *docker.Container
|
||||
ok bool
|
||||
err error
|
||||
candidate = int(pid)
|
||||
)
|
||||
|
||||
m.RLock()
|
||||
container, ok := m.containers[int(pid)]
|
||||
for {
|
||||
container, ok = m.containersByPID[candidate]
|
||||
if ok {
|
||||
break
|
||||
}
|
||||
candidate, err = m.pidTree.getParent(candidate)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
if !ok {
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("no container found for PID %d", pid)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -25,6 +26,14 @@ func (m mockDockerClient) ListImages(options docker.ListImagesOptions) ([]docker
|
||||
return m.apiImages, nil
|
||||
}
|
||||
|
||||
func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDockerProcessMapper(t *testing.T) {
|
||||
oldPIDTreeStub, oldDockerClientStub := newPIDTreeStub, newDockerClient
|
||||
defer func() {
|
||||
@@ -59,12 +68,14 @@ func TestDockerProcessMapper(t *testing.T) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
dockerMapper := newDockerMapper("/proc", 10*time.Second)
|
||||
dockerMapper, _ := newDockerMapper("/proc", 10*time.Second)
|
||||
dockerIDMapper := dockerMapper.idMapper()
|
||||
dockerNameMapper := dockerMapper.nameMapper()
|
||||
dockerImageIDMapper := dockerMapper.imageIDMapper()
|
||||
dockerImageNameMapper := dockerMapper.imageNameMapper()
|
||||
|
||||
runtime.Gosched()
|
||||
|
||||
for pid, want := range map[uint]struct{ id, name, imageID, imageName string }{
|
||||
1: {"foo", "bar", "baz", "tag"},
|
||||
2: {"foo", "bar", "baz", "tag"},
|
||||
|
||||
@@ -78,7 +78,11 @@ func main() {
|
||||
}
|
||||
|
||||
if *dockerMapper {
|
||||
docker := newDockerMapper(*procRoot, *dockerInterval)
|
||||
docker, err := newDockerMapper(*procRoot, *dockerInterval)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
pms = append(pms,
|
||||
docker.idMapper(),
|
||||
docker.nameMapper(),
|
||||
|
||||
@@ -64,6 +64,15 @@ func newPIDTree(procRoot string) (*pidTree, error) {
|
||||
return &pt, nil
|
||||
}
|
||||
|
||||
func (pt *pidTree) getParent(pid int) (int, error) {
|
||||
proc, ok := pt.processes[pid]
|
||||
if !ok {
|
||||
return -1, fmt.Errorf("PID %d not found", pid)
|
||||
}
|
||||
|
||||
return proc.ppid, nil
|
||||
}
|
||||
|
||||
// allChildren returns a flattened list of child pids including the given pid
|
||||
func (pt *pidTree) allChildren(pid int) ([]int, error) {
|
||||
proc, ok := pt.processes[pid]
|
||||
|
||||
Reference in New Issue
Block a user