Merge pull request #386 from weaveworks/356-conntrack

Use conntrack to detect short-lived connections.
This commit is contained in:
Tom Wilkie
2015-08-28 10:28:46 +01:00
16 changed files with 851 additions and 379 deletions

28
common/exec/exec.go Normal file
View File

@@ -0,0 +1,28 @@
package exec
import (
"io"
"os"
"os/exec"
)
// Cmd is a hook for mocking. It mirrors the subset of *os/exec.Cmd that the
// probes use, with Process exposed as a method (on the real type it is a
// struct field, which cannot satisfy an interface).
type Cmd interface {
	StdoutPipe() (io.ReadCloser, error)
	Start() error
	Wait() error
	Process() *os.Process
}
// Command is a hook for mocking. The default implementation wraps the real
// os/exec.Command in a realCmd adapter.
var Command = func(name string, args ...string) Cmd {
	cmd := exec.Command(name, args...)
	return &realCmd{cmd}
}
type realCmd struct {
*exec.Cmd
}
func (c *realCmd) Process() *os.Process {
return c.Cmd.Process
}

225
probe/endpoint/conntrack.go Normal file
View File

@@ -0,0 +1,225 @@
package endpoint
import (
"bufio"
"encoding/xml"
"fmt"
"log"
"os"
"strings"
"sync"
"github.com/weaveworks/scope/common/exec"
)
// Constants exported for testing
const (
	// modules is the kernel's loaded-modules list; conntrackModule is the
	// module name we look for in it.
	modules         = "/proc/modules"
	conntrackModule = "nf_conntrack"

	// XMLHeader and ConntrackOpenTag are the first two lines emitted by
	// `conntrack -E -o xml` before the stream of <flow> elements.
	XMLHeader        = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
	ConntrackOpenTag = "<conntrack>\n"

	// Flow state and event-type strings as they appear in the XML output.
	TimeWait = "TIME_WAIT"
	TCP      = "tcp"
	New      = "new"
	Update   = "update"
	Destroy  = "destroy"
)
// Layer3 - these structs are for the parsed conntrack output
type Layer3 struct {
	XMLName xml.Name `xml:"layer3"`
	SrcIP   string   `xml:"src"`
	DstIP   string   `xml:"dst"`
}

// Layer4 - these structs are for the parsed conntrack output
type Layer4 struct {
	XMLName xml.Name `xml:"layer4"`
	SrcPort int      `xml:"sport"`
	DstPort int      `xml:"dport"`
	Proto   string   `xml:"protoname,attr"`
}

// Meta - these structs are for the parsed conntrack output
type Meta struct {
	XMLName   xml.Name `xml:"meta"`
	Direction string   `xml:"direction,attr"` // "original", "reply" or "independent"
	Layer3    Layer3   `xml:"layer3"`
	Layer4    Layer4   `xml:"layer4"`
	ID        int64    `xml:"id"`
	State     string   `xml:"state"`
}

// Flow - these structs are for the parsed conntrack output
type Flow struct {
	XMLName xml.Name `xml:"flow"`
	Metas   []Meta   `xml:"meta"`
	Type    string   `xml:"type,attr"`

	// Original, Reply and Independent are not decoded from XML; handleFlow
	// fills them in by matching each Meta's Direction attribute.
	Original, Reply, Independent *Meta `xml:"-"`
}
// Conntracker uses the conntrack command to track network connections
type Conntracker struct {
	sync.Mutex // guards cmd, activeFlows and bufferedFlows (shared with the run goroutine)
	cmd        exec.Cmd

	activeFlows   map[int64]Flow // active flows in state != TIME_WAIT
	bufferedFlows []Flow         // flows coming out of activeFlows spend 1 walk cycle here
}
// NewConntracker creates and starts a new Conntracker, passing any extra
// args through to the conntrack command. It returns an error if the kernel
// conntrack module is not loaded; any later failure of the background
// conntrack process is only logged.
func NewConntracker(args ...string) (*Conntracker, error) {
	if !ConntrackModulePresent() {
		// Fix: Go error strings are lowercase and unpunctuated.
		return nil, fmt.Errorf("conntrack module not present")
	}
	result := &Conntracker{
		activeFlows: map[int64]Flow{},
	}
	// run consumes the conntrack event stream until it ends or Stop kills it.
	go result.run(args...)
	return result, nil
}
// ConntrackModulePresent returns true if the kernel has the conntrack module
// present. It is made public for mocking.
var ConntrackModulePresent = func() bool {
	file, err := os.Open(modules)
	if err != nil {
		return false
	}
	defer file.Close()

	// Scan /proc/modules line by line for an entry starting with the
	// conntrack module name.
	scan := bufio.NewScanner(file)
	for scan.Scan() {
		if strings.HasPrefix(scan.Text(), conntrackModule) {
			return true
		}
	}
	if err := scan.Err(); err != nil {
		log.Printf("conntrack error: %v", err)
	}
	log.Printf("conntrack: failed to find module %s", conntrackModule)
	return false
}
// run starts the conntrack command and consumes its XML event stream,
// feeding each decoded flow to handleFlow, until the stream errors or ends.
// NB this is not re-entrant!
func (c *Conntracker) run(args ...string) {
	args = append([]string{"-E", "-o", "xml"}, args...)
	cmd := exec.Command("conntrack", args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Printf("conntrack error: %v", err)
		return
	}

	if err := cmd.Start(); err != nil {
		log.Printf("conntrack error: %v", err)
		return
	}

	// Publish the command under the lock so Stop() can kill it.
	c.Lock()
	c.cmd = cmd
	c.Unlock()

	defer func() {
		if err := cmd.Wait(); err != nil {
			log.Printf("conntrack error: %v", err)
		}
	}()

	// Swallow the first two lines (the XML prologue and the <conntrack>
	// open tag) so the decoder below sees only <flow> elements.
	reader := bufio.NewReader(stdout)
	if line, err := reader.ReadString('\n'); err != nil {
		log.Printf("conntrack error: %v", err)
		return
	} else if line != XMLHeader {
		log.Printf("conntrack invalid output: '%s'", line)
		return
	}
	if line, err := reader.ReadString('\n'); err != nil {
		log.Printf("conntrack error: %v", err)
		return
	} else if line != ConntrackOpenTag {
		log.Printf("conntrack invalid output: '%s'", line)
		return
	}

	// Now loop on the output stream
	decoder := xml.NewDecoder(reader)
	for {
		var f Flow
		if err := decoder.Decode(&f); err != nil {
			// Bug fix: previously we logged the error but kept looping.
			// Once the stream is dead (e.g. EOF after Stop kills the
			// process) that spins forever, handing zero-value Flows -
			// whose Original/Independent pointers are nil - to handleFlow.
			// Return instead, letting the deferred Wait reap the process.
			log.Printf("conntrack error: %v", err)
			return
		}
		c.handleFlow(f)
	}
}
// Stop kills the underlying conntrack process, if one has been started,
// which in turn causes the run goroutine's stream to end.
func (c *Conntracker) Stop() {
	c.Lock()
	defer c.Unlock()
	if c.cmd == nil {
		return
	}
	if p := c.cmd.Process(); p != nil {
		// Fix: don't silently discard the Kill error.
		if err := p.Kill(); err != nil {
			log.Printf("conntrack error: %v", err)
		}
	}
}
// handleFlow updates the active/buffered flow sets from a single decoded
// conntrack event. Only TCP flows are tracked.
func (c *Conntracker) handleFlow(f Flow) {
	// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
	// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
	// This code finds those metas, which are identified by a Direction
	// attribute.
	for i := range f.Metas {
		meta := &f.Metas[i]
		switch meta.Direction {
		case "original":
			f.Original = meta
		case "reply":
			f.Reply = meta
		case "independent":
			f.Independent = meta
		}
	}

	// Robustness fix: a malformed or partially decoded flow may be missing
	// the metas dereferenced below; drop it rather than panic on nil.
	if f.Original == nil || f.Independent == nil {
		return
	}

	// For now, I'm only interested in tcp connections - there is too much udp
	// traffic going on (every container talking to weave dns, for example) to
	// render nicely. TODO: revisit this.
	if f.Original.Layer4.Proto != TCP {
		return
	}

	c.Lock()
	defer c.Unlock()

	switch f.Type {
	case New, Update:
		if f.Independent.State != TimeWait {
			c.activeFlows[f.Independent.ID] = f
		} else if _, ok := c.activeFlows[f.Independent.ID]; ok {
			// Flow moved to TIME_WAIT: evict it from the active set but keep
			// it visible for one more WalkFlows cycle.
			delete(c.activeFlows, f.Independent.ID)
			c.bufferedFlows = append(c.bufferedFlows, f)
		}
	case Destroy:
		if _, ok := c.activeFlows[f.Independent.ID]; ok {
			delete(c.activeFlows, f.Independent.ID)
			c.bufferedFlows = append(c.bufferedFlows, f)
		}
	}
}
// WalkFlows calls f with all active flows and flows that have come and gone
// since the last call to WalkFlows
func (c *Conntracker) WalkFlows(f func(Flow)) {
	c.Lock()
	defer c.Unlock()

	for _, active := range c.activeFlows {
		f(active)
	}
	for _, buffered := range c.bufferedFlows {
		f(buffered)
	}
	// Buffered flows have now been seen by exactly one walk; drop them,
	// keeping the backing array for reuse.
	c.bufferedFlows = c.bufferedFlows[:0]
}

