Merge pull request #178 from weaveworks/taggers-over-mappers

Refactor process mappers into taggers.
This commit is contained in:
Peter Bourgon
2015-06-08 15:11:18 +02:00
21 changed files with 376 additions and 522 deletions

View File

@@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*
$(APP_EXE): app/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/tag/*.go report/*.go xfer/*.go
$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)

View File

@@ -30,17 +30,6 @@ type APIEdge struct {
Metadata report.AggregateMetadata `json:"metadata"`
}
// topologySelecter selects a single topology from a report.
type topologySelecter func(r report.Report) report.Topology
func selectProcess(r report.Report) report.Topology {
return r.Process
}
func selectNetwork(r report.Report) report.Topology {
return r.Network
}
// Full topology.
func handleTopology(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{

View File

@@ -34,8 +34,8 @@ func originNodeForProcess(node report.NodeMetadata) OriginNode {
{Key: "Process name", ValueMajor: node["name"], ValueMinor: ""},
}
for _, tuple := range []struct{ key, human string }{
{"docker_id", "Container ID"},
{"docker_name", "Container name"},
{"docker_container_id", "Container ID"},
{"docker_container_name", "Container name"},
{"docker_image_id", "Container image ID"},
{"docker_image_name", "Container image name"},
{"cgroup", "cgroup"},

View File

@@ -47,16 +47,16 @@ func apiHandler(w http.ResponseWriter, r *http.Request) {
type topologyView struct {
human string
selector topologySelecter
selector report.TopologySelector
mapper report.MapFunc
pseudo report.PseudoFunc
groupedTopology string
}
var topologyRegistry = map[string]topologyView{
"applications": {"Applications", selectProcess, report.ProcessPID, report.GenericPseudoNode, "applications-grouped"},
"applications-grouped": {"Applications", selectProcess, report.ProcessName, report.GenericGroupedPseudoNode, ""},
"containers": {"Containers", selectProcess, report.ProcessContainer, report.InternetOnlyPseudoNode, "containers-grouped"},
"containers-grouped": {"Containers", selectProcess, report.ProcessContainerImage, report.InternetOnlyPseudoNode, ""},
"hosts": {"Hosts", selectNetwork, report.NetworkHostname, report.GenericPseudoNode, ""},
"applications": {"Applications", report.SelectProcess, report.ProcessPID, report.GenericPseudoNode, "applications-grouped"},
"applications-grouped": {"Applications", report.SelectProcess, report.ProcessName, report.GenericGroupedPseudoNode, ""},
"containers": {"Containers", report.SelectProcess, report.ProcessContainer, report.InternetOnlyPseudoNode, "containers-grouped"},
"containers-grouped": {"Containers", report.SelectProcess, report.ProcessContainerImage, report.InternetOnlyPseudoNode, ""},
"hosts": {"Hosts", report.SelectNetwork, report.NetworkHostname, report.GenericPseudoNode, ""},
}

View File

@@ -1,100 +0,0 @@
package main
import (
"runtime"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
)
type mockDockerClient struct {
apiContainers []docker.APIContainers
containers map[string]*docker.Container
apiImages []docker.APIImages
}
func (m mockDockerClient) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) {
return m.apiContainers, nil
}
func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) {
return m.containers[id], nil
}
func (m mockDockerClient) ListImages(options docker.ListImagesOptions) ([]docker.APIImages, error) {
return m.apiImages, nil
}
func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error {
return nil
}
func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error {
return nil
}
func TestDockerProcessMapper(t *testing.T) {
oldPIDTreeStub, oldDockerClientStub := newPIDTreeStub, newDockerClient
defer func() {
newPIDTreeStub = oldPIDTreeStub
newDockerClient = oldDockerClientStub
}()
newPIDTreeStub = func(procRoot string) (*pidTree, error) {
pid1 := &process{pid: 1}
pid2 := &process{pid: 2, ppid: 1, parent: pid1}
pid1.children = []*process{pid2}
return &pidTree{
processes: map[int]*process{
1: pid1, 2: pid2,
},
}, nil
}
newDockerClient = func(endpoint string) (dockerClient, error) {
return mockDockerClient{
apiContainers: []docker.APIContainers{{ID: "foo"}},
containers: map[string]*docker.Container{
"foo": {
ID: "foo",
Name: "bar",
Image: "baz",
State: docker.State{Pid: 1, Running: true},
},
},
apiImages: []docker.APIImages{{ID: "baz", RepoTags: []string{"tag"}}},
}, nil
}
dockerMapper, _ := newDockerMapper("/proc", 10*time.Second)
dockerIDMapper := dockerMapper.idMapper()
dockerNameMapper := dockerMapper.nameMapper()
dockerImageIDMapper := dockerMapper.imageIDMapper()
dockerImageNameMapper := dockerMapper.imageNameMapper()
runtime.Gosched()
for pid, want := range map[uint]struct{ id, name, imageID, imageName string }{
1: {"foo", "bar", "baz", "tag"},
2: {"foo", "bar", "baz", "tag"},
} {
haveID, err := dockerIDMapper.Map(pid)
if err != nil || want.id != haveID {
t.Errorf("%d: want %q, have %q (%v)", pid, want.id, haveID, err)
}
haveName, err := dockerNameMapper.Map(pid)
if err != nil || want.name != haveName {
t.Errorf("%d: want %q, have %q (%v)", pid, want.name, haveName, err)
}
haveImageID, err := dockerImageIDMapper.Map(pid)
if err != nil || want.imageID != haveImageID {
t.Errorf("%d: want %q, have %q (%v)", pid, want.imageID, haveImageID, err)
}
haveImageName, err := dockerImageNameMapper.Map(pid)
if err != nil || want.imageName != haveImageName {
t.Errorf("%d: want %q, have %q (%v)", pid, want.imageName, haveImageName, err)
}
}
}

View File

@@ -5,7 +5,6 @@ import (
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
@@ -14,12 +13,12 @@ import (
"time"
"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
// Set during buildtime.
var version = "unknown"
var version = "dev" // set at build time
func main() {
var (
@@ -29,9 +28,7 @@ func main() {
listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address")
prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
spyProcs = flag.Bool("processes", true, "report processes (needs root)")
cgroupsRoot = flag.String("cgroups.root", "", "if provided, enrich -processes with cgroup names from this root (e.g. /mnt/cgroups)")
cgroupsInterval = flag.Duration("cgroups.interval", 10*time.Second, "how often to update cgroup names")
dockerMapper = flag.Bool("docker", true, "collect Docker-related attributes for processes")
dockerTagger = flag.Bool("docker", true, "collect Docker-related attributes for processes")
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem")
)
@@ -42,7 +39,7 @@ func main() {
os.Exit(1)
}
log.Printf("probe starting, version %s", version)
log.Printf("probe version %s", version)
procspy.SetProcRoot(*procRoot)
@@ -66,32 +63,14 @@ func main() {
}
defer publisher.Close()
pms := []processMapper{identityMapper{}}
if *cgroupsRoot != "" {
if fi, err := os.Stat(*cgroupsRoot); err == nil && fi.IsDir() {
log.Printf("enriching -processes with cgroup names from %s", *cgroupsRoot)
cgroupMapper := newCgroupMapper(*cgroupsRoot, *cgroupsInterval)
defer cgroupMapper.Stop()
pms = append(pms, cgroupMapper)
} else {
log.Printf("-cgroups.root=%s: %v", *cgroupsRoot, err)
}
}
if *dockerMapper && runtime.GOOS == "Linux" {
docker, err := newDockerMapper(*procRoot, *dockerInterval)
taggers := []tag.Tagger{tag.NewTopologyTagger()}
if *dockerTagger {
t, err := tag.NewDockerTagger(*procRoot, *dockerInterval)
if err != nil {
log.Fatal(err)
log.Fatalf("failed to start docker tagger: %v", err)
}
defer docker.Stop()
pms = append(pms,
docker.idMapper(),
docker.nameMapper(),
docker.imageIDMapper(),
docker.imageNameMapper(),
)
defer t.Stop()
taggers = append(taggers, t)
}
log.Printf("listening on %s", *listen)
@@ -116,7 +95,8 @@ func main() {
r = report.MakeReport()
case <-spyTick:
r.Merge(spy(hostname, hostname, *spyProcs, pms))
r.Merge(spy(hostname, hostname, *spyProcs))
r = tag.Apply(r, taggers)
// log.Printf("merged report:\n%#v\n", r)
case <-quit:

View File

@@ -1,143 +0,0 @@
package main
import (
"bufio"
"fmt"
"log"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
type processMapper interface {
Key() string
Map(pid uint) (string, error)
}
type identityMapper struct{}
func (m identityMapper) Key() string { return "identity" }
func (m identityMapper) Map(pid uint) (string, error) { return strconv.FormatUint(uint64(pid), 10), nil }
// cgroupMapper is a cgroup task mapper.
type cgroupMapper struct {
sync.RWMutex
root string
d map[uint]string
quit chan struct{}
}
func newCgroupMapper(root string, interval time.Duration) *cgroupMapper {
m := cgroupMapper{
root: root,
d: map[uint]string{},
quit: make(chan struct{}),
}
m.update()
go m.loop(interval)
return &m
}
func (m *cgroupMapper) Stop() {
close(m.quit)
}
func (m *cgroupMapper) Key() string { return "cgroup" }
// Map uses the cache to find the process name for pid. It is safe for
// concurrent use.
func (m *cgroupMapper) Map(pid uint) (string, error) {
m.RLock()
p, ok := m.d[pid]
m.RUnlock()
if !ok {
return "", fmt.Errorf("no cgroup for PID %d", pid)
}
return p, nil
}
func (m *cgroupMapper) loop(d time.Duration) {
ticker := time.Tick(d)
for {
select {
case <-ticker:
m.update()
case <-m.quit:
return
}
}
}
func (m *cgroupMapper) update() {
// We want to read "<root>/<processname>/tasks" files.
fh, err := os.Open(m.root)
if err != nil {
log.Printf("cgroup mapper: %s", err)
return
}
dirNames, err := fh.Readdirnames(-1)
fh.Close()
if err != nil {
log.Printf("cgroup mapper: %s", err)
return
}
pmap := map[uint]string{}
for _, d := range dirNames {
cg := normalizeCgroup(d)
dirFilename := filepath.Join(m.root, d)
s, err := os.Stat(dirFilename)
if err != nil || !s.IsDir() {
continue
}
taskFilename := filepath.Join(dirFilename, "tasks")
f, err := os.Open(taskFilename)
if err != nil {
continue
}
r := bufio.NewReader(f)
for {
line, _, err := r.ReadLine()
if err != nil {
break // we expect an EOF
}
pid, err := strconv.ParseUint(string(line), 10, 64)
if err != nil {
log.Printf("continue mapper: %s", err)
continue
}
pmap[uint(pid)] = cg
}
f.Close()
}
m.Lock()
m.d = pmap
m.Unlock()
}
var lxcRe = regexp.MustCompile(`^([^-]+)-([^-]+)-([A-Fa-f0-9]+)-([0-9]+)$`)
func normalizeCgroup(s string) string {
// Format is currently "primarykey-secondarykey-revision-instance". We
// want to collapse all instances (and maybe all revisions, in the future)
// to the same node. So we remove the instance.
if m := lxcRe.FindStringSubmatch(s); len(m) > 0 {
return strings.Join([]string{m[1], m[2], m[3]}, "-")
}
return s
}

View File

@@ -1,63 +0,0 @@
package main
import (
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"time"
)
func TestCgroupMapper(t *testing.T) {
tmp := setupTmpFS(t, map[string]string{
"/systemd/tasks": "1\n2\n4911\n1000\n25156\n",
"/systemd/notify_on_release": "0\n",
"/netscape/tasks": "666\n4242\n",
"/netscape/notify_on_release": "0\n",
"/weirdfile": "",
})
defer removeAll(t, tmp)
m := newCgroupMapper(tmp, 1*time.Second)
for pid, want := range map[uint]string{
111: "",
999: "",
4911: "systemd",
1: "systemd", // first one in the file
25156: "systemd", // last one in the tasks file
4242: "netscape",
} {
if have, _ := m.Map(pid); want != have {
t.Errorf("%d: want %q, have %q", pid, want, have)
}
}
}
func setupTmpFS(t *testing.T, fs map[string]string) string {
tmp, err := ioutil.TempDir(os.TempDir(), "scope-probe-test-cgroup-mapper")
if err != nil {
t.Fatal(err)
}
//t.Logf("using TempDir %s", tmp)
for file, content := range fs {
dir := path.Dir(file)
if err := os.MkdirAll(filepath.Join(tmp, dir), 0777); err != nil {
removeAll(t, tmp)
t.Fatalf("MkdirAll: %v", err)
}
if err := ioutil.WriteFile(filepath.Join(tmp, file), []byte(content), 0655); err != nil {
removeAll(t, tmp)
t.Fatalf("WriteFile: %v", err)
}
}
return tmp
}
func removeAll(t *testing.T, path string) {
if err := os.RemoveAll(path); err != nil {
t.Error(err)
}
}

View File

@@ -18,7 +18,6 @@ import (
func spy(
hostID, hostName string,
includeProcesses bool,
pms []processMapper,
) report.Report {
defer func(begin time.Time) {
spyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
@@ -33,7 +32,7 @@ func spy(
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
addConnection(&r, conn, hostID, hostName, pms)
addConnection(&r, conn, hostID, hostName)
}
return r
@@ -43,7 +42,6 @@ func addConnection(
r *report.Report,
c *procspy.Connection,
hostID, hostName string,
pms []processMapper,
) {
var (
scopedLocal = scopedIP(hostID, c.LocalAddress)
@@ -84,14 +82,6 @@ func addConnection(
"domain": hostID,
}
for _, pm := range pms {
v, err := pm.Map(c.PID)
if err != nil {
continue
}
md[pm.Key()] = v
}
r.Process.NodeMetadatas[scopedLocal] = md
}
// Count the TCP connection.

View File

@@ -87,7 +87,7 @@ func TestSpyNetwork(t *testing.T) {
nodeName = "frenchs-since-1904"
)
r := spy(nodeID, nodeName, false, []processMapper{})
r := spy(nodeID, nodeName, false)
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
@@ -123,7 +123,7 @@ func TestSpyProcess(t *testing.T) {
nodeName = "fishermans-friend"
)
r := spy(nodeID, nodeName, true, []processMapper{})
r := spy(nodeID, nodeName, true)
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)
var (
@@ -150,26 +150,3 @@ func TestSpyProcess(t *testing.T) {
}
}
}
func TestSpyProcessDataSource(t *testing.T) {
procspy.SetFixtures(fixConnectionsWithProcesses)
const (
nodeID = "chianti"
nodeName = "harmonisch"
)
m := identityMapper{}
r := spy(nodeID, nodeName, true, []processMapper{m})
scopedLocal := scopedIPPort(nodeID, fixLocalAddress, fixLocalPort)
k := m.Key()
v, err := m.Map(fixProcessPID)
if err != nil {
t.Fatal(err)
}
if want, have := v, r.Process.NodeMetadatas[scopedLocal][k]; want != have {
t.Fatalf("%s: want %q, have %q", k, want, have)
}
}

View File

@@ -1,13 +1,14 @@
package main
package tag
import (
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/weaveworks/scope/report"
)
const (
@@ -15,7 +16,14 @@ const (
start = "start"
)
type dockerMapper struct {
var (
newDockerClientStub = newDockerClient
newPIDTreeStub = newPIDTree
)
// DockerTagger is a tagger that tags Docker container information to process
// nodes that have a PID.
type DockerTagger struct {
sync.RWMutex
quit chan struct{}
interval time.Duration
@@ -28,13 +36,14 @@ type dockerMapper struct {
pidTree *pidTree
}
func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) {
// NewDockerTagger returns a usable DockerTagger. Don't forget to Stop it.
func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, error) {
pidTree, err := newPIDTreeStub(procRoot)
if err != nil {
return nil, err
}
m := dockerMapper{
t := DockerTagger{
containers: map[string]*docker.Container{},
containersByPID: map[int]*docker.Container{},
images: map[string]*docker.APIImages{},
@@ -46,34 +55,35 @@ func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, er
quit: make(chan struct{}),
}
go m.loop()
return &m, nil
go t.loop()
return &t, nil
}
func (m *dockerMapper) Stop() {
close(m.quit)
// Stop stops the Docker tagger's event subscriber.
func (t *DockerTagger) Stop() {
close(t.quit)
}
func (m *dockerMapper) loop() {
if !m.update() {
func (t *DockerTagger) loop() {
if !t.update() {
return
}
ticker := time.Tick(m.interval)
ticker := time.Tick(t.interval)
for {
select {
case <-ticker:
if !m.update() {
if !t.update() {
return
}
case <-m.quit:
case <-t.quit:
return
}
}
}
// for mocking
// Sub-interface for mocking.
type dockerClient interface {
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
InspectContainer(string) (*docker.Container, error)
@@ -82,19 +92,13 @@ type dockerClient interface {
RemoveEventListener(chan *docker.APIEvents) error
}
func newRealDockerClient(endpoint string) (dockerClient, error) {
func newDockerClient(endpoint string) (dockerClient, error) {
return docker.NewClient(endpoint)
}
var (
newDockerClient = newRealDockerClient
newPIDTreeStub = newPIDTree
)
// returns false when stopping.
func (m *dockerMapper) update() bool {
func (t *DockerTagger) update() bool {
endpoint := "unix:///var/run/docker.sock"
client, err := newDockerClient(endpoint)
client, err := newDockerClientStub(endpoint)
if err != nil {
log.Printf("docker mapper: %s", err)
return true
@@ -111,40 +115,40 @@ func (m *dockerMapper) update() bool {
}
}()
if err := m.updateContainers(client); err != nil {
if err := t.updateContainers(client); err != nil {
log.Printf("docker mapper: %s", err)
return true
}
if err := m.updateImages(client); err != nil {
if err := t.updateImages(client); err != nil {
log.Printf("docker mapper: %s", err)
return true
}
otherUpdates := time.Tick(m.interval)
otherUpdates := time.Tick(t.interval)
for {
select {
case event := <-events:
m.handleEvent(event, client)
t.handleEvent(event, client)
case <-otherUpdates:
if err := m.updatePIDTree(); err != nil {
if err := t.updatePIDTree(); err != nil {
log.Printf("docker mapper: %s", err)
continue
}
if err := m.updateImages(client); err != nil {
if err := t.updateImages(client); err != nil {
log.Printf("docker mapper: %s", err)
continue
}
case <-m.quit:
case <-t.quit:
return false
}
}
}
func (m *dockerMapper) updateContainers(client dockerClient) error {
func (t *DockerTagger) updateContainers(client dockerClient) error {
apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return err
@@ -165,44 +169,44 @@ func (m *dockerMapper) updateContainers(client dockerClient) error {
containers = append(containers, container)
}
m.Lock()
t.Lock()
for _, container := range containers {
m.containers[container.ID] = container
m.containersByPID[container.State.Pid] = container
t.containers[container.ID] = container
t.containersByPID[container.State.Pid] = container
}
m.Unlock()
t.Unlock()
return nil
}
func (m *dockerMapper) updateImages(client dockerClient) error {
func (t *DockerTagger) updateImages(client dockerClient) error {
images, err := client.ListImages(docker.ListImagesOptions{})
if err != nil {
return err
}
m.Lock()
t.Lock()
for i := range images {
image := &images[i]
m.images[image.ID] = image
t.images[image.ID] = image
}
m.Unlock()
t.Unlock()
return nil
}
func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) {
func (t *DockerTagger) handleEvent(event *docker.APIEvents, client dockerClient) {
switch event.Status {
case stop:
containerID := event.ID
m.Lock()
if container, ok := m.containers[containerID]; ok {
delete(m.containers, containerID)
delete(m.containersByPID, container.State.Pid)
t.Lock()
if container, ok := t.containers[containerID]; ok {
delete(t.containers, containerID)
delete(t.containersByPID, container.State.Pid)
} else {
log.Printf("docker mapper: container %s not found", containerID)
}
m.Unlock()
t.Unlock()
case start:
containerID := event.ID
@@ -217,88 +221,77 @@ func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient)
return
}
m.Lock()
m.containers[containerID] = container
m.containersByPID[container.State.Pid] = container
m.Unlock()
t.Lock()
t.containers[containerID] = container
t.containersByPID[container.State.Pid] = container
t.Unlock()
}
}
func (m *dockerMapper) updatePIDTree() error {
pidTree, err := newPIDTreeStub(m.procRoot)
func (t *DockerTagger) updatePIDTree() error {
pidTree, err := newPIDTreeStub(t.procRoot)
if err != nil {
return err
}
m.Lock()
m.pidTree = pidTree
m.Unlock()
t.Lock()
t.pidTree = pidTree
t.Unlock()
return nil
}
type dockerProcessMapper struct {
*dockerMapper
key string
f func(*docker.Container) string
}
func (m *dockerProcessMapper) Key() string { return m.key }
func (m *dockerProcessMapper) Map(pid uint) (string, error) {
var (
container *docker.Container
ok bool
err error
candidate = int(pid)
)
m.RLock()
for {
container, ok = m.containersByPID[candidate]
if ok {
break
// Tag implements Tagger.
func (t *DockerTagger) Tag(r report.Report) report.Report {
for nodeID, nodeMetadata := range r.Process.NodeMetadatas {
pidStr, ok := nodeMetadata["pid"]
if !ok {
//log.Printf("dockerTagger: %q: no process node ID", id)
continue
}
candidate, err = m.pidTree.getParent(candidate)
pid, err := strconv.ParseUint(pidStr, 10, 64)
if err != nil {
break
}
}
m.RUnlock()
if err != nil {
return "", fmt.Errorf("no container found for PID %d", pid)
}
return m.f(container), nil
}
func (m *dockerMapper) idMapper() processMapper {
return &dockerProcessMapper{m, "docker_id", func(c *docker.Container) string {
return c.ID
}}
}
func (m *dockerMapper) nameMapper() processMapper {
return &dockerProcessMapper{m, "docker_name", func(c *docker.Container) string {
return strings.TrimPrefix(c.Name, "/")
}}
}
func (m *dockerMapper) imageIDMapper() processMapper {
return &dockerProcessMapper{m, "docker_image_id", func(c *docker.Container) string {
return c.Image
}}
}
func (m *dockerMapper) imageNameMapper() processMapper {
return &dockerProcessMapper{m, "docker_image_name", func(c *docker.Container) string {
m.RLock()
image, ok := m.images[c.Image]
m.RUnlock()
if !ok || len(image.RepoTags) == 0 {
return ""
//log.Printf("dockerTagger: %q: bad process node PID (%v)", id, err)
continue
}
return image.RepoTags[0]
}}
var (
container *docker.Container
candidate = int(pid)
)
t.RLock()
for {
container, ok = t.containersByPID[candidate]
if ok {
break
}
candidate, err = t.pidTree.getParent(candidate)
if err != nil {
break
}
}
t.RUnlock()
if !ok {
continue
}
md := report.NodeMetadata{
"docker_container_id": container.ID,
"docker_container_name": strings.TrimPrefix(container.Name, "/"),
"docker_image_id": container.Image,
}
t.RLock()
image, ok := t.images[container.Image]
t.RUnlock()
if ok && len(image.RepoTags) > 0 {
md["docker_image_name"] = image.RepoTags[0]
}
r.Process.NodeMetadatas[nodeID].Merge(md)
}
return r
}

View File

@@ -0,0 +1,94 @@
package tag
import (
"reflect"
"runtime"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/weaveworks/scope/report"
)
type mockDockerClient struct {
apiContainers []docker.APIContainers
containers map[string]*docker.Container
apiImages []docker.APIImages
}
func (m mockDockerClient) ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) {
return m.apiContainers, nil
}
func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) {
return m.containers[id], nil
}
func (m mockDockerClient) ListImages(docker.ListImagesOptions) ([]docker.APIImages, error) {
return m.apiImages, nil
}
func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error {
return nil
}
func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error {
return nil
}
func TestDockerTagger(t *testing.T) {
oldPIDTree, oldDockerClient := newPIDTreeStub, newDockerClientStub
defer func() { newPIDTreeStub, newDockerClientStub = oldPIDTree, oldDockerClient }()
newPIDTreeStub = func(procRoot string) (*pidTree, error) {
pid1 := &process{pid: 1}
pid2 := &process{pid: 2, ppid: 1, parent: pid1}
pid1.children = []*process{pid2}
return &pidTree{
processes: map[int]*process{
1: pid1, 2: pid2,
},
}, nil
}
newDockerClientStub = func(endpoint string) (dockerClient, error) {
return mockDockerClient{
apiContainers: []docker.APIContainers{{ID: "foo"}},
containers: map[string]*docker.Container{
"foo": {
ID: "foo",
Name: "bar",
Image: "baz",
State: docker.State{Pid: 1, Running: true},
},
},
apiImages: []docker.APIImages{{ID: "baz", RepoTags: []string{"bang", "not-chosen"}}},
}, nil
}
var (
endpoint1NodeID = "somehost.com;192.168.1.1;12345"
endpoint2NodeID = "somehost.com;192.168.1.1;67890"
processNodeMetadata = report.NodeMetadata{
"docker_container_id": "foo",
"docker_container_name": "bar",
"docker_image_id": "baz",
"docker_image_name": "bang",
}
)
r := report.MakeReport()
r.Process.NodeMetadatas[endpoint1NodeID] = report.NodeMetadata{"pid": "1"}
r.Process.NodeMetadatas[endpoint2NodeID] = report.NodeMetadata{"pid": "2"}
dockerTagger, _ := NewDockerTagger("/irrelevant", 10*time.Second)
runtime.Gosched()
for _, endpointNodeID := range []string{endpoint1NodeID, endpoint2NodeID} {
want := processNodeMetadata.Copy()
have := dockerTagger.Tag(r).Process.NodeMetadatas[endpointNodeID].Copy()
delete(have, "pid")
if !reflect.DeepEqual(want, have) {
t.Errorf("%q: want %+v, have %+v", endpointNodeID, want, have)
}
}
}

View File

@@ -1,4 +1,4 @@
package main
package tag
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package main
package tag
import (
"fmt"
@@ -48,7 +48,7 @@ func TestPIDTree(t *testing.T) {
return []byte(fmt.Sprintf("%d na R %d", pid, parent)), nil
}
pidtree, err := newPIDTree("/proc")
pidtree, err := newPIDTreeStub("/proc")
if err != nil {
t.Fatalf("newPIDTree error: %v", err)
}

16
probe/tag/tagger.go Normal file
View File

@@ -0,0 +1,16 @@
package tag
import "github.com/weaveworks/scope/report"
// Tagger tags nodes with value-add node metadata.
type Tagger interface {
Tag(r report.Report) report.Report
}
// Apply tags the report with all the taggers.
func Apply(r report.Report, taggers []Tagger) report.Report {
for _, tagger := range taggers {
r = tagger.Tag(r)
}
return r
}

44
probe/tag/tagger_test.go Normal file
View File

@@ -0,0 +1,44 @@
package tag_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)
func TestApply(t *testing.T) {
var (
processNodeID = "c"
networkNodeID = "d"
processNodeMetadata = report.NodeMetadata{"5": "6"}
networkNodeMetadata = report.NodeMetadata{"7": "8"}
)
r := report.MakeReport()
r.Process.NodeMetadatas[processNodeID] = processNodeMetadata
r.Network.NodeMetadatas[networkNodeID] = networkNodeMetadata
r = tag.Apply(r, []tag.Tagger{tag.NewTopologyTagger()})
for _, tuple := range []struct {
want report.NodeMetadata
from report.Topology
via string
}{
{copy(processNodeMetadata).Merge(report.NodeMetadata{"topology": "process"}), r.Process, processNodeID},
{copy(networkNodeMetadata).Merge(report.NodeMetadata{"topology": "network"}), r.Network, networkNodeID},
} {
if want, have := tuple.want, tuple.from.NodeMetadatas[tuple.via]; !reflect.DeepEqual(want, have) {
t.Errorf("want %+v, have %+v", want, have)
}
}
}
func copy(input report.NodeMetadata) report.NodeMetadata {
output := make(report.NodeMetadata, len(input))
for k, v := range input {
output[k] = v
}
return output
}

View File

@@ -0,0 +1,25 @@
package tag
import (
"github.com/weaveworks/scope/report"
)
type topologyTagger struct{}
// NewTopologyTagger tags each node with the topology that it comes from.
func NewTopologyTagger() Tagger {
return &topologyTagger{}
}
func (topologyTagger) Tag(r report.Report) report.Report {
for val, topology := range map[string]*report.Topology{
"process": &(r.Process),
"network": &(r.Network),
} {
md := report.NodeMetadata{"topology": val}
for nodeID := range topology.NodeMetadatas {
(*topology).NodeMetadatas[nodeID].Merge(md)
}
}
return r
}

View File

@@ -0,0 +1,19 @@
package tag_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)
func TestTagMissingID(t *testing.T) {
const nodeID = "not-found"
r := report.MakeReport()
want := report.NodeMetadata{}
have := tag.NewTopologyTagger().Tag(r).Process.NodeMetadatas[nodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Error("TopologyTagger erroneously tagged a missing node ID")
}
}

View File

@@ -33,6 +33,19 @@ type MapFunc func(string, NodeMetadata) (MappedNode, bool)
// node IDs prior to mapping.
type PseudoFunc func(srcNodeID string, srcNode RenderableNode, dstNodeID string) (MappedNode, bool)
// TopologySelector selects a single topology from a report.
type TopologySelector func(r Report) Topology
// SelectProcess selects the process topology.
func SelectProcess(r Report) Topology {
return r.Process
}
// SelectNetwork selects the network topology.
func SelectNetwork(r Report) Topology {
return r.Network
}
// ProcessPID takes a node NodeMetadata from a Process topology, and returns a
// representation with the ID based on the process PID and the labels based
// on the process name.
@@ -70,10 +83,10 @@ func ProcessName(_ string, m NodeMetadata) (MappedNode, bool) {
// are grouped into the Uncontained node.
func ProcessContainer(_ string, m NodeMetadata) (MappedNode, bool) {
var id, major, minor, rank string
if m["docker_id"] == "" {
if m["docker_container_id"] == "" {
id, major, minor, rank = "uncontained", "Uncontained", "", "uncontained"
} else {
id, major, minor, rank = m["docker_id"], m["docker_name"], m["domain"], m["docker_image_id"]
id, major, minor, rank = m["docker_container_id"], m["docker_container_name"], m["domain"], m["docker_image_id"]
}
return MappedNode{

View File

@@ -69,13 +69,13 @@ func TestUngroupedMapping(t *testing.T) {
f: ProcessContainer,
id: "bar-id",
meta: NodeMetadata{
"pid": "42",
"name": "curl",
"domain": "hosta",
"docker_id": "d321fe0",
"docker_name": "walking_sparrow",
"docker_image_id": "1101fff",
"docker_image_name": "org/app:latest",
"pid": "42",
"name": "curl",
"domain": "hosta",
"docker_container_id": "d321fe0",
"docker_container_name": "walking_sparrow",
"docker_image_id": "1101fff",
"docker_image_name": "org/app:latest",
},
wantOK: true,
wantID: "d321fe0",

View File

@@ -54,6 +54,26 @@ type EdgeMetadata struct {
// which should probably change (see comment on type MapFunc).
type NodeMetadata map[string]string
// Copy returns a value copy, useful for tests.
func (nm NodeMetadata) Copy() NodeMetadata {
cp := make(NodeMetadata, len(nm))
for k, v := range nm {
cp[k] = v
}
return cp
}
// Merge merges two node metadata maps together. In case of conflict, the
// other (right-hand) side wins. Always reassign the result of merge to the
// destination. Merge is defined on the value-type, but node metadata map is
// itself a reference type, so if you want to maintain immutability, use copy.
func (nm NodeMetadata) Merge(other NodeMetadata) NodeMetadata {
for k, v := range other {
nm[k] = v // other takes precedence
}
return nm
}
// NewTopology gives you a Topology.
func NewTopology() Topology {
return Topology{