diff --git a/common/exec/exec.go b/common/exec/exec.go new file mode 100644 index 000000000..00b9e68f0 --- /dev/null +++ b/common/exec/exec.go @@ -0,0 +1,28 @@ +package exec + +import ( + "io" + "os" + "os/exec" +) + +// Cmd is a hook for mocking +type Cmd interface { + StdoutPipe() (io.ReadCloser, error) + Start() error + Wait() error + Process() *os.Process +} + +// Command is a hook for mocking +var Command = func(name string, args ...string) Cmd { + return &realCmd{exec.Command(name, args...)} +} + +type realCmd struct { + *exec.Cmd +} + +func (c *realCmd) Process() *os.Process { + return c.Cmd.Process +} diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go new file mode 100644 index 000000000..deefa4575 --- /dev/null +++ b/probe/endpoint/conntrack.go @@ -0,0 +1,225 @@ +package endpoint + +import ( + "bufio" + "encoding/xml" + "fmt" + "log" + "os" + "strings" + "sync" + + "github.com/weaveworks/scope/common/exec" +) + +// Constants exported for testing +const ( + modules = "/proc/modules" + conntrackModule = "nf_conntrack" + XMLHeader = "\n" + ConntrackOpenTag = "\n" + TimeWait = "TIME_WAIT" + TCP = "tcp" + New = "new" + Update = "update" + Destroy = "destroy" +) + +// Layer3 - these structs are for the parsed conntrack output +type Layer3 struct { + XMLName xml.Name `xml:"layer3"` + SrcIP string `xml:"src"` + DstIP string `xml:"dst"` +} + +// Layer4 - these structs are for the parsed conntrack output +type Layer4 struct { + XMLName xml.Name `xml:"layer4"` + SrcPort int `xml:"sport"` + DstPort int `xml:"dport"` + Proto string `xml:"protoname,attr"` +} + +// Meta - these structs are for the parsed conntrack output +type Meta struct { + XMLName xml.Name `xml:"meta"` + Direction string `xml:"direction,attr"` + Layer3 Layer3 `xml:"layer3"` + Layer4 Layer4 `xml:"layer4"` + ID int64 `xml:"id"` + State string `xml:"state"` +} + +// Flow - these structs are for the parsed conntrack output +type Flow struct { + XMLName xml.Name `xml:"flow"` + Metas 
[]Meta `xml:"meta"` + Type string `xml:"type,attr"` + + Original, Reply, Independent *Meta `xml:"-"` +} + +// Conntracker uses the conntrack command to track network connections +type Conntracker struct { + sync.Mutex + cmd exec.Cmd + activeFlows map[int64]Flow // active flows in state != TIME_WAIT + bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here +} + +// NewConntracker creates and starts a new Conntracter +func NewConntracker(args ...string) (*Conntracker, error) { + if !ConntrackModulePresent() { + return nil, fmt.Errorf("No conntrack module") + } + result := &Conntracker{ + activeFlows: map[int64]Flow{}, + } + go result.run(args...) + return result, nil +} + +// ConntrackModulePresent returns true if the kernel has the conntrack module +// present. It is made public for mocking. +var ConntrackModulePresent = func() bool { + f, err := os.Open(modules) + if err != nil { + return false + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, conntrackModule) { + return true + } + } + if err := scanner.Err(); err != nil { + log.Printf("conntrack error: %v", err) + } + + log.Printf("conntrack: failed to find module %s", conntrackModule) + return false +} + +// NB this is not re-entrant! +func (c *Conntracker) run(args ...string) { + args = append([]string{"-E", "-o", "xml"}, args...) + cmd := exec.Command("conntrack", args...) 
+ stdout, err := cmd.StdoutPipe() + if err != nil { + log.Printf("conntrack error: %v", err) + return + } + if err := cmd.Start(); err != nil { + log.Printf("conntrack error: %v", err) + return + } + + c.Lock() + c.cmd = cmd + c.Unlock() + defer func() { + if err := cmd.Wait(); err != nil { + log.Printf("conntrack error: %v", err) + } + }() + + // Swallow the first two lines + reader := bufio.NewReader(stdout) + if line, err := reader.ReadString('\n'); err != nil { + log.Printf("conntrack error: %v", err) + return + } else if line != XMLHeader { + log.Printf("conntrack invalid output: '%s'", line) + return + } + if line, err := reader.ReadString('\n'); err != nil { + log.Printf("conntrack error: %v", err) + return + } else if line != ConntrackOpenTag { + log.Printf("conntrack invalid output: '%s'", line) + return + } + + // Now loop on the output stream + decoder := xml.NewDecoder(reader) + for { + var f Flow + if err := decoder.Decode(&f); err != nil { + log.Printf("conntrack error: %v", err) + } + c.handleFlow(f) + } +} + +// Stop stop stop +func (c *Conntracker) Stop() { + c.Lock() + defer c.Unlock() + if c.cmd == nil { + return + } + + if p := c.cmd.Process(); p != nil { + p.Kill() + } +} + +func (c *Conntracker) handleFlow(f Flow) { + // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this + // host) and the 'reply' 4 tuple, which is what it has been rewritten to. + // This code finds those metas, which are identified by a Direction + // attribute. + for i := range f.Metas { + meta := &f.Metas[i] + switch meta.Direction { + case "original": + f.Original = meta + case "reply": + f.Reply = meta + case "independent": + f.Independent = meta + } + } + + // For not, I'm only interested in tcp connections - there is too much udp + // traffic going on (every container talking to weave dns, for example) to + // render nicely. TODO: revisit this. 
+ if f.Original.Layer4.Proto != TCP { + return + } + + c.Lock() + defer c.Unlock() + + switch f.Type { + case New, Update: + if f.Independent.State != TimeWait { + c.activeFlows[f.Independent.ID] = f + } else if _, ok := c.activeFlows[f.Independent.ID]; ok { + delete(c.activeFlows, f.Independent.ID) + c.bufferedFlows = append(c.bufferedFlows, f) + } + case Destroy: + if _, ok := c.activeFlows[f.Independent.ID]; ok { + delete(c.activeFlows, f.Independent.ID) + c.bufferedFlows = append(c.bufferedFlows, f) + } + } +} + +// WalkFlows calls f with all active flows and flows that have come and gone +// since the last call to WalkFlows +func (c *Conntracker) WalkFlows(f func(Flow)) { + c.Lock() + defer c.Unlock() + for _, flow := range c.activeFlows { + f(flow) + } + for _, flow := range c.bufferedFlows { + f(flow) + } + c.bufferedFlows = c.bufferedFlows[:0] +} diff --git a/probe/endpoint/conntrack_test.go b/probe/endpoint/conntrack_test.go new file mode 100644 index 000000000..daf96f59d --- /dev/null +++ b/probe/endpoint/conntrack_test.go @@ -0,0 +1,143 @@ +package endpoint_test + +import ( + "bufio" + "encoding/xml" + "io" + "testing" + "time" + + "github.com/weaveworks/scope/common/exec" + . 
"github.com/weaveworks/scope/probe/endpoint" + "github.com/weaveworks/scope/test" + testExec "github.com/weaveworks/scope/test/exec" +) + +func makeFlow(id int64, srcIP, dstIP string, srcPort, dstPort int, ty, state string) Flow { + return Flow{ + XMLName: xml.Name{ + Local: "flow", + }, + Type: ty, + Metas: []Meta{ + { + XMLName: xml.Name{ + Local: "meta", + }, + Direction: "original", + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + SrcIP: srcIP, + DstIP: dstIP, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", + }, + SrcPort: srcPort, + DstPort: dstPort, + Proto: TCP, + }, + }, + { + XMLName: xml.Name{ + Local: "meta", + }, + Direction: "independent", + ID: id, + State: state, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", + }, + }, + }, + }, + } +} + +func TestConntracker(t *testing.T) { + oldExecCmd, oldConntrackPresent := exec.Command, ConntrackModulePresent + defer func() { exec.Command, ConntrackModulePresent = oldExecCmd, oldConntrackPresent }() + + ConntrackModulePresent = func() bool { + return true + } + + reader, writer := io.Pipe() + exec.Command = func(name string, args ...string) exec.Cmd { + return testExec.NewMockCmd(reader) + } + + conntracker, err := NewConntracker() + if err != nil { + t.Fatal(err) + } + + bw := bufio.NewWriter(writer) + if _, err := bw.WriteString(XMLHeader); err != nil { + t.Fatal(err) + } + if _, err := bw.WriteString(ConntrackOpenTag); err != nil { + t.Fatal(err) + } + if err := bw.Flush(); err != nil { + t.Fatal(err) + } + + have := func() interface{} { + result := []Flow{} + conntracker.WalkFlows(func(f Flow) { + f.Original = nil + f.Reply = nil + f.Independent = nil + result = append(result, f) + }) + return result + } + ts := 100 * time.Millisecond + + // First, assert we have no flows + test.Poll(t, ts, []Flow{}, have) + + // Now add some flows + xmlEncoder := xml.NewEncoder(bw) + writeFlow := func(f Flow) { + if 
err := xmlEncoder.Encode(f); err != nil { + t.Fatal(err) + } + if _, err := bw.WriteString("\n"); err != nil { + t.Fatal(err) + } + if err := bw.Flush(); err != nil { + t.Fatal(err) + } + } + + flow1 := makeFlow(1, "1.2.3.4", "2.3.4.5", 2, 3, New, "") + writeFlow(flow1) + test.Poll(t, ts, []Flow{flow1}, have) + + // Now check when we remove the flow, we still get it in the next Walk + flow1.Type = Destroy + writeFlow(flow1) + test.Poll(t, ts, []Flow{flow1}, have) + test.Poll(t, ts, []Flow{}, have) + + // This time we're not going to remove it, but put it in state TIME_WAIT + flow1.Type = New + writeFlow(flow1) + test.Poll(t, ts, []Flow{flow1}, have) + + flow1.Metas[1].State = TimeWait + writeFlow(flow1) + test.Poll(t, ts, []Flow{flow1}, have) + test.Poll(t, ts, []Flow{}, have) +} diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 2639c88fb..2548ab2d9 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -1,54 +1,11 @@ package endpoint import ( - "bufio" - "encoding/xml" - "io" - "log" - "os" - "os/exec" "strconv" - "strings" "github.com/weaveworks/scope/report" ) -const ( - modules = "/proc/modules" - conntrackModule = "nf_conntrack" -) - -// these structs are for the parsed conntrack output -type layer3 struct { - XMLName xml.Name `xml:"layer3"` - SrcIP string `xml:"src"` - DstIP string `xml:"dst"` -} - -type layer4 struct { - XMLName xml.Name `xml:"layer4"` - SrcPort int `xml:"sport"` - DstPort int `xml:"dport"` - Proto string `xml:"protoname,attr"` -} - -type meta struct { - XMLName xml.Name `xml:"meta"` - Direction string `xml:"direction,attr"` - Layer3 layer3 `xml:"layer3"` - Layer4 layer4 `xml:"layer4"` -} - -type flow struct { - XMLName xml.Name `xml:"flow"` - Metas []meta `xml:"meta"` -} - -type conntrack struct { - XMLName xml.Name `xml:"conntrack"` - Flows []flow `xml:"flow"` -} - // This is our 'abstraction' of the endpoint that have been rewritten by NAT. // Original is the private IP that has been rewritten. 
type endpointMapping struct { @@ -59,111 +16,51 @@ type endpointMapping struct { rewrittenPort int } -// natTable returns a list of endpoints that have been remapped by NAT. -func natTable() ([]endpointMapping, error) { - var conntrack conntrack - cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml") - stdout, err := cmd.StdoutPipe() +type natmapper struct { + *Conntracker +} + +func newNATMapper() (*natmapper, error) { + ct, err := NewConntracker("--any-nat") if err != nil { return nil, err } - if err := cmd.Start(); err != nil { - return nil, err - } - defer func() { - if err := cmd.Wait(); err != nil { - log.Printf("conntrack error: %v", err) + return &natmapper{ct}, nil +} + +func toMapping(f Flow) *endpointMapping { + var mapping endpointMapping + if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP { + mapping = endpointMapping{ + originalIP: f.Reply.Layer3.SrcIP, + originalPort: f.Reply.Layer4.SrcPort, + rewrittenIP: f.Original.Layer3.DstIP, + rewrittenPort: f.Original.Layer4.DstPort, } - }() - if err := xml.NewDecoder(stdout).Decode(&conntrack); err != nil { - if err == io.EOF { - return []endpointMapping{}, nil + } else { + mapping = endpointMapping{ + originalIP: f.Original.Layer3.SrcIP, + originalPort: f.Original.Layer4.SrcPort, + rewrittenIP: f.Reply.Layer3.DstIP, + rewrittenPort: f.Reply.Layer4.DstPort, } - return nil, err } - output := []endpointMapping{} - for _, flow := range conntrack.Flows { - // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this - // host) and the 'reply' 4 tuple, which is what it has been rewritten to. - // This code finds those metas, which are identified by a Direction - // attribute. 
- original, reply := meta{}, meta{} - for _, meta := range flow.Metas { - if meta.Direction == "original" { - original = meta - } else if meta.Direction == "reply" { - reply = meta - } - } - - if original.Layer4.Proto != "tcp" { - continue - } - - var conn endpointMapping - if original.Layer3.SrcIP == reply.Layer3.DstIP { - conn = endpointMapping{ - originalIP: reply.Layer3.SrcIP, - originalPort: reply.Layer4.SrcPort, - rewrittenIP: original.Layer3.DstIP, - rewrittenPort: original.Layer4.DstPort, - } - } else { - conn = endpointMapping{ - originalIP: original.Layer3.SrcIP, - originalPort: original.Layer4.SrcPort, - rewrittenIP: reply.Layer3.DstIP, - rewrittenPort: reply.Layer4.DstPort, - } - } - - output = append(output, conn) - } - - return output, nil + return &mapping } // applyNAT duplicates NodeMetadatas in the endpoint topology of a // report, based on the NAT table as returns by natTable. -func applyNAT(rpt report.Report, scope string) error { - mappings, err := natTable() - if err != nil { - return err - } - - for _, mapping := range mappings { +func (n *natmapper) applyNAT(rpt report.Report, scope string) { + n.WalkFlows(func(f Flow) { + mapping := toMapping(f) realEndpointID := report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort)) copyEndpointID := report.MakeEndpointNodeID(scope, mapping.rewrittenIP, strconv.Itoa(mapping.rewrittenPort)) nmd, ok := rpt.Endpoint.NodeMetadatas[realEndpointID] if !ok { - continue + return } rpt.Endpoint.NodeMetadatas[copyEndpointID] = nmd.Copy() - } - - return nil -} - -func conntrackModulePresent() bool { - f, err := os.Open(modules) - if err != nil { - return false - } - defer f.Close() - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, conntrackModule) { - return true - } - } - if err := scanner.Err(); err != nil { - log.Printf("conntrack error: %v", err) - } - - log.Printf("conntrack: failed to find module %s", 
conntrackModule) - return false + }) } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 0c7c5bfad..3a758f9ee 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,7 +1,7 @@ package endpoint import ( - "fmt" + "log" "strconv" "time" @@ -24,6 +24,8 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool + conntracker *Conntracker + natmapper *natmapper } // SpyDuration is an exported prometheus metric @@ -43,12 +45,41 @@ var SpyDuration = prometheus.NewSummaryVec( // on the host machine, at the granularity of host and port. That information // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. -func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter { +func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { + var ( + conntrackModulePresent = ConntrackModulePresent() + conntracker *Conntracker + natmapper *natmapper + err error + ) + if conntrackModulePresent && useConntrack { + conntracker, err = NewConntracker() + if err != nil { + log.Printf("Failed to start conntracker: %v", err) + } + } + if conntrackModulePresent { + natmapper, err = newNATMapper() + if err != nil { + log.Printf("Failed to start natMapper: %v", err) + } + } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - includeNAT: conntrackModulePresent(), + conntracker: conntracker, + natmapper: natmapper, + } +} + +// Stop stop stop +func (r *Reporter) Stop() { + if r.conntracker != nil { + r.conntracker.Stop() + } + if r.natmapper != nil { + r.natmapper.Stop() } } @@ -65,50 +96,73 @@ func (r *Reporter) Report() (report.Report, error) { } for conn := conns.Next(); conn != nil; conn = conns.Next() { - r.addConnection(&rpt, conn) + var ( + localPort = conn.LocalPort + remotePort = conn.RemotePort + localAddr = conn.LocalAddress.String() + remoteAddr = conn.RemoteAddress.String() 
+ ) + r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &conn.Proc) } - if r.includeNAT { - err = applyNAT(rpt, r.hostID) + if r.conntracker != nil { + r.conntracker.WalkFlows(func(f Flow) { + var ( + localPort = f.Original.Layer4.SrcPort + remotePort = f.Original.Layer4.DstPort + localAddr = f.Original.Layer3.SrcIP + remoteAddr = f.Original.Layer3.DstIP + ) + r.addConnection(&rpt, localAddr, remoteAddr, uint16(localPort), uint16(remotePort), nil) + }) + } + + if r.natmapper != nil { + r.natmapper.applyNAT(rpt, r.hostID) } return rpt, err } -func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { - var ( - localIsClient = int(c.LocalPort) > int(c.RemotePort) - localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String()) - remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String()) - adjacencyID = "" - edgeID = "" - ) +func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, proc *procspy.Proc) { + localIsClient := int(localPort) > int(remotePort) - if localIsClient { - adjacencyID = report.MakeAdjacencyID(localAddressNodeID) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) - - edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) - } else { - adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID) - - edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID) - } - - if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { - rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, - Addr: c.LocalAddress.String(), - }) - } - - countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) - - if c.Proc.PID > 0 { + // Update address topology + { var ( - localEndpointNodeID = 
report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort))) - remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort))) + localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr) + remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr) + adjacencyID = "" + edgeID = "" + ) + + if localIsClient { + adjacencyID = report.MakeAdjacencyID(localAddressNodeID) + rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) + + edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) + } else { + adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID) + rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID) + + edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID) + } + + countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) + + if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { + rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ + "name": r.hostName, + Addr: localAddr, + }) + } + } + + // Update endpoint topology + if r.includeProcesses { + var ( + localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort))) + remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort))) adjacencyID = "" edgeID = "" ) @@ -125,18 +179,24 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { edgeID = report.MakeEdgeID(remoteEndpointNodeID, localEndpointNodeID) } - if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok { - // First hit establishes NodeMetadata for scoped local address + port - md := report.MakeNodeMetadataWith(map[string]string{ - Addr: c.LocalAddress.String(), - Port: strconv.Itoa(int(c.LocalPort)), - process.PID: fmt.Sprint(c.Proc.PID), - }) + 
countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID) + md, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID] + updated := !ok + if !ok { + md = report.MakeNodeMetadataWith(map[string]string{ + Addr: localAddr, + Port: strconv.Itoa(int(localPort)), + }) + } + if proc != nil && proc.PID > 0 { + pid := strconv.FormatUint(uint64(proc.PID), 10) + updated = updated || md.Metadata[process.PID] != pid + md.Metadata[process.PID] = pid + } + if updated { rpt.Endpoint.NodeMetadatas[localEndpointNodeID] = md } - - countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID) } } diff --git a/probe/endpoint/reporter_test.go b/probe/endpoint/reporter_test.go index 455d18ada..0258c1636 100644 --- a/probe/endpoint/reporter_test.go +++ b/probe/endpoint/reporter_test.go @@ -19,7 +19,6 @@ var ( fixRemotePortB = uint16(12346) fixProcessPID = uint(4242) fixProcessName = "nginx" - fixProcessPIDB = uint(4243) fixConnections = []procspy.Connection{ { @@ -55,9 +54,9 @@ var ( LocalAddress: fixLocalAddress, LocalPort: fixLocalPort, RemoteAddress: fixRemoteAddress, - RemotePort: fixRemotePort, + RemotePort: fixRemotePortB, Proc: procspy.Proc{ - PID: fixProcessPIDB, + PID: fixProcessPID, Name: fixProcessName, }, }, @@ -72,7 +71,7 @@ func TestSpyNoProcesses(t *testing.T) { nodeName = "frenchs-since-1904" // TODO rename to hostNmae ) - reporter := endpoint.NewReporter(nodeID, nodeName, false) + reporter := endpoint.NewReporter(nodeID, nodeName, false, false) r, _ := reporter.Report() //buf, _ := json.MarshalIndent(r, "", " ") //t.Logf("\n%s\n", buf) @@ -109,7 +108,7 @@ func TestSpyWithProcesses(t *testing.T) { nodeName = "fishermans-friend" // TODO rename to hostNmae ) - reporter := endpoint.NewReporter(nodeID, nodeName, false) + reporter := endpoint.NewReporter(nodeID, nodeName, true, false) r, _ := reporter.Report() // buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf) diff --git a/probe/main.go b/probe/main.go index 02274459f..db0c24971 100644 --- a/probe/main.go +++ 
b/probe/main.go @@ -46,6 +46,7 @@ func main() { captureOn = flag.Duration("capture.on", 1*time.Second, "packet capture duty cycle 'on'") captureOff = flag.Duration("capture.off", 5*time.Second, "packet capture duty cycle 'off'") printVersion = flag.Bool("version", false, "print version number and exit") + useConntrack = flag.Bool("conntrack", true, "also use conntrack to track connections") ) flag.Parse() @@ -103,13 +104,16 @@ func main() { } var ( - taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} - reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} - processCache *process.CachingWalker + endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) + processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters = []Reporter{ + endpointReporter, + host.NewReporter(hostID, hostName, localNets), + process.NewReporter(processCache, hostID), + } + taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} ) - - processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) - reporters = append(reporters, process.NewReporter(processCache, hostID)) + defer endpointReporter.Stop() if *dockerEnabled { if err := report.AddLocalBridge(*dockerBridge); err != nil { diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 38c580853..3b15ddc09 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -4,15 +4,14 @@ import ( "bufio" "encoding/json" "fmt" - "io" "log" "net" "net/http" "net/url" - "os/exec" "regexp" "strings" + "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) @@ -36,16 +35,6 @@ const ( var weavePsMatch = regexp.MustCompile(`^([0-9a-f]{12}) ((?:[0-9a-f][0-9a-f]\:){5}(?:[0-9a-f][0-9a-f]))(.*)$`) var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(/[0-9]+)`) -// Cmd is a hook for mocking -type Cmd interface 
{ - StdoutPipe() (io.ReadCloser, error) - Start() error - Wait() error -} - -// ExecCommand is a hook for mocking -var ExecCommand = func(name string, args ...string) Cmd { return exec.Command(name, args...) } - // Weave represents a single Weave router, presumably on the same host // as the probe. It is both a Reporter and a Tagger: it produces an Overlay // topology, and (in theory) can tag existing topologies with foreign keys to @@ -114,7 +103,7 @@ type psEntry struct { func (w Weave) ps() ([]psEntry, error) { var result []psEntry - cmd := ExecCommand("weave", "--local", "ps") + cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() if err != nil { return result, err diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index 679c45095..7ea89ee76 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -1,50 +1,25 @@ package overlay_test import ( - "bytes" "fmt" - "io" - "io/ioutil" "net/http" "net/http/httptest" "reflect" "testing" + "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/overlay" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" + testExec "github.com/weaveworks/scope/test/exec" ) -type mockCmd struct { - *bytes.Buffer -} - -func (c *mockCmd) Start() error { - return nil -} - -func (c *mockCmd) Wait() error { - return nil -} - -func (c *mockCmd) StdoutPipe() (io.ReadCloser, error) { - return struct { - io.Reader - io.Closer - }{ - c.Buffer, - ioutil.NopCloser(nil), - }, nil -} - func TestWeaveTaggerOverlayTopology(t *testing.T) { - oldExecCmd := overlay.ExecCommand - defer func() { overlay.ExecCommand = oldExecCmd }() - overlay.ExecCommand = func(name string, args ...string) overlay.Cmd { - return &mockCmd{ - bytes.NewBufferString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP)), - } + oldExecCmd := exec.Command + defer func() { exec.Command = oldExecCmd }() + 
exec.Command = func(name string, args ...string) exec.Cmd { + return testExec.NewMockCmdString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP)) } s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter)) diff --git a/render/mapping.go b/render/mapping.go index 0c8030687..691817f38 100644 --- a/render/mapping.go +++ b/render/mapping.go @@ -25,16 +25,13 @@ const ( ) // LeafMapFunc is anything which can take an arbitrary NodeMetadata, which is -// always one-to-one with nodes in a topology, and return a specific -// representation of the referenced node, in the form of a node ID and a -// human-readable major and minor labels. +// always one-to-one with nodes in a topology, and return a set of RenderableNodes +// - specific representations of the referenced node, in the form of a map of node +// ID to a human-readable major and minor labels. // // A single NodeMetadata can yield arbitrary many representations, including -// representations that reduce the cardinality of the set of nodes. -// -// If the final output parameter is false, the node shall be omitted from the -// rendered topology. -type LeafMapFunc func(report.NodeMetadata) (RenderableNode, bool) +// representations that reduce (or even increase) the cardinality of the set of nodes. +type LeafMapFunc func(report.NodeMetadata) RenderableNodes // PseudoFunc creates RenderableNode representing pseudo nodes given the // srcNodeID. dstNodeID is the node id of one of the nodes this node has an @@ -44,24 +41,24 @@ type LeafMapFunc func(report.NodeMetadata) (RenderableNode, bool) type PseudoFunc func(srcNodeID, dstNodeID string, srcIsClient bool, local report.Networks) (RenderableNode, bool) // MapFunc is anything which can take an arbitrary RenderableNode and -// return another RenderableNode. +// return a set of other RenderableNodes. 
// -// As with LeafMapFunc, if the final output parameter is false, the node +// As with LeafMapFunc, if the output is empty, the node // shall be omitted from the rendered topology. -type MapFunc func(RenderableNode) (RenderableNode, bool) +type MapFunc func(RenderableNode) RenderableNodes -// MapEndpointIdentity maps an endpoint topology node to an endpoint +// MapEndpointIdentity maps an endpoint topology node to a single endpoint // renderable node. As it is only ever run on endpoint topology nodes, we // expect that certain keys are present. -func MapEndpointIdentity(m report.NodeMetadata) (RenderableNode, bool) { +func MapEndpointIdentity(m report.NodeMetadata) RenderableNodes { addr, ok := m.Metadata[endpoint.Addr] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } port, ok := m.Metadata[endpoint.Port] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } var ( @@ -75,16 +72,16 @@ func MapEndpointIdentity(m report.NodeMetadata) (RenderableNode, bool) { minor = fmt.Sprintf("%s (%s)", minor, pid) } - return NewRenderableNode(id, major, minor, rank, m), true + return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)} } // MapProcessIdentity maps a process topology node to a process renderable // node. As it is only ever run on process topology nodes, we expect that // certain keys are present. -func MapProcessIdentity(m report.NodeMetadata) (RenderableNode, bool) { +func MapProcessIdentity(m report.NodeMetadata) RenderableNodes { pid, ok := m.Metadata[process.PID] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } var ( @@ -94,16 +91,16 @@ func MapProcessIdentity(m report.NodeMetadata) (RenderableNode, bool) { rank = m.Metadata["comm"] ) - return NewRenderableNode(id, major, minor, rank, m), true + return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)} } // MapContainerIdentity maps a container topology node to a container // renderable node. 
As it is only ever run on container topology nodes, we // expect that certain keys are present. -func MapContainerIdentity(m report.NodeMetadata) (RenderableNode, bool) { +func MapContainerIdentity(m report.NodeMetadata) RenderableNodes { id, ok := m.Metadata[docker.ContainerID] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } var ( @@ -112,16 +109,16 @@ func MapContainerIdentity(m report.NodeMetadata) (RenderableNode, bool) { rank = m.Metadata[docker.ImageID] ) - return NewRenderableNode(id, major, minor, rank, m), true + return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)} } // MapContainerImageIdentity maps a container image topology node to container // image renderable node. As it is only ever run on container image topology // nodes, we expect that certain keys are present. -func MapContainerImageIdentity(m report.NodeMetadata) (RenderableNode, bool) { +func MapContainerImageIdentity(m report.NodeMetadata) RenderableNodes { id, ok := m.Metadata[docker.ImageID] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } var ( @@ -129,16 +126,16 @@ func MapContainerImageIdentity(m report.NodeMetadata) (RenderableNode, bool) { rank = m.Metadata[docker.ImageID] ) - return NewRenderableNode(id, major, "", rank, m), true + return RenderableNodes{id: NewRenderableNode(id, major, "", rank, m)} } // MapAddressIdentity maps an address topology node to an address renderable // node. As it is only ever run on address topology nodes, we expect that // certain keys are present. 
-func MapAddressIdentity(m report.NodeMetadata) (RenderableNode, bool) { +func MapAddressIdentity(m report.NodeMetadata) RenderableNodes { addr, ok := m.Metadata[endpoint.Addr] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } var ( @@ -148,13 +145,13 @@ func MapAddressIdentity(m report.NodeMetadata) (RenderableNode, bool) { rank = major ) - return NewRenderableNode(id, major, minor, rank, m), true + return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)} } // MapHostIdentity maps a host topology node to a host renderable node. As it // is only ever run on host topology nodes, we expect that certain keys are // present. -func MapHostIdentity(m report.NodeMetadata) (RenderableNode, bool) { +func MapHostIdentity(m report.NodeMetadata) RenderableNodes { var ( id = MakeHostID(report.ExtractHostID(m)) hostname = m.Metadata[host.HostName] @@ -168,7 +165,79 @@ func MapHostIdentity(m report.NodeMetadata) (RenderableNode, bool) { major = hostname } - return NewRenderableNode(id, major, minor, rank, m), true + return RenderableNodes{id: NewRenderableNode(id, major, minor, rank, m)} +} + +// MapEndpoint2IP maps endpoint nodes to their IP address, for joining +// with container nodes. We drop endpoint nodes with pids, as they +// will be joined to containers through the process topology, and we +// don't want to double count edges. +func MapEndpoint2IP(m report.NodeMetadata) RenderableNodes { + _, ok := m.Metadata[process.PID] + if ok { + return RenderableNodes{} + } + addr, ok := m.Metadata[endpoint.Addr] + if !ok { + return RenderableNodes{} + } + return RenderableNodes{addr: NewRenderableNode(addr, "", "", "", m)} +} + +// IPPseudoNode maps endpoint pseudo nodes to regular IP address nodes, or +// the internet node. 
+func IPPseudoNode(srcNodeID, _ string, _ bool, local report.Networks) (RenderableNode, bool) {
+	// Use the addresser to extract the IP of the missing node
+	srcNodeAddr := report.EndpointIDAddresser(srcNodeID)
+	// If the srcNodeAddr is not in a network local to this report, we emit an
+	// internet node
+	if !local.Contains(srcNodeAddr) {
+		return newPseudoNode(TheInternetID, TheInternetMajor, ""), true
+	}
+	return NewRenderableNode(srcNodeAddr.String(), "", "", "", report.MakeNodeMetadata()), true
+}
+
+// MapContainer2IP maps container nodes to their IP addresses (outputs
+// multiple nodes). This allows containers to be joined directly with
+// the endpoint topology.
+func MapContainer2IP(m report.NodeMetadata) RenderableNodes {
+	result := RenderableNodes{}
+	addrs, ok := m.Metadata[docker.ContainerIPs]
+	if !ok {
+		return result
+	}
+	for _, addr := range strings.Fields(addrs) {
+		n := NewRenderableNode(addr, "", "", "", m)
+		n.NodeMetadata.Counters[containersKey] = 1
+		result[addr] = n
+	}
+	return result
+}
+
+// MapIP2Container maps IP nodes produced from MapContainer2IP back to
+// container nodes. If there is more than one container with a given
+// IP, it is dropped.
+func MapIP2Container(n RenderableNode) RenderableNodes {
+	// If an IP is shared between multiple containers, we can't
+	// reliably attribute a connection based on its IP
+	if n.NodeMetadata.Counters[containersKey] > 1 {
+		return RenderableNodes{}
+	}
+
+	// Propagate the internet pseudo node.
+	if n.ID == TheInternetID {
+		return RenderableNodes{n.ID: n}
+	}
+
+	// If this node is not a container, exclude it.
+	// This excludes all the nodes we've dragged in from endpoint
+	// that we failed to join to a container.
+ id, ok := n.NodeMetadata.Metadata[docker.ContainerID] + if !ok { + return RenderableNodes{} + } + + return RenderableNodes{id: newDerivedNode(id, n)} } // MapEndpoint2Process maps endpoint RenderableNodes to process @@ -182,18 +251,18 @@ func MapHostIdentity(m report.NodeMetadata) (RenderableNode, bool) { // format for a process, but without any Major or Minor labels. // It does not have enough info to do that, and the resulting graph // must be merged with a process graph to get that info. -func MapEndpoint2Process(n RenderableNode) (RenderableNode, bool) { +func MapEndpoint2Process(n RenderableNode) RenderableNodes { if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } pid, ok := n.NodeMetadata.Metadata[process.PID] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } id := MakeProcessID(report.ExtractHostID(n.NodeMetadata), pid) - return newDerivedNode(id, n), true + return RenderableNodes{id: newDerivedNode(id, n)} } // MapProcess2Container maps process RenderableNodes to container @@ -207,15 +276,15 @@ func MapEndpoint2Process(n RenderableNode) (RenderableNode, bool) { // format for a container, but without any Major or Minor labels. // It does not have enough info to do that, and the resulting graph // must be merged with a container graph to get that info. 
-func MapProcess2Container(n RenderableNode) (RenderableNode, bool) { +func MapProcess2Container(n RenderableNode) RenderableNodes { // Propogate the internet pseudo node if n.ID == TheInternetID { - return n, true + return RenderableNodes{n.ID: n} } // Don't propogate non-internet pseudo nodes if n.Pseudo { - return n, false + return RenderableNodes{} } // Otherwise, if the process is not in a container, group it @@ -228,10 +297,10 @@ func MapProcess2Container(n RenderableNode) (RenderableNode, bool) { id = MakePseudoNodeID(UncontainedID, hostID) node := newDerivedPseudoNode(id, UncontainedMajor, n) node.LabelMinor = hostID - return node, true + return RenderableNodes{id: node} } - return newDerivedNode(id, n), true + return RenderableNodes{id: newDerivedNode(id, n)} } // MapProcess2Name maps process RenderableNodes to RenderableNodes @@ -240,29 +309,29 @@ func MapProcess2Container(n RenderableNode) (RenderableNode, bool) { // This mapper is unlike the other foo2bar mappers as the intention // is not to join the information with another topology. Therefore // it outputs a properly-formed node with labels etc. -func MapProcess2Name(n RenderableNode) (RenderableNode, bool) { +func MapProcess2Name(n RenderableNode) RenderableNodes { if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } name, ok := n.NodeMetadata.Metadata["comm"] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } node := newDerivedNode(name, n) node.LabelMajor = name node.Rank = name node.NodeMetadata.Counters[processesKey] = 1 - return node, true + return RenderableNodes{name: node} } // MapCountProcessName maps 1:1 process name nodes, counting // the number of processes grouped together and putting // that info in the minor label. 
-func MapCountProcessName(n RenderableNode) (RenderableNode, bool) { +func MapCountProcessName(n RenderableNode) RenderableNodes { if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } processes := n.NodeMetadata.Counters[processesKey] @@ -271,7 +340,7 @@ func MapCountProcessName(n RenderableNode) (RenderableNode, bool) { } else { n.LabelMinor = fmt.Sprintf("%d processes", processes) } - return n, true + return RenderableNodes{n.ID: n} } // MapContainer2ContainerImage maps container RenderableNodes to container @@ -285,23 +354,23 @@ func MapCountProcessName(n RenderableNode) (RenderableNode, bool) { // format for a container, but without any Major or Minor labels. // It does not have enough info to do that, and the resulting graph // must be merged with a container graph to get that info. -func MapContainer2ContainerImage(n RenderableNode) (RenderableNode, bool) { +func MapContainer2ContainerImage(n RenderableNode) RenderableNodes { // Propogate all pseudo nodes if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } // Otherwise, if some some reason the container doesn't have a image_id // (maybe slightly out of sync reports), just drop it id, ok := n.NodeMetadata.Metadata[docker.ImageID] if !ok { - return n, false + return RenderableNodes{} } // Add container- key to NMD, which will later be counted to produce the minor label result := newDerivedNode(id, n) result.NodeMetadata.Counters[containersKey] = 1 - return result, true + return RenderableNodes{id: result} } // MapContainerImage2Name maps container images RenderableNodes to @@ -310,14 +379,14 @@ func MapContainer2ContainerImage(n RenderableNode) (RenderableNode, bool) { // This mapper is unlike the other foo2bar mappers as the intention // is not to join the information with another topology. Therefore // it outputs a properly-formed node with labels etc. 
-func MapContainerImage2Name(n RenderableNode) (RenderableNode, bool) { +func MapContainerImage2Name(n RenderableNode) RenderableNodes { if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } name, ok := n.NodeMetadata.Metadata[docker.ImageName] if !ok { - return RenderableNode{}, false + return RenderableNodes{} } parts := strings.SplitN(name, ":", 2) @@ -329,15 +398,15 @@ func MapContainerImage2Name(n RenderableNode) (RenderableNode, bool) { node.LabelMajor = name node.Rank = name node.NodeMetadata = n.NodeMetadata.Copy() // Propagate NMD for container counting. - return node, true + return RenderableNodes{name: node} } // MapCountContainers maps 1:1 container image nodes, counting // the number of containers grouped together and putting // that info in the minor label. -func MapCountContainers(n RenderableNode) (RenderableNode, bool) { +func MapCountContainers(n RenderableNode) RenderableNodes { if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } containers := n.NodeMetadata.Counters[containersKey] @@ -346,19 +415,19 @@ func MapCountContainers(n RenderableNode) (RenderableNode, bool) { } else { n.LabelMinor = fmt.Sprintf("%d containers", containers) } - return n, true + return RenderableNodes{n.ID: n} } // MapAddress2Host maps address RenderableNodes to host RenderableNodes. // // Otherthan pseudo nodes, we can assume all nodes have a HostID -func MapAddress2Host(n RenderableNode) (RenderableNode, bool) { +func MapAddress2Host(n RenderableNode) RenderableNodes { if n.Pseudo { - return n, true + return RenderableNodes{n.ID: n} } id := MakeHostID(report.ExtractHostID(n.NodeMetadata)) - return newDerivedNode(id, n), true + return RenderableNodes{id: newDerivedNode(id, n)} } // GenericPseudoNode makes a PseudoFunc given an addresser. 
The returned diff --git a/render/mapping_test.go b/render/mapping_test.go index 6efbf98d8..35f746ebe 100644 --- a/render/mapping_test.go +++ b/render/mapping_test.go @@ -72,7 +72,7 @@ type testcase struct { } func testMap(t *testing.T, f render.LeafMapFunc, input testcase) { - if _, have := f(input.md); input.ok != have { + if have := f(input.md); input.ok != (len(have) > 0) { t.Errorf("%v: want %v, have %v", input.md, input.ok, have) } } diff --git a/render/render.go b/render/render.go index bc551ef52..d1db1ed9c 100644 --- a/render/render.go +++ b/render/render.go @@ -53,26 +53,24 @@ func (m Map) Render(rpt report.Report) RenderableNodes { return output } -func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) { +func (m Map) render(rpt report.Report) (RenderableNodes, map[string]report.IDList) { input := m.Renderer.Render(rpt) output := RenderableNodes{} - mapped := map[string]string{} // input node ID -> output node ID + mapped := map[string]report.IDList{} // input node ID -> output node IDs adjacencies := map[string]report.IDList{} // output node ID -> input node Adjacencies for _, inRenderable := range input { - outRenderable, ok := m.MapFunc(inRenderable) - if !ok { - continue - } + outRenderables := m.MapFunc(inRenderable) + for _, outRenderable := range outRenderables { + existing, ok := output[outRenderable.ID] + if ok { + outRenderable.Merge(existing) + } - existing, ok := output[outRenderable.ID] - if ok { - outRenderable.Merge(existing) + output[outRenderable.ID] = outRenderable + mapped[inRenderable.ID] = mapped[inRenderable.ID].Add(outRenderable.ID) + adjacencies[outRenderable.ID] = adjacencies[outRenderable.ID].Merge(inRenderable.Adjacency) } - - output[outRenderable.ID] = outRenderable - mapped[inRenderable.ID] = outRenderable.ID - adjacencies[outRenderable.ID] = adjacencies[outRenderable.ID].Merge(inRenderable.Adjacency) } // Rewrite Adjacency for new node IDs. 
@@ -82,7 +80,7 @@ func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) { for outNodeID, inAdjacency := range adjacencies { outAdjacency := report.MakeIDList() for _, inAdjacent := range inAdjacency { - if outAdjacent, ok := mapped[inAdjacent]; ok { + for _, outAdjacent := range mapped[inAdjacent] { outAdjacency = outAdjacency.Add(outAdjacent) } } @@ -102,10 +100,12 @@ func (m Map) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID st // First we need to map the ids in this layer into the ids in the underlying layer _, mapped := m.render(rpt) // this maps from old -> new inverted := map[string][]string{} // this maps from new -> old(s) - for k, v := range mapped { - existing := inverted[v] - existing = append(existing, k) - inverted[v] = existing + for k, vs := range mapped { + for _, v := range vs { + existing := inverted[v] + existing = append(existing, k) + inverted[v] = existing + } } // Now work out a slice of edges this edge is constructed from @@ -148,34 +148,31 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { // Build a set of RenderableNodes for all non-pseudo probes, and an // addressID to nodeID lookup map. Multiple addressIDs can map to the same // RenderableNodes. - source2mapped := map[string]string{} // source node ID -> mapped node ID + source2mapped := map[string]report.IDList{} // source node ID -> mapped node IDs for nodeID, metadata := range t.NodeMetadatas { - mapped, ok := m.Mapper(metadata) - if !ok { - continue + for _, mapped := range m.Mapper(metadata) { + // mapped.ID needs not be unique over all addressIDs. If not, we merge with + // the existing data, on the assumption that the MapFunc returns the same + // data. 
+ existing, ok := nodes[mapped.ID] + if ok { + mapped.Merge(existing) + } + + origins := mapped.Origins + origins = origins.Add(nodeID) + origins = origins.Add(metadata.Metadata[report.HostNodeID]) + mapped.Origins = origins + + nodes[mapped.ID] = mapped + source2mapped[nodeID] = source2mapped[nodeID].Add(mapped.ID) } - - // mapped.ID needs not be unique over all addressIDs. If not, we merge with - // the existing data, on the assumption that the MapFunc returns the same - // data. - existing, ok := nodes[mapped.ID] - if ok { - mapped.Merge(existing) - } - - origins := mapped.Origins - origins = origins.Add(nodeID) - origins = origins.Add(metadata.Metadata[report.HostNodeID]) - mapped.Origins = origins - - nodes[mapped.ID] = mapped - source2mapped[nodeID] = mapped.ID } - mkPseudoNode := func(srcNodeID, dstNodeID string, srcIsClient bool) (string, bool) { + mkPseudoNode := func(srcNodeID, dstNodeID string, srcIsClient bool) report.IDList { pseudoNode, ok := m.Pseudo(srcNodeID, dstNodeID, srcIsClient, localNetworks) if !ok { - return "", false + return report.MakeIDList() } pseudoNode.Origins = pseudoNode.Origins.Add(srcNodeID) existing, ok := nodes[pseudoNode.ID] @@ -184,8 +181,8 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { } nodes[pseudoNode.ID] = pseudoNode - source2mapped[pseudoNode.ID] = srcNodeID - return pseudoNode.ID, true + source2mapped[pseudoNode.ID] = source2mapped[pseudoNode.ID].Add(srcNodeID) + return report.MakeIDList(pseudoNode.ID) } // Walk the graph and make connections. @@ -196,51 +193,61 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { continue } - srcRenderableID, ok := source2mapped[srcNodeID] + srcRenderableIDs, ok := source2mapped[srcNodeID] if !ok { - // One of the entries in dsts must be a non-pseudo node - var existingDstNodeID string + // One of the entries in dsts must be a non-pseudo node, unless + // it was dropped by the mapping function. 
for _, dstNodeID := range dsts { if _, ok := source2mapped[dstNodeID]; ok { - existingDstNodeID = dstNodeID + srcRenderableIDs = mkPseudoNode(srcNodeID, dstNodeID, true) break } } - - srcRenderableID, ok = mkPseudoNode(srcNodeID, existingDstNodeID, true) - if !ok { - continue - } } - srcRenderableNode := nodes[srcRenderableID] + if len(srcRenderableIDs) == 0 { + continue + } - for _, dstNodeID := range dsts { - dstRenderableID, ok := source2mapped[dstNodeID] - if !ok { - dstRenderableID, ok = mkPseudoNode(dstNodeID, srcNodeID, false) + for _, srcRenderableID := range srcRenderableIDs { + srcRenderableNode := nodes[srcRenderableID] + + for _, dstNodeID := range dsts { + dstRenderableIDs, ok := source2mapped[dstNodeID] if !ok { + dstRenderableIDs = mkPseudoNode(dstNodeID, srcNodeID, false) + } + if len(dstRenderableIDs) == 0 { continue } - } - dstRenderableNode := nodes[dstRenderableID] + for _, dstRenderableID := range dstRenderableIDs { + dstRenderableNode := nodes[dstRenderableID] + srcRenderableNode.Adjacency = srcRenderableNode.Adjacency.Add(dstRenderableID) - srcRenderableNode.Adjacency = srcRenderableNode.Adjacency.Add(dstRenderableID) - - // We propagate edge metadata to nodes on both ends of the edges. - // TODO we should 'reverse' one end of the edge meta data - ingress -> egress etc. - if md, ok := t.EdgeMetadatas[report.MakeEdgeID(srcNodeID, dstNodeID)]; ok { - srcRenderableNode.EdgeMetadata = srcRenderableNode.EdgeMetadata.Merge(md) - dstRenderableNode.EdgeMetadata = dstRenderableNode.EdgeMetadata.Merge(md) - nodes[dstRenderableID] = dstRenderableNode + // We propagate edge metadata to nodes on both ends of the edges. + // TODO we should 'reverse' one end of the edge meta data - ingress -> egress etc. 
+ if md, ok := t.EdgeMetadatas[report.MakeEdgeID(srcNodeID, dstNodeID)]; ok { + srcRenderableNode.EdgeMetadata = srcRenderableNode.EdgeMetadata.Merge(md) + dstRenderableNode.EdgeMetadata = dstRenderableNode.EdgeMetadata.Merge(md) + nodes[dstRenderableID] = dstRenderableNode + } + } } + + nodes[srcRenderableID] = srcRenderableNode } - - nodes[srcRenderableID] = srcRenderableNode } return nodes } +func ids(nodes RenderableNodes) report.IDList { + result := report.MakeIDList() + for id := range nodes { + result = result.Add(id) + } + return result +} + // EdgeMetadata gives the metadata of an edge from the perspective of the // srcRenderableID. Since an edgeID can have multiple edges on the address // level, it uses the supplied mapping function to translate address IDs to @@ -254,15 +261,16 @@ func (m LeafMap) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableI log.Printf("bad edge ID %q", edgeID) continue } + srcs, dsts := report.MakeIDList(src), report.MakeIDList(dst) if src != report.TheInternet { - mapped, _ := m.Mapper(t.NodeMetadatas[src]) - src = mapped.ID + mapped := m.Mapper(t.NodeMetadatas[src]) + srcs = ids(mapped) } if dst != report.TheInternet { - mapped, _ := m.Mapper(t.NodeMetadatas[dst]) - dst = mapped.ID + mapped := m.Mapper(t.NodeMetadatas[dst]) + dsts = ids(mapped) } - if src == srcRenderableID && dst == dstRenderableID { + if srcs.Contains(srcRenderableID) && dsts.Contains(dstRenderableID) { metadata = metadata.Flatten(edgeMeta) } } diff --git a/render/render_test.go b/render/render_test.go index bc69c90ff..a9f0a5fd4 100644 --- a/render/render_test.go +++ b/render/render_test.go @@ -52,8 +52,8 @@ func TestReduceEdge(t *testing.T) { func TestMapRender1(t *testing.T) { // 1. 
Check when we return false, the node gets filtered out mapper := render.Map{ - MapFunc: func(nodes render.RenderableNode) (render.RenderableNode, bool) { - return render.RenderableNode{}, false + MapFunc: func(nodes render.RenderableNode) render.RenderableNodes { + return render.RenderableNodes{} }, Renderer: mockRenderer{RenderableNodes: render.RenderableNodes{ "foo": {ID: "foo"}, @@ -69,8 +69,8 @@ func TestMapRender1(t *testing.T) { func TestMapRender2(t *testing.T) { // 2. Check we can remap two nodes into one mapper := render.Map{ - MapFunc: func(nodes render.RenderableNode) (render.RenderableNode, bool) { - return render.RenderableNode{ID: "bar"}, true + MapFunc: func(nodes render.RenderableNode) render.RenderableNodes { + return render.RenderableNodes{"bar": render.RenderableNode{ID: "bar"}} }, Renderer: mockRenderer{RenderableNodes: render.RenderableNodes{ "foo": {ID: "foo"}, @@ -89,8 +89,9 @@ func TestMapRender2(t *testing.T) { func TestMapRender3(t *testing.T) { // 3. Check we can remap adjacencies mapper := render.Map{ - MapFunc: func(nodes render.RenderableNode) (render.RenderableNode, bool) { - return render.RenderableNode{ID: "_" + nodes.ID}, true + MapFunc: func(nodes render.RenderableNode) render.RenderableNodes { + id := "_" + nodes.ID + return render.RenderableNodes{id: render.RenderableNode{ID: id}} }, Renderer: mockRenderer{RenderableNodes: render.RenderableNodes{ "foo": {ID: "foo", Adjacency: report.MakeIDList("baz")}, @@ -125,13 +126,14 @@ func TestMapEdge(t *testing.T) { } } - identity := func(nmd report.NodeMetadata) (render.RenderableNode, bool) { - return render.NewRenderableNode(nmd.Metadata["id"], "", "", "", nmd), true + identity := func(nmd report.NodeMetadata) render.RenderableNodes { + return render.RenderableNodes{nmd.Metadata["id"]: render.NewRenderableNode(nmd.Metadata["id"], "", "", "", nmd)} } mapper := render.Map{ - MapFunc: func(n render.RenderableNode) (render.RenderableNode, bool) { - return render.RenderableNode{ID: "_" + 
n.ID}, true + MapFunc: func(nodes render.RenderableNode) render.RenderableNodes { + id := "_" + nodes.ID + return render.RenderableNodes{id: render.RenderableNode{ID: id}} }, Renderer: render.LeafMap{ Selector: selector, diff --git a/render/topologies.go b/render/topologies.go index f078dfeea..99c83f108 100644 --- a/render/topologies.go +++ b/render/topologies.go @@ -99,11 +99,34 @@ var ContainerRenderer = MakeReduce( }, }, }, + LeafMap{ Selector: report.SelectContainer, Mapper: MapContainerIdentity, Pseudo: PanicPseudoNode, }, + + // This mapper brings in short lived connections by joining with container IPs. + // We need to be careful to ensure we only include each edge once. Edges brought in + // by the above renders will have a pid, so its enough to filter out any nodes with + // pids. + Map{ + MapFunc: MapIP2Container, + Renderer: FilterUnconnected( + MakeReduce( + LeafMap{ + Selector: report.SelectContainer, + Mapper: MapContainer2IP, + Pseudo: PanicPseudoNode, + }, + LeafMap{ + Selector: report.SelectEndpoint, + Mapper: MapEndpoint2IP, + Pseudo: IPPseudoNode, + }, + ), + ), + }, ) // ContainerImageRenderer is a Renderer which produces a renderable container diff --git a/test/exec/exec.go b/test/exec/exec.go new file mode 100644 index 000000000..61f47d36a --- /dev/null +++ b/test/exec/exec.go @@ -0,0 +1,48 @@ +package exec + +import ( + "bytes" + "io" + "io/ioutil" + "os" + + "github.com/weaveworks/scope/common/exec" +) + +type mockCmd struct { + io.ReadCloser +} + +// NewMockCmdString creates a new mock Cmd which has s on its stdout pipe +func NewMockCmdString(s string) exec.Cmd { + return &mockCmd{ + struct { + io.Reader + io.Closer + }{ + bytes.NewBufferString(s), + ioutil.NopCloser(nil), + }, + } +} + +// NewMockCmd creates a new mock Cmd with rc as its stdout pipe +func NewMockCmd(rc io.ReadCloser) exec.Cmd { + return &mockCmd{rc} +} + +func (c *mockCmd) Start() error { + return nil +} + +func (c *mockCmd) Wait() error { + return nil +} + +func (c 
*mockCmd) StdoutPipe() (io.ReadCloser, error) { + return c.ReadCloser, nil +} + +func (c *mockCmd) Process() *os.Process { + return nil +} diff --git a/test/poll.go b/test/poll.go index 864043f25..da20f15e3 100644 --- a/test/poll.go +++ b/test/poll.go @@ -2,6 +2,7 @@ package test import ( "reflect" + "runtime" "testing" "time" ) @@ -20,6 +21,7 @@ func Poll(t *testing.T, d time.Duration, want interface{}, have func() interface } h := have() if !reflect.DeepEqual(want, h) { - t.Fatal(Diff(want, h)) + _, file, line, _ := runtime.Caller(1) + t.Fatalf("%s:%d: %s", file, line, Diff(want, h)) } }