View File

@@ -0,0 +1,143 @@
package endpoint_test
import (
"bufio"
"encoding/xml"
"io"
"testing"
"time"
"github.com/weaveworks/scope/common/exec"
. "github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/test"
testExec "github.com/weaveworks/scope/test/exec"
)
// makeFlow builds a Flow the way the conntrack XML decoder would: an
// "original" meta carrying the 4-tuple, plus an "independent" meta carrying
// the flow ID and state.
func makeFlow(id int64, srcIP, dstIP string, srcPort, dstPort int, ty, state string) Flow {
	original := Meta{
		XMLName:   xml.Name{Local: "meta"},
		Direction: "original",
		Layer3: Layer3{
			XMLName: xml.Name{Local: "layer3"},
			SrcIP:   srcIP,
			DstIP:   dstIP,
		},
		Layer4: Layer4{
			XMLName: xml.Name{Local: "layer4"},
			SrcPort: srcPort,
			DstPort: dstPort,
			Proto:   TCP,
		},
	}
	independent := Meta{
		XMLName:   xml.Name{Local: "meta"},
		Direction: "independent",
		ID:        id,
		State:     state,
		Layer3:    Layer3{XMLName: xml.Name{Local: "layer3"}},
		Layer4:    Layer4{XMLName: xml.Name{Local: "layer4"}},
	}
	return Flow{
		XMLName: xml.Name{Local: "flow"},
		Type:    ty,
		Metas:   []Meta{original, independent},
	}
}
// TestConntracker drives a Conntracker against a mocked conntrack process,
// feeding hand-built XML flows through a pipe and polling WalkFlows for the
// expected visible flow set after each event.
func TestConntracker(t *testing.T) {
	// Swap in mocks for the exec hook and the module-presence check; restore
	// both when the test finishes.
	oldExecCmd, oldConntrackPresent := exec.Command, ConntrackModulePresent
	defer func() { exec.Command, ConntrackModulePresent = oldExecCmd, oldConntrackPresent }()

	ConntrackModulePresent = func() bool {
		return true
	}

	// The write end of this pipe plays the part of conntrack's stdout.
	reader, writer := io.Pipe()
	exec.Command = func(name string, args ...string) exec.Cmd {
		return testExec.NewMockCmd(reader)
	}

	conntracker, err := NewConntracker()
	if err != nil {
		t.Fatal(err)
	}

	// Emit the two header lines the Conntracker expects before any flows.
	bw := bufio.NewWriter(writer)
	if _, err := bw.WriteString(XMLHeader); err != nil {
		t.Fatal(err)
	}
	if _, err := bw.WriteString(ConntrackOpenTag); err != nil {
		t.Fatal(err)
	}
	if err := bw.Flush(); err != nil {
		t.Fatal(err)
	}

	// have snapshots the flows currently visible via WalkFlows, zeroing the
	// derived meta pointers so the results compare equal to our literals.
	have := func() interface{} {
		result := []Flow{}
		conntracker.WalkFlows(func(f Flow) {
			f.Original = nil
			f.Reply = nil
			f.Independent = nil
			result = append(result, f)
		})
		return result
	}
	ts := 100 * time.Millisecond

	// First, assert we have no flows
	test.Poll(t, ts, []Flow{}, have)

	// Now add some flows
	xmlEncoder := xml.NewEncoder(bw)
	// writeFlow encodes a flow onto the pipe and flushes it through.
	writeFlow := func(f Flow) {
		if err := xmlEncoder.Encode(f); err != nil {
			t.Fatal(err)
		}
		if _, err := bw.WriteString("\n"); err != nil {
			t.Fatal(err)
		}
		if err := bw.Flush(); err != nil {
			t.Fatal(err)
		}
	}

	flow1 := makeFlow(1, "1.2.3.4", "2.3.4.5", 2, 3, New, "")
	writeFlow(flow1)
	test.Poll(t, ts, []Flow{flow1}, have)

	// Now check when we remove the flow, we still get it in the next Walk
	flow1.Type = Destroy
	writeFlow(flow1)
	test.Poll(t, ts, []Flow{flow1}, have)
	test.Poll(t, ts, []Flow{}, have)

	// This time we're not going to remove it, but put it in state TIME_WAIT
	flow1.Type = New
	writeFlow(flow1)
	test.Poll(t, ts, []Flow{flow1}, have)

	// A TIME_WAIT update evicts the flow from the active set into the
	// one-walk buffer: visible once more, then gone.
	flow1.Metas[1].State = TimeWait
	writeFlow(flow1)
	test.Poll(t, ts, []Flow{flow1}, have)
	test.Poll(t, ts, []Flow{}, have)
}

View File

@@ -1,54 +1,11 @@
package endpoint
import (
"bufio"
"encoding/xml"
"io"
"log"
"os"
"os/exec"
"strconv"
"strings"
"github.com/weaveworks/scope/report"
)
const (
modules = "/proc/modules"
conntrackModule = "nf_conntrack"
)
// these structs are for the parsed conntrack output
type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
}
type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
}
type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
}
type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []meta `xml:"meta"`
}
type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []flow `xml:"flow"`
}
// This is our 'abstraction' of the endpoint that have been rewritten by NAT.
// Original is the private IP that has been rewritten.
type endpointMapping struct {
@@ -59,111 +16,51 @@ type endpointMapping struct {
rewrittenPort int
}
// natTable returns a list of endpoints that have been remapped by NAT.
func natTable() ([]endpointMapping, error) {
var conntrack conntrack
cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml")
stdout, err := cmd.StdoutPipe()
type natmapper struct {
*Conntracker
}
func newNATMapper() (*natmapper, error) {
ct, err := NewConntracker("--any-nat")
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
defer func() {
if err := cmd.Wait(); err != nil {
log.Printf("conntrack error: %v", err)
return &natmapper{ct}, nil
}
func toMapping(f Flow) *endpointMapping {
var mapping endpointMapping
if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP {
mapping = endpointMapping{
originalIP: f.Reply.Layer3.SrcIP,
originalPort: f.Reply.Layer4.SrcPort,
rewrittenIP: f.Original.Layer3.DstIP,
rewrittenPort: f.Original.Layer4.DstPort,
}
}()
if err := xml.NewDecoder(stdout).Decode(&conntrack); err != nil {
if err == io.EOF {
return []endpointMapping{}, nil
} else {
mapping = endpointMapping{
originalIP: f.Original.Layer3.SrcIP,
originalPort: f.Original.Layer4.SrcPort,
rewrittenIP: f.Reply.Layer3.DstIP,
rewrittenPort: f.Reply.Layer4.DstPort,
}
return nil, err
}
output := []endpointMapping{}
for _, flow := range conntrack.Flows {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
// attribute.
original, reply := meta{}, meta{}
for _, meta := range flow.Metas {
if meta.Direction == "original" {
original = meta
} else if meta.Direction == "reply" {
reply = meta
}
}
if original.Layer4.Proto != "tcp" {
continue
}
var conn endpointMapping
if original.Layer3.SrcIP == reply.Layer3.DstIP {
conn = endpointMapping{
originalIP: reply.Layer3.SrcIP,
originalPort: reply.Layer4.SrcPort,
rewrittenIP: original.Layer3.DstIP,
rewrittenPort: original.Layer4.DstPort,
}
} else {
conn = endpointMapping{
originalIP: original.Layer3.SrcIP,
originalPort: original.Layer4.SrcPort,
rewrittenIP: reply.Layer3.DstIP,
rewrittenPort: reply.Layer4.DstPort,
}
}
output = append(output, conn)
}
return output, nil
return &mapping
}
// applyNAT duplicates NodeMetadatas in the endpoint topology of a
// report, based on the NAT table as returns by natTable.
func applyNAT(rpt report.Report, scope string) error {
mappings, err := natTable()
if err != nil {
return err
}
for _, mapping := range mappings {
func (n *natmapper) applyNAT(rpt report.Report, scope string) {
n.WalkFlows(func(f Flow) {
mapping := toMapping(f)
realEndpointID := report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort))
copyEndpointID := report.MakeEndpointNodeID(scope, mapping.rewrittenIP, strconv.Itoa(mapping.rewrittenPort))
nmd, ok := rpt.Endpoint.NodeMetadatas[realEndpointID]
if !ok {
continue
return
}
rpt.Endpoint.NodeMetadatas[copyEndpointID] = nmd.Copy()
}
return nil
}
func conntrackModulePresent() bool {
f, err := os.Open(modules)
if err != nil {
return false
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, conntrackModule) {
return true
}
}
if err := scanner.Err(); err != nil {
log.Printf("conntrack error: %v", err)
}
log.Printf("conntrack: failed to find module %s", conntrackModule)
return false
})
}

View File

@@ -1,7 +1,7 @@
package endpoint
import (
"fmt"
"log"
"strconv"
"time"
@@ -24,6 +24,8 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
conntracker *Conntracker
natmapper *natmapper
}
// SpyDuration is an exported prometheus metric
@@ -43,12 +45,41 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
var (
conntrackModulePresent = ConntrackModulePresent()
conntracker *Conntracker
natmapper *natmapper
err error
)
if conntrackModulePresent && useConntrack {
conntracker, err = NewConntracker()
if err != nil {
log.Printf("Failed to start conntracker: %v", err)
}
}
if conntrackModulePresent {
natmapper, err = newNATMapper()
if err != nil {
log.Printf("Failed to start natMapper: %v", err)
}
}
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent(),
conntracker: conntracker,
natmapper: natmapper,
}
}
// Stop stop stop
func (r *Reporter) Stop() {
if r.conntracker != nil {
r.conntracker.Stop()
}
if r.natmapper != nil {
r.natmapper.Stop()
}
}
@@ -65,50 +96,73 @@ func (r *Reporter) Report() (report.Report, error) {
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
r.addConnection(&rpt, conn)
var (
localPort = conn.LocalPort
remotePort = conn.RemotePort
localAddr = conn.LocalAddress.String()
remoteAddr = conn.RemoteAddress.String()
)
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &conn.Proc)
}
if r.includeNAT {
err = applyNAT(rpt, r.hostID)
if r.conntracker != nil {
r.conntracker.WalkFlows(func(f Flow) {
var (
localPort = f.Original.Layer4.SrcPort
remotePort = f.Original.Layer4.DstPort
localAddr = f.Original.Layer3.SrcIP
remoteAddr = f.Original.Layer3.DstIP
)
r.addConnection(&rpt, localAddr, remoteAddr, uint16(localPort), uint16(remotePort), nil)
})
}
if r.natmapper != nil {
r.natmapper.applyNAT(rpt, r.hostID)
}
return rpt, err
}
func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
var (
localIsClient = int(c.LocalPort) > int(c.RemotePort)
localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String())
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String())
adjacencyID = ""
edgeID = ""
)
func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, proc *procspy.Proc) {
localIsClient := int(localPort) > int(remotePort)
if localIsClient {
adjacencyID = report.MakeAdjacencyID(localAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID)
edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID)
} else {
adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID)
edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID)
}
if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: c.LocalAddress.String(),
})
}
countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)
if c.Proc.PID > 0 {
// Update address topology
{
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
adjacencyID = ""
edgeID = ""
)
if localIsClient {
adjacencyID = report.MakeAdjacencyID(localAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID)
edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID)
} else {
adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID)
edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID)
}
countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)
if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
})
}
}
// Update endpoint topology
if r.includeProcesses {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))
adjacencyID = ""
edgeID = ""
)
@@ -125,18 +179,24 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
edgeID = report.MakeEdgeID(remoteEndpointNodeID, localEndpointNodeID)
}
if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
md := report.MakeNodeMetadataWith(map[string]string{
Addr: c.LocalAddress.String(),
Port: strconv.Itoa(int(c.LocalPort)),
process.PID: fmt.Sprint(c.Proc.PID),
})
countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID)
md, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]
updated := !ok
if !ok {
md = report.MakeNodeMetadataWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
})
}
if proc != nil && proc.PID > 0 {
pid := strconv.FormatUint(uint64(proc.PID), 10)
updated = updated || md.Metadata[process.PID] != pid
md.Metadata[process.PID] = pid
}
if updated {
rpt.Endpoint.NodeMetadatas[localEndpointNodeID] = md
}
countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID)
}
}

View File

@@ -19,7 +19,6 @@ var (
fixRemotePortB = uint16(12346)
fixProcessPID = uint(4242)
fixProcessName = "nginx"
fixProcessPIDB = uint(4243)
fixConnections = []procspy.Connection{
{
@@ -55,9 +54,9 @@ var (
LocalAddress: fixLocalAddress,
LocalPort: fixLocalPort,
RemoteAddress: fixRemoteAddress,
RemotePort: fixRemotePort,
RemotePort: fixRemotePortB,
Proc: procspy.Proc{
PID: fixProcessPIDB,
PID: fixProcessPID,
Name: fixProcessName,
},
},
@@ -72,7 +71,7 @@ func TestSpyNoProcesses(t *testing.T) {
nodeName = "frenchs-since-1904" // TODO rename to hostNmae
)
reporter := endpoint.NewReporter(nodeID, nodeName, false)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
@@ -109,7 +108,7 @@ func TestSpyWithProcesses(t *testing.T) {
nodeName = "fishermans-friend" // TODO rename to hostNmae
)
reporter := endpoint.NewReporter(nodeID, nodeName, false)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

View File

@@ -46,6 +46,7 @@ func main() {
captureOn = flag.Duration("capture.on", 1*time.Second, "packet capture duty cycle 'on'")
captureOff = flag.Duration("capture.off", 5*time.Second, "packet capture duty cycle 'off'")
printVersion = flag.Bool("version", false, "print version number and exit")
useConntrack = flag.Bool("conntrack", true, "also use conntrack to track connections")
)
flag.Parse()
@@ -103,13 +104,16 @@ func main() {
}
var (
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = []Reporter{
endpointReporter,
host.NewReporter(hostID, hostName, localNets),
process.NewReporter(processCache, hostID),
}
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
)
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))
defer endpointReporter.Stop()
if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {

View File

@@ -4,15 +4,14 @@ import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os/exec"
"regexp"
"strings"
"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
@@ -36,16 +35,6 @@ const (
var weavePsMatch = regexp.MustCompile(`^([0-9a-f]{12}) ((?:[0-9a-f][0-9a-f]\:){5}(?:[0-9a-f][0-9a-f]))(.*)$`)
var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(/[0-9]+)`)
// Cmd is a hook for mocking
type Cmd interface {
StdoutPipe() (io.ReadCloser, error)
Start() error
Wait() error
}
// ExecCommand is a hook for mocking
var ExecCommand = func(name string, args ...string) Cmd { return exec.Command(name, args...) }
// Weave represents a single Weave router, presumably on the same host
// as the probe. It is both a Reporter and a Tagger: it produces an Overlay
// topology, and (in theory) can tag existing topologies with foreign keys to
@@ -114,7 +103,7 @@ type psEntry struct {
func (w Weave) ps() ([]psEntry, error) {
var result []psEntry
cmd := ExecCommand("weave", "--local", "ps")
cmd := exec.Command("weave", "--local", "ps")
out, err := cmd.StdoutPipe()
if err != nil {
return result, err

View File

@@ -1,50 +1,25 @@
package overlay_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
testExec "github.com/weaveworks/scope/test/exec"
)
type mockCmd struct {
*bytes.Buffer
}
func (c *mockCmd) Start() error {
return nil
}
func (c *mockCmd) Wait() error {
return nil
}
func (c *mockCmd) StdoutPipe() (io.ReadCloser, error) {
return struct {
io.Reader
io.Closer
}{
c.Buffer,
ioutil.NopCloser(nil),
}, nil
}
func TestWeaveTaggerOverlayTopology(t *testing.T) {
oldExecCmd := overlay.ExecCommand
defer func() { overlay.ExecCommand = oldExecCmd }()
overlay.ExecCommand = func(name string, args ...string) overlay.Cmd {
return &mockCmd{
bytes.NewBufferString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP)),
}
oldExecCmd := exec.Command
defer func() { exec.Command = oldExecCmd }()
exec.Command = func(name string, args ...string) exec.Cmd {
return testExec.NewMockCmdString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP))
}
s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter))

View File

@@ -25,16 +25,13 @@ const (
)
// LeafMapFunc is anything which can take an arbitrary NodeMetadata, which is
// always one-to-one with nodes in a topology, and return a specific
// representation of the referenced node, in the form of a node ID and a
// human-readable major and minor labels.
// always one-to-one with nodes in a topology, and return a set of RenderableNodes
// - specific representations of the referenced node, in the form of a map of node
// ID to a human-readable major and minor labels.
//
// A single NodeMetadata can yield arbitrary many representations, including
// representations that reduce the cardinality of the set of nodes.
//
// If the final output parameter is false, the node shall be omitted from the
// rendered topology.
type LeafMapFunc func(report.NodeMetadata) (RenderableNode, bool)
// representations that reduce (or even increase) the cardinality of the set of nodes.
type LeafMapFunc func(report.NodeMetadata) RenderableNodes
// PseudoFunc creates RenderableNode representing pseudo nodes given the
// srcNodeID. dstNodeID is the node id of one of the nodes this node has an
@@ -44,24 +41,24 @@ type LeafMapFunc func(report.NodeMetadata) (RenderableNode, bool)
type PseudoFunc func(srcNodeID, dstNodeID string, srcIsClient bool, local report.Networks) (RenderableNode, bool)
// MapFunc is anything which can take an arbitrary RenderableNode and
// return another RenderableNode.
// return a set of other RenderableNodes.
//
// As with LeafMapFunc, if the final output parameter is false, the node
// As with LeafMapFunc, if the output is empty, the node
// shall be omitted from the rendered topology.
type MapFunc func(RenderableNode) (RenderableNode, bool)
type MapFunc func(RenderableNode) RenderableNodes
// MapEndpointIdentity maps an endpoint topology node to an endpoint
// MapEndpointIdentity maps an endpoint topology node to a single endpoint
// renderable node. As it is only ever run on endpoint topology nodes, we
// expect that certain keys are present.
func MapEndpointIdentity(m report.NodeMetadata) (RenderableNode, bool) {
func MapEndpointIdentity(m report.NodeMetadata) RenderableNodes {
addr, ok := m.Metadata[endpoint.Addr]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
port, ok := m.Metadata[endpoint.Port]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
var (
@@ -75,16 +72,16 @@ func MapEndpointIdentity(m report.NodeMetadata) (RenderableNode, bool) {
minor = fmt.Sprintf("%s (%s)", minor, pid)
}
return NewRenderableNode(id, major, minor, rank, m), true
return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)}
}
// MapProcessIdentity maps a process topology node to a process renderable
// node. As it is only ever run on process topology nodes, we expect that
// certain keys are present.
func MapProcessIdentity(m report.NodeMetadata) (RenderableNode, bool) {
func MapProcessIdentity(m report.NodeMetadata) RenderableNodes {
pid, ok := m.Metadata[process.PID]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
var (
@@ -94,16 +91,16 @@ func MapProcessIdentity(m report.NodeMetadata) (RenderableNode, bool) {
rank = m.Metadata["comm"]
)
return NewRenderableNode(id, major, minor, rank, m), true
return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)}
}
// MapContainerIdentity maps a container topology node to a container
// renderable node. As it is only ever run on container topology nodes, we
// expect that certain keys are present.
func MapContainerIdentity(m report.NodeMetadata) (RenderableNode, bool) {
func MapContainerIdentity(m report.NodeMetadata) RenderableNodes {
id, ok := m.Metadata[docker.ContainerID]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
var (
@@ -112,16 +109,16 @@ func MapContainerIdentity(m report.NodeMetadata) (RenderableNode, bool) {
rank = m.Metadata[docker.ImageID]
)
return NewRenderableNode(id, major, minor, rank, m), true
return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)}
}
// MapContainerImageIdentity maps a container image topology node to container
// image renderable node. As it is only ever run on container image topology
// nodes, we expect that certain keys are present.
func MapContainerImageIdentity(m report.NodeMetadata) (RenderableNode, bool) {
func MapContainerImageIdentity(m report.NodeMetadata) RenderableNodes {
id, ok := m.Metadata[docker.ImageID]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
var (
@@ -129,16 +126,16 @@ func MapContainerImageIdentity(m report.NodeMetadata) (RenderableNode, bool) {
rank = m.Metadata[docker.ImageID]
)
return NewRenderableNode(id, major, "", rank, m), true
return RenderableNodes{id: NewRenderableNode(id, major, "", rank, m)}
}
// MapAddressIdentity maps an address topology node to an address renderable
// node. As it is only ever run on address topology nodes, we expect that
// certain keys are present.
func MapAddressIdentity(m report.NodeMetadata) (RenderableNode, bool) {
func MapAddressIdentity(m report.NodeMetadata) RenderableNodes {
addr, ok := m.Metadata[endpoint.Addr]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
var (
@@ -148,13 +145,13 @@ func MapAddressIdentity(m report.NodeMetadata) (RenderableNode, bool) {
rank = major
)
return NewRenderableNode(id, major, minor, rank, m), true
return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)}
}
// MapHostIdentity maps a host topology node to a host renderable node. As it
// is only ever run on host topology nodes, we expect that certain keys are
// present.
func MapHostIdentity(m report.NodeMetadata) (RenderableNode, bool) {
func MapHostIdentity(m report.NodeMetadata) RenderableNodes {
var (
id = MakeHostID(report.ExtractHostID(m))
hostname = m.Metadata[host.HostName]
@@ -168,7 +165,79 @@ func MapHostIdentity(m report.NodeMetadata) (RenderableNode, bool) {
major = hostname
}
return NewRenderableNode(id, major, minor, rank, m), true
return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)}
}
// MapEndpoint2IP maps endpoint nodes to their IP address, for joining
// with container nodes. We drop endpoint nodes with pids, as they
// will be joined to containers through the process topology, and we
// don't want to double count edges.
func MapEndpoint2IP(m report.NodeMetadata) RenderableNodes {
	// Endpoints carrying a PID reach containers via the process topology;
	// emitting them here too would double-count the edge.
	_, ok := m.Metadata[process.PID]
	if ok {
		return RenderableNodes{}
	}
	// Without an address there is nothing to join on.
	addr, ok := m.Metadata[endpoint.Addr]
	if !ok {
		return RenderableNodes{}
	}
	return RenderableNodes{addr: NewRenderableNode(addr, "", "", "", m)}
}
// IPPseudoNode maps endpoint pseudo nodes to regular IP address nodes, or
// the internet node.
func IPPseudoNode(srcNodeID, _ string, _ bool, local report.Networks) (RenderableNode, bool) {
	// Use the addresser to extract the IP of the missing node
	srcNodeAddr := report.EndpointIDAddresser(srcNodeID)

	// If the srcNodeAddr is not in a network local to this report, we emit an
	// internet node
	if !local.Contains(srcNodeAddr) {
		return newPseudoNode(TheInternetID, TheInternetMajor, ""), true
	}

	// Otherwise emit a plain IP node keyed by the address itself.
	return NewRenderableNode(srcNodeAddr.String(), "", "", "", report.MakeNodeMetadata()), true
}
// MapContainer2IP maps container nodes to their IP addresses (outputs
// multiple nodes). This allows container to be joined directly with
// the endpoint topology.
func MapContainer2IP(m report.NodeMetadata) RenderableNodes {
	result := RenderableNodes{}
	addrs, ok := m.Metadata[docker.ContainerIPs]
	if !ok {
		return result
	}
	// One output node per whitespace-separated address. The counter lets
	// MapIP2Container detect IPs shared by more than one container.
	for _, addr := range strings.Fields(addrs) {
		n := NewRenderableNode(addr, "", "", "", m)
		n.NodeMetadata.Counters[containersKey] = 1
		result[addr] = n
	}
	return result
}
// MapIP2Container maps IP nodes produced from MapContainer2IP back to
// container nodes. If there is more than one container with a given
// IP, it is dropped.
func MapIP2Container(n RenderableNode) RenderableNodes {
	// If an IP is shared between multiple containers, we can't
	// reliably attribute a connection based on its IP
	if n.NodeMetadata.Counters[containersKey] > 1 {
		return RenderableNodes{}
	}

	// Propagate the internet pseudo node.
	if n.ID == TheInternetID {
		return RenderableNodes{n.ID: n}
	}

	// If this node is not a container, exclude it.
	// This excludes all the nodes we've dragged in from endpoint
	// that we failed to join to a container.
	id, ok := n.NodeMetadata.Metadata[docker.ContainerID]
	if !ok {
		return RenderableNodes{}
	}

	return RenderableNodes{id: newDerivedNode(id, n)}
}
// MapEndpoint2Process maps endpoint RenderableNodes to process
@@ -182,18 +251,18 @@ func MapHostIdentity(m report.NodeMetadata) (RenderableNode, bool) {
// format for a process, but without any Major or Minor labels.
// It does not have enough info to do that, and the resulting graph
// must be merged with a process graph to get that info.
func MapEndpoint2Process(n RenderableNode) (RenderableNode, bool) {
func MapEndpoint2Process(n RenderableNode) RenderableNodes {
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
pid, ok := n.NodeMetadata.Metadata[process.PID]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
id := MakeProcessID(report.ExtractHostID(n.NodeMetadata), pid)
return newDerivedNode(id, n), true
return RenderableNodes{id: newDerivedNode(id, n)}
}
// MapProcess2Container maps process RenderableNodes to container
@@ -207,15 +276,15 @@ func MapEndpoint2Process(n RenderableNode) (RenderableNode, bool) {
// format for a container, but without any Major or Minor labels.
// It does not have enough info to do that, and the resulting graph
// must be merged with a container graph to get that info.
func MapProcess2Container(n RenderableNode) (RenderableNode, bool) {
func MapProcess2Container(n RenderableNode) RenderableNodes {
	// Propagate the internet pseudo node
if n.ID == TheInternetID {
return n, true
return RenderableNodes{n.ID: n}
}
	// Don't propagate non-internet pseudo nodes
if n.Pseudo {
return n, false
return RenderableNodes{}
}
// Otherwise, if the process is not in a container, group it
@@ -228,10 +297,10 @@ func MapProcess2Container(n RenderableNode) (RenderableNode, bool) {
id = MakePseudoNodeID(UncontainedID, hostID)
node := newDerivedPseudoNode(id, UncontainedMajor, n)
node.LabelMinor = hostID
return node, true
return RenderableNodes{id: node}
}
return newDerivedNode(id, n), true
return RenderableNodes{id: newDerivedNode(id, n)}
}
// MapProcess2Name maps process RenderableNodes to RenderableNodes
@@ -240,29 +309,29 @@ func MapProcess2Container(n RenderableNode) (RenderableNode, bool) {
// This mapper is unlike the other foo2bar mappers as the intention
// is not to join the information with another topology. Therefore
// it outputs a properly-formed node with labels etc.
func MapProcess2Name(n RenderableNode) (RenderableNode, bool) {
func MapProcess2Name(n RenderableNode) RenderableNodes {
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
name, ok := n.NodeMetadata.Metadata["comm"]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
node := newDerivedNode(name, n)
node.LabelMajor = name
node.Rank = name
node.NodeMetadata.Counters[processesKey] = 1
return node, true
return RenderableNodes{name: node}
}
// MapCountProcessName maps 1:1 process name nodes, counting
// the number of processes grouped together and putting
// that info in the minor label.
func MapCountProcessName(n RenderableNode) (RenderableNode, bool) {
func MapCountProcessName(n RenderableNode) RenderableNodes {
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
processes := n.NodeMetadata.Counters[processesKey]
@@ -271,7 +340,7 @@ func MapCountProcessName(n RenderableNode) (RenderableNode, bool) {
} else {
n.LabelMinor = fmt.Sprintf("%d processes", processes)
}
return n, true
return RenderableNodes{n.ID: n}
}
// MapContainer2ContainerImage maps container RenderableNodes to container
@@ -285,23 +354,23 @@ func MapCountProcessName(n RenderableNode) (RenderableNode, bool) {
// format for a container, but without any Major or Minor labels.
// It does not have enough info to do that, and the resulting graph
// must be merged with a container graph to get that info.
func MapContainer2ContainerImage(n RenderableNode) (RenderableNode, bool) {
func MapContainer2ContainerImage(n RenderableNode) RenderableNodes {
	// Propagate all pseudo nodes
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
	// Otherwise, if for some reason the container doesn't have an image_id
// (maybe slightly out of sync reports), just drop it
id, ok := n.NodeMetadata.Metadata[docker.ImageID]
if !ok {
return n, false
return RenderableNodes{}
}
// Add container-<id> key to NMD, which will later be counted to produce the minor label
result := newDerivedNode(id, n)
result.NodeMetadata.Counters[containersKey] = 1
return result, true
return RenderableNodes{id: result}
}
// MapContainerImage2Name maps container images RenderableNodes to
@@ -310,14 +379,14 @@ func MapContainer2ContainerImage(n RenderableNode) (RenderableNode, bool) {
// This mapper is unlike the other foo2bar mappers as the intention
// is not to join the information with another topology. Therefore
// it outputs a properly-formed node with labels etc.
func MapContainerImage2Name(n RenderableNode) (RenderableNode, bool) {
func MapContainerImage2Name(n RenderableNode) RenderableNodes {
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
name, ok := n.NodeMetadata.Metadata[docker.ImageName]
if !ok {
return RenderableNode{}, false
return RenderableNodes{}
}
parts := strings.SplitN(name, ":", 2)
@@ -329,15 +398,15 @@ func MapContainerImage2Name(n RenderableNode) (RenderableNode, bool) {
node.LabelMajor = name
node.Rank = name
node.NodeMetadata = n.NodeMetadata.Copy() // Propagate NMD for container counting.
return node, true
return RenderableNodes{name: node}
}
// MapCountContainers maps 1:1 container image nodes, counting
// the number of containers grouped together and putting
// that info in the minor label.
func MapCountContainers(n RenderableNode) (RenderableNode, bool) {
func MapCountContainers(n RenderableNode) RenderableNodes {
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
containers := n.NodeMetadata.Counters[containersKey]
@@ -346,19 +415,19 @@ func MapCountContainers(n RenderableNode) (RenderableNode, bool) {
} else {
n.LabelMinor = fmt.Sprintf("%d containers", containers)
}
return n, true
return RenderableNodes{n.ID: n}
}
// MapAddress2Host maps address RenderableNodes to host RenderableNodes.
//
// Other than pseudo nodes, we can assume all nodes have a HostID
func MapAddress2Host(n RenderableNode) (RenderableNode, bool) {
func MapAddress2Host(n RenderableNode) RenderableNodes {
if n.Pseudo {
return n, true
return RenderableNodes{n.ID: n}
}
id := MakeHostID(report.ExtractHostID(n.NodeMetadata))
return newDerivedNode(id, n), true
return RenderableNodes{id: newDerivedNode(id, n)}
}
// GenericPseudoNode makes a PseudoFunc given an addresser. The returned

View File

@@ -72,7 +72,7 @@ type testcase struct {
}
func testMap(t *testing.T, f render.LeafMapFunc, input testcase) {
if _, have := f(input.md); input.ok != have {
if have := f(input.md); input.ok != (len(have) > 0) {
t.Errorf("%v: want %v, have %v", input.md, input.ok, have)
}
}

View File

@@ -53,26 +53,24 @@ func (m Map) Render(rpt report.Report) RenderableNodes {
return output
}
func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) {
func (m Map) render(rpt report.Report) (RenderableNodes, map[string]report.IDList) {
input := m.Renderer.Render(rpt)
output := RenderableNodes{}
mapped := map[string]string{} // input node ID -> output node ID
mapped := map[string]report.IDList{} // input node ID -> output node IDs
adjacencies := map[string]report.IDList{} // output node ID -> input node Adjacencies
for _, inRenderable := range input {
outRenderable, ok := m.MapFunc(inRenderable)
if !ok {
continue
}
outRenderables := m.MapFunc(inRenderable)
for _, outRenderable := range outRenderables {
existing, ok := output[outRenderable.ID]
if ok {
outRenderable.Merge(existing)
}
existing, ok := output[outRenderable.ID]
if ok {
outRenderable.Merge(existing)
output[outRenderable.ID] = outRenderable
mapped[inRenderable.ID] = mapped[inRenderable.ID].Add(outRenderable.ID)
adjacencies[outRenderable.ID] = adjacencies[outRenderable.ID].Merge(inRenderable.Adjacency)
}
output[outRenderable.ID] = outRenderable
mapped[inRenderable.ID] = outRenderable.ID
adjacencies[outRenderable.ID] = adjacencies[outRenderable.ID].Merge(inRenderable.Adjacency)
}
// Rewrite Adjacency for new node IDs.
@@ -82,7 +80,7 @@ func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) {
for outNodeID, inAdjacency := range adjacencies {
outAdjacency := report.MakeIDList()
for _, inAdjacent := range inAdjacency {
if outAdjacent, ok := mapped[inAdjacent]; ok {
for _, outAdjacent := range mapped[inAdjacent] {
outAdjacency = outAdjacency.Add(outAdjacent)
}
}
@@ -102,10 +100,12 @@ func (m Map) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID st
// First we need to map the ids in this layer into the ids in the underlying layer
_, mapped := m.render(rpt) // this maps from old -> new
inverted := map[string][]string{} // this maps from new -> old(s)
for k, v := range mapped {
existing := inverted[v]
existing = append(existing, k)
inverted[v] = existing
for k, vs := range mapped {
for _, v := range vs {
existing := inverted[v]
existing = append(existing, k)
inverted[v] = existing
}
}
// Now work out a slice of edges this edge is constructed from
@@ -148,34 +148,31 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes {
// Build a set of RenderableNodes for all non-pseudo probes, and an
// addressID to nodeID lookup map. Multiple addressIDs can map to the same
// RenderableNodes.
source2mapped := map[string]string{} // source node ID -> mapped node ID
source2mapped := map[string]report.IDList{} // source node ID -> mapped node IDs
for nodeID, metadata := range t.NodeMetadatas {
mapped, ok := m.Mapper(metadata)
if !ok {
continue
for _, mapped := range m.Mapper(metadata) {
// mapped.ID needs not be unique over all addressIDs. If not, we merge with
// the existing data, on the assumption that the MapFunc returns the same
// data.
existing, ok := nodes[mapped.ID]
if ok {
mapped.Merge(existing)
}
origins := mapped.Origins
origins = origins.Add(nodeID)
origins = origins.Add(metadata.Metadata[report.HostNodeID])
mapped.Origins = origins
nodes[mapped.ID] = mapped
source2mapped[nodeID] = source2mapped[nodeID].Add(mapped.ID)
}
// mapped.ID needs not be unique over all addressIDs. If not, we merge with
// the existing data, on the assumption that the MapFunc returns the same
// data.
existing, ok := nodes[mapped.ID]
if ok {
mapped.Merge(existing)
}
origins := mapped.Origins
origins = origins.Add(nodeID)
origins = origins.Add(metadata.Metadata[report.HostNodeID])
mapped.Origins = origins
nodes[mapped.ID] = mapped
source2mapped[nodeID] = mapped.ID
}
mkPseudoNode := func(srcNodeID, dstNodeID string, srcIsClient bool) (string, bool) {
mkPseudoNode := func(srcNodeID, dstNodeID string, srcIsClient bool) report.IDList {
pseudoNode, ok := m.Pseudo(srcNodeID, dstNodeID, srcIsClient, localNetworks)
if !ok {
return "", false
return report.MakeIDList()
}
pseudoNode.Origins = pseudoNode.Origins.Add(srcNodeID)
existing, ok := nodes[pseudoNode.ID]
@@ -184,8 +181,8 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes {
}
nodes[pseudoNode.ID] = pseudoNode
source2mapped[pseudoNode.ID] = srcNodeID
return pseudoNode.ID, true
source2mapped[pseudoNode.ID] = source2mapped[pseudoNode.ID].Add(srcNodeID)
return report.MakeIDList(pseudoNode.ID)
}
// Walk the graph and make connections.
@@ -196,51 +193,61 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes {
continue
}
srcRenderableID, ok := source2mapped[srcNodeID]
srcRenderableIDs, ok := source2mapped[srcNodeID]
if !ok {
// One of the entries in dsts must be a non-pseudo node
var existingDstNodeID string
// One of the entries in dsts must be a non-pseudo node, unless
// it was dropped by the mapping function.
for _, dstNodeID := range dsts {
if _, ok := source2mapped[dstNodeID]; ok {
existingDstNodeID = dstNodeID
srcRenderableIDs = mkPseudoNode(srcNodeID, dstNodeID, true)
break
}
}
srcRenderableID, ok = mkPseudoNode(srcNodeID, existingDstNodeID, true)
if !ok {
continue
}
}
srcRenderableNode := nodes[srcRenderableID]
if len(srcRenderableIDs) == 0 {
continue
}
for _, dstNodeID := range dsts {
dstRenderableID, ok := source2mapped[dstNodeID]
if !ok {
dstRenderableID, ok = mkPseudoNode(dstNodeID, srcNodeID, false)
for _, srcRenderableID := range srcRenderableIDs {
srcRenderableNode := nodes[srcRenderableID]
for _, dstNodeID := range dsts {
dstRenderableIDs, ok := source2mapped[dstNodeID]
if !ok {
dstRenderableIDs = mkPseudoNode(dstNodeID, srcNodeID, false)
}
if len(dstRenderableIDs) == 0 {
continue
}
}
dstRenderableNode := nodes[dstRenderableID]
for _, dstRenderableID := range dstRenderableIDs {
dstRenderableNode := nodes[dstRenderableID]
srcRenderableNode.Adjacency = srcRenderableNode.Adjacency.Add(dstRenderableID)
srcRenderableNode.Adjacency = srcRenderableNode.Adjacency.Add(dstRenderableID)
// We propagate edge metadata to nodes on both ends of the edges.
// TODO we should 'reverse' one end of the edge meta data - ingress -> egress etc.
if md, ok := t.EdgeMetadatas[report.MakeEdgeID(srcNodeID, dstNodeID)]; ok {
srcRenderableNode.EdgeMetadata = srcRenderableNode.EdgeMetadata.Merge(md)
dstRenderableNode.EdgeMetadata = dstRenderableNode.EdgeMetadata.Merge(md)
nodes[dstRenderableID] = dstRenderableNode
// We propagate edge metadata to nodes on both ends of the edges.
// TODO we should 'reverse' one end of the edge meta data - ingress -> egress etc.
if md, ok := t.EdgeMetadatas[report.MakeEdgeID(srcNodeID, dstNodeID)]; ok {
srcRenderableNode.EdgeMetadata = srcRenderableNode.EdgeMetadata.Merge(md)
dstRenderableNode.EdgeMetadata = dstRenderableNode.EdgeMetadata.Merge(md)
nodes[dstRenderableID] = dstRenderableNode
}
}
}
nodes[srcRenderableID] = srcRenderableNode
}
nodes[srcRenderableID] = srcRenderableNode
}
return nodes
}
func ids(nodes RenderableNodes) report.IDList {
result := report.MakeIDList()
for id := range nodes {
result = result.Add(id)
}
return result
}
// EdgeMetadata gives the metadata of an edge from the perspective of the
// srcRenderableID. Since an edgeID can have multiple edges on the address
// level, it uses the supplied mapping function to translate address IDs to
@@ -254,15 +261,16 @@ func (m LeafMap) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableI
log.Printf("bad edge ID %q", edgeID)
continue
}
srcs, dsts := report.MakeIDList(src), report.MakeIDList(dst)
if src != report.TheInternet {
mapped, _ := m.Mapper(t.NodeMetadatas[src])
src = mapped.ID
mapped := m.Mapper(t.NodeMetadatas[src])
srcs = ids(mapped)
}
if dst != report.TheInternet {
mapped, _ := m.Mapper(t.NodeMetadatas[dst])
dst = mapped.ID
mapped := m.Mapper(t.NodeMetadatas[dst])
dsts = ids(mapped)
}
if src == srcRenderableID && dst == dstRenderableID {
if srcs.Contains(srcRenderableID) && dsts.Contains(dstRenderableID) {
metadata = metadata.Flatten(edgeMeta)
}
}

View File

@@ -52,8 +52,8 @@ func TestReduceEdge(t *testing.T) {
func TestMapRender1(t *testing.T) {
// 1. Check when we return false, the node gets filtered out
mapper := render.Map{
MapFunc: func(nodes render.RenderableNode) (render.RenderableNode, bool) {
return render.RenderableNode{}, false
MapFunc: func(nodes render.RenderableNode) render.RenderableNodes {
return render.RenderableNodes{}
},
Renderer: mockRenderer{RenderableNodes: render.RenderableNodes{
"foo": {ID: "foo"},
@@ -69,8 +69,8 @@ func TestMapRender1(t *testing.T) {
func TestMapRender2(t *testing.T) {
// 2. Check we can remap two nodes into one
mapper := render.Map{
MapFunc: func(nodes render.RenderableNode) (render.RenderableNode, bool) {
return render.RenderableNode{ID: "bar"}, true
MapFunc: func(nodes render.RenderableNode) render.RenderableNodes {
return render.RenderableNodes{"bar": render.RenderableNode{ID: "bar"}}
},
Renderer: mockRenderer{RenderableNodes: render.RenderableNodes{
"foo": {ID: "foo"},
@@ -89,8 +89,9 @@ func TestMapRender2(t *testing.T) {
func TestMapRender3(t *testing.T) {
// 3. Check we can remap adjacencies
mapper := render.Map{
MapFunc: func(nodes render.RenderableNode) (render.RenderableNode, bool) {
return render.RenderableNode{ID: "_" + nodes.ID}, true
MapFunc: func(nodes render.RenderableNode) render.RenderableNodes {
id := "_" + nodes.ID
return render.RenderableNodes{id: render.RenderableNode{ID: id}}
},
Renderer: mockRenderer{RenderableNodes: render.RenderableNodes{
"foo": {ID: "foo", Adjacency: report.MakeIDList("baz")},
@@ -125,13 +126,14 @@ func TestMapEdge(t *testing.T) {
}
}
identity := func(nmd report.NodeMetadata) (render.RenderableNode, bool) {
return render.NewRenderableNode(nmd.Metadata["id"], "", "", "", nmd), true
identity := func(nmd report.NodeMetadata) render.RenderableNodes {
return render.RenderableNodes{nmd.Metadata["id"]: render.NewRenderableNode(nmd.Metadata["id"], "", "", "", nmd)}
}
mapper := render.Map{
MapFunc: func(n render.RenderableNode) (render.RenderableNode, bool) {
return render.RenderableNode{ID: "_" + n.ID}, true
MapFunc: func(nodes render.RenderableNode) render.RenderableNodes {
id := "_" + nodes.ID
return render.RenderableNodes{id: render.RenderableNode{ID: id}}
},
Renderer: render.LeafMap{
Selector: selector,

View File

@@ -99,11 +99,34 @@ var ContainerRenderer = MakeReduce(
},
},
},
LeafMap{
Selector: report.SelectContainer,
Mapper: MapContainerIdentity,
Pseudo: PanicPseudoNode,
},
// This mapper brings in short lived connections by joining with container IPs.
// We need to be careful to ensure we only include each edge once. Edges brought in
// by the above renders will have a pid, so its enough to filter out any nodes with
// pids.
Map{
MapFunc: MapIP2Container,
Renderer: FilterUnconnected(
MakeReduce(
LeafMap{
Selector: report.SelectContainer,
Mapper: MapContainer2IP,
Pseudo: PanicPseudoNode,
},
LeafMap{
Selector: report.SelectEndpoint,
Mapper: MapEndpoint2IP,
Pseudo: IPPseudoNode,
},
),
),
},
)
// ContainerImageRenderer is a Renderer which produces a renderable container

48
test/exec/exec.go Normal file
View File

@@ -0,0 +1,48 @@
package exec
import (
"bytes"
"io"
"io/ioutil"
"os"
"github.com/weaveworks/scope/common/exec"
)
type mockCmd struct {
io.ReadCloser
}
// NewMockCmdString creates a new mock Cmd which has s on its stdout pipe
func NewMockCmdString(s string) exec.Cmd {
return &mockCmd{
struct {
io.Reader
io.Closer
}{
bytes.NewBufferString(s),
ioutil.NopCloser(nil),
},
}
}
// NewMockCmd creates a new mock Cmd with rc as its stdout pipe
func NewMockCmd(rc io.ReadCloser) exec.Cmd {
return &mockCmd{rc}
}
func (c *mockCmd) Start() error {
return nil
}
func (c *mockCmd) Wait() error {
return nil
}
func (c *mockCmd) StdoutPipe() (io.ReadCloser, error) {
return c.ReadCloser, nil
}
func (c *mockCmd) Process() *os.Process {
return nil
}

View File

@@ -2,6 +2,7 @@ package test
import (
"reflect"
"runtime"
"testing"
"time"
)
@@ -20,6 +21,7 @@ func Poll(t *testing.T, d time.Duration, want interface{}, have func() interface
}
h := have()
if !reflect.DeepEqual(want, h) {
t.Fatal(Diff(want, h))
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s:%d: %s", file, line, Diff(want, h))
}
}