From 32a57e63dbf7385d4da1ada31bb9a205919d8e7d Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Tue, 29 Sep 2015 15:11:02 +0200 Subject: [PATCH 01/10] probe/endpoint: NATMapper missed an edge case NATMapper can be created with a nil Conntracker if ConntrackerModulePresent is false, e.g. on Darwin. Check for that in ApplyNAT. --- probe/endpoint/nat.go | 9 ++++++--- probe/endpoint/nat_test.go | 4 ++-- probe/endpoint/reporter.go | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index fe9927155..18ae534d1 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -21,8 +21,8 @@ type NATMapper struct { Conntracker } -// NewNATMapper is exposed for testing -func NewNATMapper(ct Conntracker) NATMapper { +// MakeNATMapper is exposed for testing +func MakeNATMapper(ct Conntracker) NATMapper { return NATMapper{ct} } @@ -50,7 +50,10 @@ func toMapping(f Flow) *endpointMapping { // ApplyNAT duplicates Nodes in the endpoint topology of a // report, based on the NAT table as returns by natTable. func (n NATMapper) ApplyNAT(rpt report.Report, scope string) { - n.WalkFlows(func(f Flow) { + if n.Conntracker == nil { + return + } + n.Conntracker.WalkFlows(func(f Flow) { var ( mapping = toMapping(f) realEndpointID = report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort)) diff --git a/probe/endpoint/nat_test.go b/probe/endpoint/nat_test.go index 65b2c16ea..49580d512 100644 --- a/probe/endpoint/nat_test.go +++ b/probe/endpoint/nat_test.go @@ -55,7 +55,7 @@ func TestNat(t *testing.T) { "foo": "bar", })) - natmapper := endpoint.NewNATMapper(ct) + natmapper := endpoint.MakeNATMapper(ct) natmapper.ApplyNAT(have, "host1") if !reflect.DeepEqual(want, have) { t.Fatal(test.Diff(want, have)) @@ -88,7 +88,7 @@ func TestNat(t *testing.T) { "foo": "baz", })) - natmapper := endpoint.NewNATMapper(ct) + natmapper := endpoint.MakeNATMapper(ct) natmapper.ApplyNAT(have, "host1") if !reflect.DeepEqual(want, have) { t.Fatal(test.Diff(want, have)) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index b5b67d7fb..ec239cc3e 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -66,7 +66,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo if err != nil { log.Printf("Failed to start conntracker for natmapper: %v", err) } - natmapper = NewNATMapper(ct) + natmapper = MakeNATMapper(ct) } return &Reporter{ hostID: hostID, From 65a75474152a3de0d1f4c069178059119fd2e197 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Tue, 29 Sep 2015 18:51:13 +0200 Subject: [PATCH 02/10] Intermediate stage fix --- probe/endpoint/reporter.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index ec239cc3e..7f4332652 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -50,30 +50,29 @@ var SpyDuration = prometheus.NewSummaryVec( // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { var ( - conntrackModulePresent = ConntrackModulePresent() - conntracker Conntracker - natmapper NATMapper - err error + conntracker Conntracker + natmapper *NATMapper ) - if conntrackModulePresent && useConntrack { - conntracker, err = NewConntracker(true) - if err != nil { - log.Printf("Failed to start conntracker: %v", err) + if ConntrackModulePresent() { + if useConntrack { + var err error + if conntracker, err = NewConntracker(true); err != nil { + log.Printf("Failed to start conntracker for endpoint reporter: %v", err) + } } - } - if conntrackModulePresent { - ct, err := NewConntracker(true, "--any-nat") - if err != nil { - log.Printf("Failed to start conntracker for natmapper: %v", err) + if natmapperConntracker, err := NewConntracker(true, "--any-nat"); err == nil { + m := MakeNATMapper(natmapperConntracker) + natmapper = &m + } else { + log.Printf("Failed to start conntracker for NAT mapper: %v", err) } - natmapper = MakeNATMapper(ct) } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, conntracker: conntracker, - natmapper: &natmapper, + natmapper: natmapper, revResolver: NewReverseResolver(), } } From c8fbea0f1582126ab374390a04d4b40a33023f2d Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 2 Oct 2015 17:05:00 +0200 Subject: [PATCH 03/10] Move fixture to its own package We want to be able to import test from packages like probe/endpoint, in order to use utility functions like poll. But that causes an import cycle with the current layout. We got around this using dot-imports so far, but it's ugly and unnecessary: fixture can be its own package. --- app/api_topology_test.go | 3 +- app/mock_reporter_test.go | 4 +- app/origin_host_test.go | 4 +- render/detailed_node_test.go | 81 +++++----- render/expected/expected.go | 232 +++++++++++++-------------- render/topologies_test.go | 17 +- test/{ => fixture}/report_fixture.go | 2 +- 7 files changed, 173 insertions(+), 170 deletions(-) rename test/{ => fixture}/report_fixture.go (99%) diff --git a/app/api_topology_test.go b/app/api_topology_test.go index b81f75661..ef5d806b3 100644 --- a/app/api_topology_test.go +++ b/app/api_topology_test.go @@ -14,6 +14,7 @@ import ( "github.com/weaveworks/scope/render/expected" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) func TestAll(t *testing.T) { @@ -78,7 +79,7 @@ func TestAPITopologyApplications(t *testing.T) { } equals(t, expected.ServerProcessID, node.Node.ID) equals(t, "apache", node.Node.LabelMajor) - equals(t, fmt.Sprintf("%s (server:%s)", test.ServerHostID, test.ServerPID), node.Node.LabelMinor) + equals(t, fmt.Sprintf("%s (server:%s)", fixture.ServerHostID, fixture.ServerPID), node.Node.LabelMinor) equals(t, false, node.Node.Pseudo) // Let's not unit-test the specific content of the detail tables } diff --git a/app/mock_reporter_test.go b/app/mock_reporter_test.go index 711c9b209..99439e9a0 100644 --- a/app/mock_reporter_test.go +++ b/app/mock_reporter_test.go @@ -2,12 +2,12 @@ package main import ( "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) // StaticReport is used as a fixture in tests. It emulates an xfer.Collector. type StaticReport struct{} -func (s StaticReport) Report() report.Report { return test.Report } +func (s StaticReport) Report() report.Report { return fixture.Report } func (s StaticReport) Add(report.Report) {} diff --git a/app/origin_host_test.go b/app/origin_host_test.go index 08240f232..93b7e9fc5 100644 --- a/app/origin_host_test.go +++ b/app/origin_host_test.go @@ -6,7 +6,7 @@ import ( "net/http/httptest" "testing" - "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) func TestAPIOriginHost(t *testing.T) { @@ -18,7 +18,7 @@ func TestAPIOriginHost(t *testing.T) { { // Origin - body := getRawJSON(t, ts, fmt.Sprintf("/api/origin/host/%s", test.ServerHostNodeID)) + body := getRawJSON(t, ts, fmt.Sprintf("/api/origin/host/%s", fixture.ServerHostNodeID)) var o OriginHost if err := json.Unmarshal(body, &o); err != nil { t.Fatalf("JSON parse error: %s", err) diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index 5c5c5306a..d3be039c5 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -7,20 +7,21 @@ import ( "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) func TestOriginTable(t *testing.T) { - if _, ok := render.OriginTable(test.Report, "not-found", false, false); ok { + if _, ok := render.OriginTable(fixture.Report, "not-found", false, false); ok { t.Errorf("unknown origin ID gave unexpected success") } - for originID, want := range map[string]render.Table{test.ServerProcessNodeID: { - Title: fmt.Sprintf(`Process "apache" (%s)`, test.ServerPID), + for originID, want := range map[string]render.Table{fixture.ServerProcessNodeID: { + Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID), Numeric: false, Rank: 2, Rows: []render.Row{}, }, - test.ServerHostNodeID: { - Title: fmt.Sprintf("Host %q", test.ServerHostName), + fixture.ServerHostNodeID: { + Title: fmt.Sprintf("Host %q", fixture.ServerHostName), Numeric: false, Rank: 1, Rows: []render.Row{ @@ -29,7 +30,7 @@ func TestOriginTable(t *testing.T) { }, }, } { - have, ok := render.OriginTable(test.Report, originID, false, false) + have, ok := render.OriginTable(fixture.Report, originID, false, false) if !ok { t.Errorf("%q: not OK", originID) continue @@ -41,23 +42,23 @@ func TestOriginTable(t *testing.T) { // Test host/container tags for originID, want := range map[string]render.Table{ - test.ServerProcessNodeID: { - Title: fmt.Sprintf(`Process "apache" (%s)`, test.ServerPID), + fixture.ServerProcessNodeID: { + Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID), Numeric: false, Rank: 2, Rows: []render.Row{ - {"Host", test.ServerHostID, "", false}, - {"Container ID", test.ServerContainerID, "", false}, + {"Host", fixture.ServerHostID, "", false}, + {"Container ID", fixture.ServerContainerID, "", false}, }, }, - test.ServerContainerNodeID: { + fixture.ServerContainerNodeID: { Title: `Container "server"`, Numeric: false, Rank: 3, Rows: []render.Row{ - {"Host", test.ServerHostID, "", false}, - {"ID", test.ServerContainerID, "", false}, - {"Image ID", test.ServerContainerImageID, "", false}, + {"Host", fixture.ServerHostID, "", false}, + {"ID", fixture.ServerContainerID, "", false}, + {"Image ID", fixture.ServerContainerImageID, "", false}, {fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false}, {`Label "foo1"`, `bar1`, "", false}, {`Label "foo2"`, `bar2`, "", false}, @@ -65,7 +66,7 @@ func TestOriginTable(t *testing.T) { }, }, } { - have, ok := render.OriginTable(test.Report, originID, true, true) + have, ok := render.OriginTable(fixture.Report, originID, true, true) if !ok { t.Errorf("%q: not OK", originID) continue @@ -78,16 +79,16 @@ func TestOriginTable(t *testing.T) { } func TestMakeDetailedHostNode(t *testing.T) { - renderableNode := render.HostRenderer.Render(test.Report)[render.MakeHostID(test.ClientHostID)] - have := render.MakeDetailedNode(test.Report, renderableNode) + renderableNode := render.HostRenderer.Render(fixture.Report)[render.MakeHostID(fixture.ClientHostID)] + have := render.MakeDetailedNode(fixture.Report, renderableNode) want := render.DetailedNode{ - ID: render.MakeHostID(test.ClientHostID), + ID: render.MakeHostID(fixture.ClientHostID), LabelMajor: "client", LabelMinor: "hostname.com", Pseudo: false, Tables: []render.Table{ { - Title: fmt.Sprintf("Host %q", test.ClientHostName), + Title: fmt.Sprintf("Host %q", fixture.ClientHostName), Numeric: false, Rank: 1, Rows: []render.Row{ @@ -135,12 +136,12 @@ func TestMakeDetailedHostNode(t *testing.T) { } func TestMakeDetailedContainerNode(t *testing.T) { - renderableNode := render.ContainerRenderer.Render(test.Report)[test.ServerContainerID] - have := render.MakeDetailedNode(test.Report, renderableNode) + renderableNode := render.ContainerRenderer.Render(fixture.Report)[fixture.ServerContainerID] + have := render.MakeDetailedNode(fixture.Report, renderableNode) want := render.DetailedNode{ - ID: test.ServerContainerID, + ID: fixture.ServerContainerID, LabelMajor: "server", - LabelMinor: test.ServerHostName, + LabelMinor: fixture.ServerHostName, Pseudo: false, Tables: []render.Table{ { @@ -148,7 +149,7 @@ func TestMakeDetailedContainerNode(t *testing.T) { Numeric: false, Rank: 4, Rows: []render.Row{ - {"Image ID", test.ServerContainerImageID, "", false}, + {"Image ID", fixture.ServerContainerImageID, "", false}, {`Label "foo1"`, `bar1`, "", false}, {`Label "foo2"`, `bar2`, "", false}, }, @@ -158,8 +159,8 @@ func TestMakeDetailedContainerNode(t *testing.T) { Numeric: false, Rank: 3, Rows: []render.Row{ - {"ID", test.ServerContainerID, "", false}, - {"Image ID", test.ServerContainerImageID, "", false}, + {"ID", fixture.ServerContainerID, "", false}, + {"Image ID", fixture.ServerContainerImageID, "", false}, {fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false}, {`Label "foo1"`, `bar1`, "", false}, {`Label "foo2"`, `bar2`, "", false}, @@ -167,13 +168,13 @@ func TestMakeDetailedContainerNode(t *testing.T) { }, }, { - Title: fmt.Sprintf(`Process "apache" (%s)`, test.ServerPID), + Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID), Numeric: false, Rank: 2, Rows: []render.Row{}, }, { - Title: fmt.Sprintf("Host %q", test.ServerHostName), + Title: fmt.Sprintf("Host %q", fixture.ServerHostName), Numeric: false, Rank: 1, Rows: []render.Row{ @@ -190,38 +191,38 @@ func TestMakeDetailedContainerNode(t *testing.T) { {"Ingress byte rate", "1.0", "KBps", false}, {"Client", "Server", "", true}, { - fmt.Sprintf("%s:%s", test.UnknownClient1IP, test.UnknownClient1Port), - fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort), + fmt.Sprintf("%s:%s", fixture.UnknownClient1IP, fixture.UnknownClient1Port), + fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), "", true, }, { - fmt.Sprintf("%s:%s", test.UnknownClient2IP, test.UnknownClient2Port), - fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort), + fmt.Sprintf("%s:%s", fixture.UnknownClient2IP, fixture.UnknownClient2Port), + fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), "", true, }, { - fmt.Sprintf("%s:%s", test.UnknownClient3IP, test.UnknownClient3Port), - fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort), + fmt.Sprintf("%s:%s", fixture.UnknownClient3IP, fixture.UnknownClient3Port), + fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), "", true, }, { - fmt.Sprintf("%s:%s", test.ClientIP, test.ClientPort54001), - fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort), + fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54001), + fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), "", true, }, { - fmt.Sprintf("%s:%s", test.ClientIP, test.ClientPort54002), - fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort), + fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54002), + fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), "", true, }, { - fmt.Sprintf("%s:%s", test.RandomClientIP, test.RandomClientPort), - fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort), + fmt.Sprintf("%s:%s", fixture.RandomClientIP, fixture.RandomClientPort), + fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), "", true, }, diff --git a/render/expected/expected.go b/render/expected/expected.go index 3616e6070..28ea83cfa 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -5,14 +5,14 @@ import ( "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) // Exported for testing. var ( - uncontainedServerID = render.MakePseudoNodeID(render.UncontainedID, test.ServerHostName) - unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80") - unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80") + uncontainedServerID = render.MakePseudoNodeID(render.UncontainedID, fixture.ServerHostName) + unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", fixture.ServerIP, "80") + unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", fixture.ServerIP, "80") unknownPseudoNode1 = func(adjacent string) render.RenderableNode { return render.RenderableNode{ ID: unknownPseudoNode1ID, @@ -24,8 +24,8 @@ var ( EgressByteCount: newu64(700), }, Origins: report.MakeIDList( - test.UnknownClient1NodeID, - test.UnknownClient2NodeID, + fixture.UnknownClient1NodeID, + fixture.UnknownClient2NodeID, ), } } @@ -40,7 +40,7 @@ var ( EgressByteCount: newu64(500), }, Origins: report.MakeIDList( - test.UnknownClient3NodeID, + fixture.UnknownClient3NodeID, ), } } @@ -55,27 +55,27 @@ var ( EgressByteCount: newu64(600), }, Origins: report.MakeIDList( - test.RandomClientNodeID, - test.GoogleEndpointNodeID, + fixture.RandomClientNodeID, + fixture.GoogleEndpointNodeID, ), } } - ClientProcess1ID = render.MakeProcessID(test.ClientHostID, test.Client1PID) - ClientProcess2ID = render.MakeProcessID(test.ClientHostID, test.Client2PID) - ServerProcessID = render.MakeProcessID(test.ServerHostID, test.ServerPID) - nonContainerProcessID = render.MakeProcessID(test.ServerHostID, test.NonContainerPID) + ClientProcess1ID = render.MakeProcessID(fixture.ClientHostID, fixture.Client1PID) + ClientProcess2ID = render.MakeProcessID(fixture.ClientHostID, fixture.Client2PID) + ServerProcessID = render.MakeProcessID(fixture.ServerHostID, fixture.ServerPID) + nonContainerProcessID = render.MakeProcessID(fixture.ServerHostID, fixture.NonContainerPID) RenderedProcesses = (render.RenderableNodes{ ClientProcess1ID: { ID: ClientProcess1ID, - LabelMajor: test.Client1Comm, - LabelMinor: fmt.Sprintf("%s (%s)", test.ClientHostID, test.Client1PID), - Rank: test.Client1Comm, + LabelMajor: fixture.Client1Comm, + LabelMinor: fmt.Sprintf("%s (%s)", fixture.ClientHostID, fixture.Client1PID), + Rank: fixture.Client1Comm, Pseudo: false, Origins: report.MakeIDList( - test.Client54001NodeID, - test.ClientProcess1NodeID, - test.ClientHostNodeID, + fixture.Client54001NodeID, + fixture.ClientProcess1NodeID, + fixture.ClientHostNodeID, ), Node: report.MakeNode().WithAdjacent(ServerProcessID), EdgeMetadata: report.EdgeMetadata{ @@ -85,14 +85,14 @@ var ( }, ClientProcess2ID: { ID: ClientProcess2ID, - LabelMajor: test.Client2Comm, - LabelMinor: fmt.Sprintf("%s (%s)", test.ClientHostID, test.Client2PID), - Rank: test.Client2Comm, + LabelMajor: fixture.Client2Comm, + LabelMinor: fmt.Sprintf("%s (%s)", fixture.ClientHostID, fixture.Client2PID), + Rank: fixture.Client2Comm, Pseudo: false, Origins: report.MakeIDList( - test.Client54002NodeID, - test.ClientProcess2NodeID, - test.ClientHostNodeID, + fixture.Client54002NodeID, + fixture.ClientProcess2NodeID, + fixture.ClientHostNodeID, ), Node: report.MakeNode().WithAdjacent(ServerProcessID), EdgeMetadata: report.EdgeMetadata{ @@ -103,13 +103,13 @@ var ( ServerProcessID: { ID: ServerProcessID, LabelMajor: "apache", - LabelMinor: fmt.Sprintf("%s (%s)", test.ServerHostID, test.ServerPID), - Rank: test.ServerComm, + LabelMinor: fmt.Sprintf("%s (%s)", fixture.ServerHostID, fixture.ServerPID), + Rank: fixture.ServerComm, Pseudo: false, Origins: report.MakeIDList( - test.Server80NodeID, - test.ServerProcessNodeID, - test.ServerHostNodeID, + fixture.Server80NodeID, + fixture.ServerProcessNodeID, + fixture.ServerHostNodeID, ), Node: report.MakeNode(), EdgeMetadata: report.EdgeMetadata{ @@ -119,14 +119,14 @@ var ( }, nonContainerProcessID: { ID: nonContainerProcessID, - LabelMajor: test.NonContainerComm, - LabelMinor: fmt.Sprintf("%s (%s)", test.ServerHostID, test.NonContainerPID), - Rank: test.NonContainerComm, + LabelMajor: fixture.NonContainerComm, + LabelMinor: fmt.Sprintf("%s (%s)", fixture.ServerHostID, fixture.NonContainerPID), + Rank: fixture.NonContainerComm, Pseudo: false, Origins: report.MakeIDList( - test.NonContainerProcessNodeID, - test.ServerHostNodeID, - test.NonContainerNodeID, + fixture.NonContainerProcessNodeID, + fixture.ServerHostNodeID, + fixture.NonContainerNodeID, ), Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, @@ -144,11 +144,11 @@ var ( Rank: "curl", Pseudo: false, Origins: report.MakeIDList( - test.Client54001NodeID, - test.Client54002NodeID, - test.ClientProcess1NodeID, - test.ClientProcess2NodeID, - test.ClientHostNodeID, + fixture.Client54001NodeID, + fixture.Client54002NodeID, + fixture.ClientProcess1NodeID, + fixture.ClientProcess2NodeID, + fixture.ClientHostNodeID, ), Node: report.MakeNode().WithAdjacent("apache"), EdgeMetadata: report.EdgeMetadata{ @@ -163,9 +163,9 @@ var ( Rank: "apache", Pseudo: false, Origins: report.MakeIDList( - test.Server80NodeID, - test.ServerProcessNodeID, - test.ServerHostNodeID, + fixture.Server80NodeID, + fixture.ServerProcessNodeID, + fixture.ServerHostNodeID, ), Node: report.MakeNode(), EdgeMetadata: report.EdgeMetadata{ @@ -173,16 +173,16 @@ var ( IngressByteCount: newu64(2100), }, }, - test.NonContainerComm: { - ID: test.NonContainerComm, - LabelMajor: test.NonContainerComm, + fixture.NonContainerComm: { + ID: fixture.NonContainerComm, + LabelMajor: fixture.NonContainerComm, LabelMinor: "1 process", - Rank: test.NonContainerComm, + Rank: fixture.NonContainerComm, Pseudo: false, Origins: report.MakeIDList( - test.NonContainerProcessNodeID, - test.ServerHostNodeID, - test.NonContainerNodeID, + fixture.NonContainerProcessNodeID, + fixture.ServerHostNodeID, + fixture.NonContainerNodeID, ), Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, @@ -193,39 +193,39 @@ var ( }).Prune() RenderedContainers = (render.RenderableNodes{ - test.ClientContainerID: { - ID: test.ClientContainerID, + fixture.ClientContainerID: { + ID: fixture.ClientContainerID, LabelMajor: "client", - LabelMinor: test.ClientHostName, - Rank: test.ClientContainerImageName, + LabelMinor: fixture.ClientHostName, + Rank: fixture.ClientContainerImageName, Pseudo: false, Origins: report.MakeIDList( - test.ClientContainerImageNodeID, - test.ClientContainerNodeID, - test.Client54001NodeID, - test.Client54002NodeID, - test.ClientProcess1NodeID, - test.ClientProcess2NodeID, - test.ClientHostNodeID, + fixture.ClientContainerImageNodeID, + fixture.ClientContainerNodeID, + fixture.Client54001NodeID, + fixture.Client54002NodeID, + fixture.ClientProcess1NodeID, + fixture.ClientProcess2NodeID, + fixture.ClientHostNodeID, ), - Node: report.MakeNode().WithAdjacent(test.ServerContainerID), + Node: report.MakeNode().WithAdjacent(fixture.ServerContainerID), EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(30), EgressByteCount: newu64(300), }, }, - test.ServerContainerID: { - ID: test.ServerContainerID, + fixture.ServerContainerID: { + ID: fixture.ServerContainerID, LabelMajor: "server", - LabelMinor: test.ServerHostName, - Rank: test.ServerContainerImageName, + LabelMinor: fixture.ServerHostName, + Rank: fixture.ServerContainerImageName, Pseudo: false, Origins: report.MakeIDList( - test.ServerContainerImageNodeID, - test.ServerContainerNodeID, - test.Server80NodeID, - test.ServerProcessNodeID, - test.ServerHostNodeID, + fixture.ServerContainerImageNodeID, + fixture.ServerContainerNodeID, + fixture.Server80NodeID, + fixture.ServerProcessNodeID, + fixture.ServerHostNodeID, ), Node: report.MakeNode(), EdgeMetadata: report.EdgeMetadata{ @@ -236,54 +236,54 @@ var ( uncontainedServerID: { ID: uncontainedServerID, LabelMajor: render.UncontainedMajor, - LabelMinor: test.ServerHostName, + LabelMinor: fixture.ServerHostName, Rank: "", Pseudo: true, Origins: report.MakeIDList( - test.NonContainerProcessNodeID, - test.ServerHostNodeID, - test.NonContainerNodeID, + fixture.NonContainerProcessNodeID, + fixture.ServerHostNodeID, + fixture.NonContainerNodeID, ), Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, }, - render.TheInternetID: theInternetNode(test.ServerContainerID), + render.TheInternetID: theInternetNode(fixture.ServerContainerID), }).Prune() RenderedContainerImages = (render.RenderableNodes{ - test.ClientContainerImageName: { - ID: test.ClientContainerImageName, - LabelMajor: test.ClientContainerImageName, + fixture.ClientContainerImageName: { + ID: fixture.ClientContainerImageName, + LabelMajor: fixture.ClientContainerImageName, LabelMinor: "1 container", - Rank: test.ClientContainerImageName, + Rank: fixture.ClientContainerImageName, Pseudo: false, Origins: report.MakeIDList( - test.ClientContainerImageNodeID, - test.ClientContainerNodeID, - test.Client54001NodeID, - test.Client54002NodeID, - test.ClientProcess1NodeID, - test.ClientProcess2NodeID, - test.ClientHostNodeID, + fixture.ClientContainerImageNodeID, + fixture.ClientContainerNodeID, + fixture.Client54001NodeID, + fixture.Client54002NodeID, + fixture.ClientProcess1NodeID, + fixture.ClientProcess2NodeID, + fixture.ClientHostNodeID, ), - Node: report.MakeNode().WithAdjacent(test.ServerContainerImageName), + Node: report.MakeNode().WithAdjacent(fixture.ServerContainerImageName), EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(30), EgressByteCount: newu64(300), }, }, - test.ServerContainerImageName: { - ID: test.ServerContainerImageName, - LabelMajor: test.ServerContainerImageName, + fixture.ServerContainerImageName: { + ID: fixture.ServerContainerImageName, + LabelMajor: fixture.ServerContainerImageName, LabelMinor: "1 container", - Rank: test.ServerContainerImageName, + Rank: fixture.ServerContainerImageName, Pseudo: false, Origins: report.MakeIDList( - test.ServerContainerImageNodeID, - test.ServerContainerNodeID, - test.Server80NodeID, - test.ServerProcessNodeID, - test.ServerHostNodeID), + fixture.ServerContainerImageNodeID, + fixture.ServerContainerNodeID, + fixture.Server80NodeID, + fixture.ServerProcessNodeID, + fixture.ServerHostNodeID), Node: report.MakeNode(), EdgeMetadata: report.EdgeMetadata{ IngressPacketCount: newu64(210), @@ -293,24 +293,24 @@ var ( uncontainedServerID: { ID: uncontainedServerID, LabelMajor: render.UncontainedMajor, - LabelMinor: test.ServerHostName, + LabelMinor: fixture.ServerHostName, Rank: "", Pseudo: true, Origins: report.MakeIDList( - test.NonContainerNodeID, - test.NonContainerProcessNodeID, - test.ServerHostNodeID, + fixture.NonContainerNodeID, + fixture.NonContainerProcessNodeID, + fixture.ServerHostNodeID, ), Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, }, - render.TheInternetID: theInternetNode(test.ServerContainerImageName), + render.TheInternetID: theInternetNode(fixture.ServerContainerImageName), }).Prune() - ServerHostRenderedID = render.MakeHostID(test.ServerHostID) - ClientHostRenderedID = render.MakeHostID(test.ClientHostID) - pseudoHostID1 = render.MakePseudoNodeID(test.UnknownClient1IP, test.ServerIP) - pseudoHostID2 = render.MakePseudoNodeID(test.UnknownClient3IP, test.ServerIP) + ServerHostRenderedID = render.MakeHostID(fixture.ServerHostID) + ClientHostRenderedID = render.MakeHostID(fixture.ClientHostID) + pseudoHostID1 = render.MakePseudoNodeID(fixture.UnknownClient1IP, fixture.ServerIP) + pseudoHostID2 = render.MakePseudoNodeID(fixture.UnknownClient3IP, fixture.ServerIP) RenderedHosts = (render.RenderableNodes{ ServerHostRenderedID: { @@ -320,8 +320,8 @@ var ( Rank: "hostname.com", Pseudo: false, Origins: report.MakeIDList( - test.ServerHostNodeID, - test.ServerAddressNodeID, + fixture.ServerHostNodeID, + fixture.ServerAddressNodeID, ), Node: report.MakeNode(), EdgeMetadata: report.EdgeMetadata{ @@ -335,8 +335,8 @@ var ( Rank: "hostname.com", Pseudo: false, Origins: report.MakeIDList( - test.ClientHostNodeID, - test.ClientAddressNodeID, + fixture.ClientHostNodeID, + fixture.ClientAddressNodeID, ), Node: report.MakeNode().WithAdjacent(ServerHostRenderedID), EdgeMetadata: report.EdgeMetadata{ @@ -345,19 +345,19 @@ var ( }, pseudoHostID1: { ID: pseudoHostID1, - LabelMajor: test.UnknownClient1IP, + LabelMajor: fixture.UnknownClient1IP, Pseudo: true, Node: report.MakeNode().WithAdjacent(ServerHostRenderedID), EdgeMetadata: report.EdgeMetadata{}, - Origins: report.MakeIDList(test.UnknownAddress1NodeID, test.UnknownAddress2NodeID), + Origins: report.MakeIDList(fixture.UnknownAddress1NodeID, fixture.UnknownAddress2NodeID), }, pseudoHostID2: { ID: pseudoHostID2, - LabelMajor: test.UnknownClient3IP, + LabelMajor: fixture.UnknownClient3IP, Pseudo: true, Node: report.MakeNode().WithAdjacent(ServerHostRenderedID), EdgeMetadata: report.EdgeMetadata{}, - Origins: report.MakeIDList(test.UnknownAddress3NodeID), + Origins: report.MakeIDList(fixture.UnknownAddress3NodeID), }, render.TheInternetID: { ID: render.TheInternetID, @@ -365,7 +365,7 @@ var ( Pseudo: true, Node: report.MakeNode().WithAdjacent(ServerHostRenderedID), EdgeMetadata: report.EdgeMetadata{}, - Origins: report.MakeIDList(test.RandomAddressNodeID), + Origins: report.MakeIDList(fixture.RandomAddressNodeID), }, }).Prune() diff --git a/render/topologies_test.go b/render/topologies_test.go index dfdd0cb04..a80e70968 100644 --- a/render/topologies_test.go +++ b/render/topologies_test.go @@ -9,10 +9,11 @@ import ( "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/render/expected" "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) func TestProcessRenderer(t *testing.T) { - have := render.ProcessRenderer.Render(test.Report).Prune() + have := render.ProcessRenderer.Render(fixture.Report).Prune() want := expected.RenderedProcesses if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -20,7 +21,7 @@ func TestProcessRenderer(t *testing.T) { } func TestProcessNameRenderer(t *testing.T) { - have := render.ProcessNameRenderer.Render(test.Report).Prune() + have := render.ProcessNameRenderer.Render(fixture.Report).Prune() want := expected.RenderedProcessNames if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -28,7 +29,7 @@ func TestProcessNameRenderer(t *testing.T) { } func TestContainerRenderer(t *testing.T) { - have := (render.ContainerWithImageNameRenderer.Render(test.Report)).Prune() + have := (render.ContainerWithImageNameRenderer.Render(fixture.Report)).Prune() want := expected.RenderedContainers if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -38,18 +39,18 @@ func TestContainerRenderer(t *testing.T) { func TestContainerFilterRenderer(t *testing.T) { // tag on of the containers in the topology and ensure // it is filtered out correctly. - input := test.Report.Copy() - input.Container.Nodes[test.ClientContainerNodeID].Metadata[docker.LabelPrefix+"works.weave.role"] = "system" + input := fixture.Report.Copy() + input.Container.Nodes[fixture.ClientContainerNodeID].Metadata[docker.LabelPrefix+"works.weave.role"] = "system" have := render.FilterSystem(render.ContainerWithImageNameRenderer).Render(input).Prune() want := expected.RenderedContainers.Copy() - delete(want, test.ClientContainerID) + delete(want, fixture.ClientContainerID) if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } func TestContainerImageRenderer(t *testing.T) { - have := render.ContainerImageRenderer.Render(test.Report).Prune() + have := render.ContainerImageRenderer.Render(fixture.Report).Prune() want := expected.RenderedContainerImages if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -57,7 +58,7 @@ func TestContainerImageRenderer(t *testing.T) { } func TestHostRenderer(t *testing.T) { - have := render.HostRenderer.Render(test.Report).Prune() + have := render.HostRenderer.Render(fixture.Report).Prune() want := expected.RenderedHosts if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) diff --git a/test/report_fixture.go b/test/fixture/report_fixture.go similarity index 99% rename from test/report_fixture.go rename to test/fixture/report_fixture.go index 06da33cf9..7e09d5a88 100644 --- a/test/report_fixture.go +++ b/test/fixture/report_fixture.go @@ -1,4 +1,4 @@ -package test +package fixture import ( "time" From 36ce1089f4303282593cafe922fc2aca5fcb7cad Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 2 Oct 2015 17:08:04 +0200 Subject: [PATCH 04/10] Don't export reverseResolver It's only used within package endpoint, so it shouldn't be exported. That means resolver_test becomes resolver_internal_test, and with the previous change to the fixture, we can avoid the dot-import. Also, update method names to reflect it's an unexported type. --- probe/endpoint/reporter.go | 14 +++++------ probe/endpoint/resolver.go | 23 +++++++++---------- ...lver_test.go => resolver_internal_test.go} | 9 ++++---- 3 files changed, 22 insertions(+), 24 deletions(-) rename probe/endpoint/{resolver_test.go => resolver_internal_test.go} (81%) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 7f4332652..98c086155 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -28,7 +28,7 @@ type Reporter struct { includeNAT bool conntracker Conntracker natmapper *NATMapper - revResolver *ReverseResolver + reverseResolver *reverseResolver } // SpyDuration is an exported prometheus metric @@ -73,7 +73,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo includeProcesses: includeProcesses, conntracker: conntracker, natmapper: natmapper, - revResolver: NewReverseResolver(), + reverseResolver: newReverseResolver(), } } @@ -85,7 +85,7 @@ func (r *Reporter) Stop() { if r.natmapper != nil { r.natmapper.Stop() } - r.revResolver.Stop() + r.reverseResolver.stop() } // Report implements Reporter. @@ -164,9 +164,9 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin // In case we have a reverse resolution for the IP, we can use it for // the name... - if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil { + if remoteName, err := r.reverseResolver.get(remoteAddr); err == nil { remoteNode = remoteNode.WithMetadata(map[string]string{ - "name": revRemoteName, + "name": remoteName, }) } @@ -210,9 +210,9 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin // In case we have a reverse resolution for the IP, we can use it for // the name... - if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil { + if remoteName, err := r.reverseResolver.get(remoteAddr); err == nil { remoteNode = remoteNode.WithMetadata(map[string]string{ - "name": revRemoteName, + "name": remoteName, }) } diff --git a/probe/endpoint/resolver.go b/probe/endpoint/resolver.go index 12af6ee16..af11ad3d5 100644 --- a/probe/endpoint/resolver.go +++ b/probe/endpoint/resolver.go @@ -16,22 +16,22 @@ const ( rAddrCacheExpiration = 30 * time.Minute ) -var errNotFound = fmt.Errorf("Not found") +var errNotFound = fmt.Errorf("not found") type revResFunc func(addr string) (names []string, err error) -// ReverseResolver is a caching, reverse resolver. -type ReverseResolver struct { +// A caching, reverse resolver. +type reverseResolver struct { addresses chan string cache gcache.Cache Throttle <-chan time.Time // Made public for mocking Resolver revResFunc } -// NewReverseResolver starts a new reverse resolver that performs reverse +// newReverseResolver starts a new reverse resolver that performs reverse // resolutions and caches the result. -func NewReverseResolver() *ReverseResolver { - r := ReverseResolver{ +func newReverseResolver() *reverseResolver { + r := reverseResolver{ addresses: make(chan string, rAddrBacklog), cache: gcache.New(rAddrCacheLen).LRU().Expiration(rAddrCacheExpiration).Build(), Throttle: time.Tick(time.Second / 10), @@ -41,10 +41,10 @@ func NewReverseResolver() *ReverseResolver { return &r } -// Get the reverse resolution for an IP address if already in the cache, a +// get the reverse resolution for an IP address if already in the cache, a // gcache.NotFoundKeyError error otherwise. Note: it returns one of the // possible names that can be obtained for that IP. -func (r *ReverseResolver) Get(address string) (string, error) { +func (r *reverseResolver) get(address string) (string, error) { val, err := r.cache.Get(address) if hostname, ok := val.(string); err == nil && ok { return hostname, nil @@ -53,7 +53,7 @@ func (r *ReverseResolver) Get(address string) (string, error) { return "", errNotFound } if err == gcache.NotFoundKeyError { - // We trigger a asynchronous reverse resolution when not cached + // We trigger a asynchronous reverse resolution when not cached. select { case r.addresses <- address: default: @@ -62,7 +62,7 @@ func (r *ReverseResolver) Get(address string) (string, error) { return "", errNotFound } -func (r *ReverseResolver) loop() { +func (r *reverseResolver) loop() { for request := range r.addresses { // check if the answer is already in the cache if _, err := r.cache.Get(request); err == nil { @@ -80,7 +80,6 @@ func (r *ReverseResolver) loop() { } } -// Stop the async reverse resolver. -func (r *ReverseResolver) Stop() { +func (r *reverseResolver) stop() { close(r.addresses) } diff --git a/probe/endpoint/resolver_test.go b/probe/endpoint/resolver_internal_test.go similarity index 81% rename from probe/endpoint/resolver_test.go rename to probe/endpoint/resolver_internal_test.go index 096113620..a3ac4632e 100644 --- a/probe/endpoint/resolver_test.go +++ b/probe/endpoint/resolver_internal_test.go @@ -1,11 +1,10 @@ -package endpoint_test +package endpoint import ( "errors" "testing" "time" - . "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/test" ) @@ -15,8 +14,8 @@ func TestReverseResolver(t *testing.T) { "4.3.2.1": {"im.a.little.tea.pot"}, } - revRes := NewReverseResolver() - defer revRes.Stop() + revRes := newReverseResolver() + defer revRes.stop() // Use a mocked resolver function. revRes.Resolver = func(addr string) (names []string, err error) { @@ -31,7 +30,7 @@ func TestReverseResolver(t *testing.T) { for ip, names := range tests { test.Poll(t, 100*time.Millisecond, names[0], func() interface{} { - result, _ := revRes.Get(ip) + result, _ := revRes.get(ip) return result }) } From 6ae5077515ea784a898ee5fc5ff3ae02a1c0f6a3 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 2 Oct 2015 17:53:44 +0200 Subject: [PATCH 05/10] Un-export NATMapper Lots of TODOs to clean up... --- probe/endpoint/nat.go | 17 ++- .../{nat_test.go => nat_internal_test.go} | 107 ++++++++++++++---- probe/endpoint/reporter.go | 22 ++-- 3 files changed, 102 insertions(+), 44 deletions(-) rename probe/endpoint/{nat_test.go => nat_internal_test.go} (54%) diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 18ae534d1..ab1b8f5f1 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -16,14 +16,13 @@ type endpointMapping struct { rewrittenPort int } -// NATMapper rewrites a report to deal with NAT's connections -type NATMapper struct { +// natMapper rewrites a report to deal with NAT'd connections. +type natMapper struct { Conntracker } -// MakeNATMapper is exposed for testing -func MakeNATMapper(ct Conntracker) NATMapper { - return NATMapper{ct} +func makeNATMapper(ct Conntracker) natMapper { + return natMapper{ct} } func toMapping(f Flow) *endpointMapping { @@ -47,10 +46,10 @@ func toMapping(f Flow) *endpointMapping { return &mapping } -// ApplyNAT duplicates Nodes in the endpoint topology of a -// report, based on the NAT table as returns by natTable. -func (n NATMapper) ApplyNAT(rpt report.Report, scope string) { - if n.Conntracker == nil { +// applyNAT duplicates Nodes in the endpoint topology of a report, based on +// the NAT table. +func (n natMapper) applyNAT(rpt report.Report, scope string) { + if n.Conntracker == nil { // TODO(pb) return } n.Conntracker.WalkFlows(func(f Flow) { diff --git a/probe/endpoint/nat_test.go b/probe/endpoint/nat_internal_test.go similarity index 54% rename from probe/endpoint/nat_test.go rename to probe/endpoint/nat_internal_test.go index 49580d512..81ebb051a 100644 --- a/probe/endpoint/nat_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -1,19 +1,19 @@ -package endpoint_test +package endpoint import ( + "encoding/xml" "reflect" "testing" - "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" ) type mockConntracker struct { - flows []endpoint.Flow + flows []Flow } -func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) { +func (m *mockConntracker) WalkFlows(f func(Flow)) { for _, flow := range m.flows { f(flow) } @@ -21,6 +21,67 @@ func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) { func (m *mockConntracker) Stop() {} +// TODO(pb): dedupe later +func makeFlow(ty string) Flow { + return Flow{ + XMLName: xml.Name{ + Local: "flow", + }, + Type: ty, + } +} + +// TODO(pb): dedupe later +func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { + meta := Meta{ + XMLName: xml.Name{ + Local: "meta", + }, + Direction: dir, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + SrcIP: srcIP, + DstIP: dstIP, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", + }, + SrcPort: srcPort, + DstPort: dstPort, + Proto: TCP, + }, + } + f.Metas = append(f.Metas, meta) + return &meta +} + +// TODO(pb): dedupe later +func addIndependant(f *Flow, id int64, state string) *Meta { + meta := Meta{ + XMLName: xml.Name{ + Local: "meta", + }, + Direction: "independent", + ID: id, + State: state, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", + }, + }, + } + f.Metas = append(f.Metas, meta) + return &meta +} + func TestNat(t *testing.T) { // test that two containers, on the docker network, get their connections mapped // correctly. @@ -36,27 +97,26 @@ func TestNat(t *testing.T) { flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80) flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) ct := &mockConntracker{ - flows: []endpoint.Flow{flow}, + flows: []Flow{flow}, } have := report.MakeReport() originalID := report.MakeEndpointNodeID("host1", "10.0.47.1", "80") have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{ - endpoint.Addr: "10.0.47.1", - endpoint.Port: "80", - "foo": "bar", + Addr: "10.0.47.1", + Port: "80", + "foo": "bar", })) want := have.Copy() want.Endpoint.AddNode(report.MakeEndpointNodeID("host1", "1.2.3.4", "80"), report.MakeNodeWith(report.Metadata{ - endpoint.Addr: "1.2.3.4", - endpoint.Port: "80", - "copy_of": originalID, - "foo": "bar", + Addr: "1.2.3.4", + Port: "80", + "copy_of": originalID, + "foo": "bar", })) - natmapper := endpoint.MakeNATMapper(ct) - natmapper.ApplyNAT(have, "host1") + makeNATMapper(ct).applyNAT(have, "host1") if !reflect.DeepEqual(want, have) { t.Fatal(test.Diff(want, have)) } @@ -69,27 +129,26 @@ func TestNat(t *testing.T) { flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80) flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) ct := &mockConntracker{ - flows: []endpoint.Flow{flow}, + flows: []Flow{flow}, } have := report.MakeReport() originalID := report.MakeEndpointNodeID("host2", "10.0.47.2", "22222") have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{ - endpoint.Addr: "10.0.47.2", - endpoint.Port: "22222", - "foo": "baz", + Addr: "10.0.47.2", + Port: "22222", + "foo": "baz", })) want := have.Copy() want.Endpoint.AddNode(report.MakeEndpointNodeID("host2", "2.3.4.5", "22223"), report.MakeNodeWith(report.Metadata{ - endpoint.Addr: "2.3.4.5", - endpoint.Port: "22223", - "copy_of": originalID, - "foo": "baz", + Addr: "2.3.4.5", + Port: "22223", + "copy_of": originalID, + "foo": "baz", })) - natmapper := endpoint.MakeNATMapper(ct) - natmapper.ApplyNAT(have, "host1") + makeNATMapper(ct).applyNAT(have, "host1") if !reflect.DeepEqual(want, have) { t.Fatal(test.Diff(want, have)) } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 98c086155..0311ad3a0 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -27,7 +27,7 @@ type Reporter struct { includeProcesses bool includeNAT bool conntracker Conntracker - natmapper *NATMapper + natMapper *natMapper reverseResolver *reverseResolver } @@ -51,9 +51,9 @@ var SpyDuration = prometheus.NewSummaryVec( func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { var ( conntracker Conntracker - natmapper *NATMapper + natMapper *natMapper ) - if ConntrackModulePresent() { + if ConntrackModulePresent() { // TODO(pb) if useConntrack { var err error if conntracker, err = NewConntracker(true); err != nil { @@ -61,8 +61,8 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo } } if natmapperConntracker, err := NewConntracker(true, "--any-nat"); err == nil { - m := MakeNATMapper(natmapperConntracker) - natmapper = &m + m := makeNATMapper(natmapperConntracker) + natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper } else { log.Printf("Failed to start conntracker for NAT mapper: %v", err) } @@ -72,18 +72,18 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo hostName: hostName, includeProcesses: includeProcesses, conntracker: conntracker, - natmapper: natmapper, + natMapper: natMapper, reverseResolver: newReverseResolver(), } } // Stop stop stop func (r *Reporter) Stop() { - if r.conntracker != nil { + if r.conntracker != nil { // TODO(pb): this should never be nil (implies interface) r.conntracker.Stop() } - if r.natmapper != nil { - r.natmapper.Stop() + if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface) + r.natMapper.Stop() } r.reverseResolver.stop() } @@ -138,8 +138,8 @@ func (r *Reporter) Report() (report.Report, error) { }) } - if r.natmapper != nil { - r.natmapper.ApplyNAT(rpt, r.hostID) + if r.natMapper != nil { // TODO(pb): should never be nil + r.natMapper.applyNAT(rpt, r.hostID) } return rpt, nil From cb40ad3a908bb7814867369fd6cc0d8088a39385 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 2 Oct 2015 18:38:04 +0200 Subject: [PATCH 06/10] Continued un-exporting of symbols; renames - Unexport consts, types, vars, etc. - Rename Conntracker (interface) to FlowWalker, to match its definition. - Rename conntracker (type) to conntrackWalker, to match the interface. - Move conntrack_test.go to conntrack_internal_test.go and package endpoint --- probe/endpoint/conntrack.go | 106 +++++++++--------- ...ack_test.go => conntrack_internal_test.go} | 63 +++++------ probe/endpoint/nat.go | 12 +- probe/endpoint/nat_internal_test.go | 94 +++------------- probe/endpoint/reporter.go | 24 ++-- 5 files changed, 116 insertions(+), 183 deletions(-) rename probe/endpoint/{conntrack_test.go => conntrack_internal_test.go} (65%) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 531e000cb..2671c2c3a 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -14,82 +14,79 @@ import ( "github.com/weaveworks/scope/common/exec" ) -// Constants exported for testing const ( modules = "/proc/modules" conntrackModule = "nf_conntrack" - XMLHeader = "\n" - ConntrackOpenTag = "\n" - TimeWait = "TIME_WAIT" - TCP = "tcp" - New = "new" - Update = "update" - Destroy = "destroy" + xmlHeader = "\n" + conntrackOpenTag = "\n" + timeWait = "TIME_WAIT" + tcpProto = "tcp" + newType = "new" + updateType = "update" + destroyType = "destroy" ) -// Layer3 - these structs are for the parsed conntrack output -type Layer3 struct { +type layer3 struct { XMLName xml.Name `xml:"layer3"` SrcIP string `xml:"src"` DstIP string `xml:"dst"` } -// Layer4 - these structs are for the parsed conntrack output -type Layer4 struct { +type layer4 struct { XMLName xml.Name `xml:"layer4"` SrcPort int `xml:"sport"` DstPort int `xml:"dport"` Proto string `xml:"protoname,attr"` } -// Meta - these structs are for the parsed conntrack output -type Meta struct { +type meta struct { XMLName xml.Name `xml:"meta"` Direction string `xml:"direction,attr"` - Layer3 Layer3 `xml:"layer3"` - Layer4 Layer4 `xml:"layer4"` + Layer3 layer3 `xml:"layer3"` + Layer4 layer4 `xml:"layer4"` ID int64 `xml:"id"` State string `xml:"state"` } -// Flow - these structs are for the parsed conntrack output -type Flow struct { +type flow struct { XMLName xml.Name `xml:"flow"` - Metas []Meta `xml:"meta"` + Metas []meta `xml:"meta"` Type string `xml:"type,attr"` - Original, Reply, Independent *Meta `xml:"-"` + Original, Reply, Independent *meta `xml:"-"` } type conntrack struct { XMLName xml.Name `xml:"conntrack"` - Flows []Flow `xml:"flow"` + Flows []flow `xml:"flow"` } -// Conntracker is something that tracks connections. -type Conntracker interface { - WalkFlows(f func(Flow)) - Stop() +// flowWalker is something that maintains flows, and provides an accessor +// method to walk them. +type flowWalker interface { + walkFlows(f func(flow)) + stop() } -// Conntracker uses the conntrack command to track network connections -type conntracker struct { +// conntrackWalker uses the conntrack command to track network connections and +// implement flowWalker. +type conntrackWalker struct { sync.Mutex cmd exec.Cmd - activeFlows map[int64]Flow // active flows in state != TIME_WAIT - bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here + activeFlows map[int64]flow // active flows in state != TIME_WAIT + bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here existingConns bool args []string quit chan struct{} } -// NewConntracker creates and starts a new Conntracter -func NewConntracker(existingConns bool, args ...string) (Conntracker, error) { +// newConntracker creates and starts a new conntracker. +func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, error) { if !ConntrackModulePresent() { return nil, fmt.Errorf("No conntrack module") } - result := &conntracker{ - activeFlows: map[int64]Flow{}, + result := &conntrackWalker{ + activeFlows: map[int64]flow{}, existingConns: existingConns, args: args, } @@ -121,7 +118,7 @@ var ConntrackModulePresent = func() bool { return false } -func (c *conntracker) loop() { +func (c *conntrackWalker) loop() { // conntrack can sometimes fail with ENOBUFS, when there is a particularly // high connection rate. In these cases just retry in a loop, so we can // survive the spike. For sustained loads this degrades nicely, as we @@ -139,7 +136,7 @@ func (c *conntracker) loop() { } } -func (c *conntracker) clearFlows() { +func (c *conntrackWalker) clearFlows() { c.Lock() defer c.Unlock() @@ -147,7 +144,7 @@ func (c *conntracker) clearFlows() { c.bufferedFlows = append(c.bufferedFlows, f) } - c.activeFlows = map[int64]Flow{} + c.activeFlows = map[int64]flow{} } func logPipe(prefix string, reader io.Reader) { @@ -160,7 +157,7 @@ func logPipe(prefix string, reader io.Reader) { } } -func (c *conntracker) run() { +func (c *conntrackWalker) run() { if c.existingConns { // Fork another conntrack, just to capture existing connections // for which we don't get events @@ -217,14 +214,14 @@ func (c *conntracker) run() { if line, err := reader.ReadString('\n'); err != nil { log.Printf("conntrack error: %v", err) return - } else if line != XMLHeader { + } else if line != xmlHeader { log.Printf("conntrack invalid output: '%s'", line) return } if line, err := reader.ReadString('\n'); err != nil { log.Printf("conntrack error: %v", err) return - } else if line != ConntrackOpenTag { + } else if line != conntrackOpenTag { log.Printf("conntrack invalid output: '%s'", line) return } @@ -234,7 +231,7 @@ func (c *conntracker) run() { // Now loop on the output stream decoder := xml.NewDecoder(reader) for { - var f Flow + var f flow if err := decoder.Decode(&f); err != nil { log.Printf("conntrack error: %v", err) return @@ -243,15 +240,15 @@ func (c *conntracker) run() { } } -func (c *conntracker) existingConnections() ([]Flow, error) { +func (c *conntrackWalker) existingConnections() ([]flow, error) { args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { - return []Flow{}, err + return []flow{}, err } if err := cmd.Start(); err != nil { - return []Flow{}, err + return []flow{}, err } defer func() { if err := cmd.Wait(); err != nil { @@ -260,15 +257,14 @@ func (c *conntracker) existingConnections() ([]Flow, error) { }() var result conntrack if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF { - return []Flow{}, nil + return []flow{}, nil } else if err != nil { - return []Flow{}, err + return []flow{}, err } return result.Flows, nil } -// Stop stop stop -func (c *conntracker) Stop() { +func (c *conntrackWalker) stop() { c.Lock() defer c.Unlock() close(c.quit) @@ -277,7 +273,7 @@ func (c *conntracker) Stop() { } } -func (c *conntracker) handleFlow(f Flow, forceAdd bool) { +func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this // host) and the 'reply' 4 tuple, which is what it has been rewritten to. // This code finds those metas, which are identified by a Direction @@ -297,7 +293,7 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) { // For not, I'm only interested in tcp connections - there is too much udp // traffic going on (every container talking to weave dns, for example) to // render nicely. TODO: revisit this. - if f.Original.Layer4.Proto != TCP { + if f.Original.Layer4.Proto != tcpProto { return } @@ -305,14 +301,14 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) { defer c.Unlock() switch { - case forceAdd || f.Type == New || f.Type == Update: - if f.Independent.State != TimeWait { + case forceAdd || f.Type == newType || f.Type == updateType: + if f.Independent.State != timeWait { c.activeFlows[f.Independent.ID] = f } else if _, ok := c.activeFlows[f.Independent.ID]; ok { delete(c.activeFlows, f.Independent.ID) c.bufferedFlows = append(c.bufferedFlows, f) } - case f.Type == Destroy: + case f.Type == destroyType: if _, ok := c.activeFlows[f.Independent.ID]; ok { delete(c.activeFlows, f.Independent.ID) c.bufferedFlows = append(c.bufferedFlows, f) @@ -320,9 +316,9 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) { } } -// WalkFlows calls f with all active flows and flows that have come and gone -// since the last call to WalkFlows -func (c *conntracker) WalkFlows(f func(Flow)) { +// walkFlows calls f with all active flows and flows that have come and gone +// since the last call to walkFlows +func (c *conntrackWalker) walkFlows(f func(flow)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { diff --git a/probe/endpoint/conntrack_test.go b/probe/endpoint/conntrack_internal_test.go similarity index 65% rename from probe/endpoint/conntrack_test.go rename to probe/endpoint/conntrack_internal_test.go index 16b8f8e6e..fed6ca281 100644 --- a/probe/endpoint/conntrack_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -1,4 +1,4 @@ -package endpoint_test +package endpoint import ( "bufio" @@ -8,13 +8,12 @@ import ( "time" "github.com/weaveworks/scope/common/exec" - . "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/test" - testExec "github.com/weaveworks/scope/test/exec" + testexec "github.com/weaveworks/scope/test/exec" ) -func makeFlow(ty string) Flow { - return Flow{ +func makeFlow(ty string) flow { + return flow{ XMLName: xml.Name{ Local: "flow", }, @@ -22,46 +21,46 @@ func makeFlow(ty string) Flow { } } -func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { - meta := Meta{ +func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta { + meta := meta{ XMLName: xml.Name{ Local: "meta", }, Direction: dir, - Layer3: Layer3{ + Layer3: layer3{ XMLName: xml.Name{ Local: "layer3", }, SrcIP: srcIP, DstIP: dstIP, }, - Layer4: Layer4{ + Layer4: layer4{ XMLName: xml.Name{ Local: "layer4", }, SrcPort: srcPort, DstPort: dstPort, - Proto: TCP, + Proto: tcpProto, }, } f.Metas = append(f.Metas, meta) return &meta } -func addIndependant(f *Flow, id int64, state string) *Meta { - meta := Meta{ +func addIndependant(f *flow, id int64, state string) *meta { + meta := meta{ XMLName: xml.Name{ Local: "meta", }, Direction: "independent", ID: id, State: state, - Layer3: Layer3{ + Layer3: layer3{ XMLName: xml.Name{ Local: "layer3", }, }, - Layer4: Layer4{ + Layer4: layer4{ XMLName: xml.Name{ Local: "layer4", }, @@ -81,19 +80,19 @@ func TestConntracker(t *testing.T) { reader, writer := io.Pipe() exec.Command = func(name string, args ...string) exec.Cmd { - return testExec.NewMockCmd(reader) + return testexec.NewMockCmd(reader) } - conntracker, err := NewConntracker(false) + flowWalker, err := newConntrackFlowWalker(false) if err != nil { t.Fatal(err) } bw := bufio.NewWriter(writer) - if _, err := bw.WriteString(XMLHeader); err != nil { + if _, err := bw.WriteString(xmlHeader); err != nil { t.Fatal(err) } - if _, err := bw.WriteString(ConntrackOpenTag); err != nil { + if _, err := bw.WriteString(conntrackOpenTag); err != nil { t.Fatal(err) } if err := bw.Flush(); err != nil { @@ -101,8 +100,8 @@ func TestConntracker(t *testing.T) { } have := func() interface{} { - result := []Flow{} - conntracker.WalkFlows(func(f Flow) { + result := []flow{} + flowWalker.walkFlows(func(f flow) { f.Original = nil f.Reply = nil f.Independent = nil @@ -113,11 +112,11 @@ func TestConntracker(t *testing.T) { ts := 100 * time.Millisecond // First, assert we have no flows - test.Poll(t, ts, []Flow{}, have) + test.Poll(t, ts, []flow{}, have) // Now add some flows xmlEncoder := xml.NewEncoder(bw) - writeFlow := func(f Flow) { + writeFlow := func(f flow) { if err := xmlEncoder.Encode(f); err != nil { t.Fatal(err) } @@ -129,25 +128,25 @@ func TestConntracker(t *testing.T) { } } - flow1 := makeFlow(New) + flow1 := makeFlow(newType) addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3) addIndependant(&flow1, 1, "") writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) + test.Poll(t, ts, []flow{flow1}, have) // Now check when we remove the flow, we still get it in the next Walk - flow1.Type = Destroy + flow1.Type = destroyType writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) - test.Poll(t, ts, []Flow{}, have) + test.Poll(t, ts, []flow{flow1}, have) + test.Poll(t, ts, []flow{}, have) // This time we're not going to remove it, but put it in state TIME_WAIT - flow1.Type = New + flow1.Type = newType writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) + test.Poll(t, ts, []flow{flow1}, have) - flow1.Metas[1].State = TimeWait + flow1.Metas[1].State = timeWait writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) - test.Poll(t, ts, []Flow{}, have) + test.Poll(t, ts, []flow{flow1}, have) + test.Poll(t, ts, []flow{}, have) } diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index ab1b8f5f1..016dced6a 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -18,14 +18,14 @@ type endpointMapping struct { // natMapper rewrites a report to deal with NAT'd connections. type natMapper struct { - Conntracker + flowWalker } -func makeNATMapper(ct Conntracker) natMapper { - return natMapper{ct} +func makeNATMapper(fw flowWalker) natMapper { + return natMapper{fw} } -func toMapping(f Flow) *endpointMapping { +func toMapping(f flow) *endpointMapping { var mapping endpointMapping if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP { mapping = endpointMapping{ @@ -49,10 +49,10 @@ func toMapping(f Flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - if n.Conntracker == nil { // TODO(pb) + if n.flowWalker == nil { // TODO(pb) return } - n.Conntracker.WalkFlows(func(f Flow) { + n.flowWalker.walkFlows(func(f flow) { var ( mapping = toMapping(f) realEndpointID = report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort)) diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 81ebb051a..105666d36 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -1,7 +1,6 @@ package endpoint import ( - "encoding/xml" "reflect" "testing" @@ -9,78 +8,17 @@ import ( "github.com/weaveworks/scope/test" ) -type mockConntracker struct { - flows []Flow +type mockFlowWalker struct { + flows []flow } -func (m *mockConntracker) WalkFlows(f func(Flow)) { +func (m *mockFlowWalker) walkFlows(f func(flow)) { for _, flow := range m.flows { f(flow) } } -func (m *mockConntracker) Stop() {} - -// TODO(pb): dedupe later -func makeFlow(ty string) Flow { - return Flow{ - XMLName: xml.Name{ - Local: "flow", - }, - Type: ty, - } -} - -// TODO(pb): dedupe later -func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { - meta := Meta{ - XMLName: xml.Name{ - Local: "meta", - }, - Direction: dir, - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - SrcIP: srcIP, - DstIP: dstIP, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - SrcPort: srcPort, - DstPort: dstPort, - Proto: TCP, - }, - } - f.Metas = append(f.Metas, meta) - return &meta -} - -// TODO(pb): dedupe later -func addIndependant(f *Flow, id int64, state string) *Meta { - meta := Meta{ - XMLName: xml.Name{ - Local: "meta", - }, - Direction: "independent", - ID: id, - State: state, - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - }, - } - f.Metas = append(f.Metas, meta) - return &meta -} +func (m *mockFlowWalker) stop() {} func TestNat(t *testing.T) { // test that two containers, on the docker network, get their connections mapped @@ -92,12 +30,12 @@ func TestNat(t *testing.T) { // from the PoV of host1 { - flow := makeFlow("") - addIndependant(&flow, 1, "") - flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80) - flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) - ct := &mockConntracker{ - flows: []Flow{flow}, + f := makeFlow("") + addIndependant(&f, 1, "") + f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80) + f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) + ct := &mockFlowWalker{ + flows: []flow{f}, } have := report.MakeReport() @@ -124,12 +62,12 @@ func TestNat(t *testing.T) { // form the PoV of host2 { - flow := makeFlow("") - addIndependant(&flow, 2, "") - flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80) - flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) - ct := &mockConntracker{ - flows: []Flow{flow}, + f := makeFlow("") + addIndependant(&f, 2, "") + f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80) + f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) + ct := &mockFlowWalker{ + flows: []flow{f}, } have := report.MakeReport() diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 0311ad3a0..4080dd0bc 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -26,7 +26,7 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool - conntracker Conntracker + flowWalker flowWalker // interface natMapper *natMapper reverseResolver *reverseResolver } @@ -50,18 +50,18 @@ var SpyDuration = prometheus.NewSummaryVec( // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { var ( - conntracker Conntracker - natMapper *natMapper + flowWalker flowWalker + natMapper *natMapper ) if ConntrackModulePresent() { // TODO(pb) if useConntrack { var err error - if conntracker, err = NewConntracker(true); err != nil { + if flowWalker, err = newConntrackFlowWalker(true); err != nil { log.Printf("Failed to start conntracker for endpoint reporter: %v", err) } } - if natmapperConntracker, err := NewConntracker(true, "--any-nat"); err == nil { - m := makeNATMapper(natmapperConntracker) + if natmapperFlowWalker, err := newConntrackFlowWalker(true, "--any-nat"); err == nil { + m := makeNATMapper(natmapperFlowWalker) natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper } else { log.Printf("Failed to start conntracker for NAT mapper: %v", err) @@ -71,7 +71,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - conntracker: conntracker, + flowWalker: flowWalker, natMapper: natMapper, reverseResolver: newReverseResolver(), } @@ -79,11 +79,11 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo // Stop stop stop func (r *Reporter) Stop() { - if r.conntracker != nil { // TODO(pb): this should never be nil (implies interface) - r.conntracker.Stop() + if r.flowWalker != nil { // TODO(pb): this should never be nil (implies interface) + r.flowWalker.stop() } if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface) - r.natMapper.Stop() + r.natMapper.stop() } r.reverseResolver.stop() } @@ -123,11 +123,11 @@ func (r *Reporter) Report() (report.Report, error) { } } - if r.conntracker != nil { + if r.flowWalker != nil { extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{ Conntracked: "true", }) - r.conntracker.WalkFlows(func(f Flow) { + r.flowWalker.walkFlows(func(f flow) { var ( localPort = uint16(f.Original.Layer4.SrcPort) remotePort = uint16(f.Original.Layer4.DstPort) From a3c53aadf55c43360675f77965f890cd9a6ebcdb Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 27 Oct 2015 10:53:23 +0000 Subject: [PATCH 07/10] No more nil flow workers --- probe/endpoint/conntrack.go | 15 ++++++--- probe/endpoint/conntrack_internal_test.go | 6 +--- probe/endpoint/nat.go | 3 -- probe/endpoint/reporter.go | 41 +++++------------------ 4 files changed, 20 insertions(+), 45 deletions(-) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 2671c2c3a..2efb1f9cb 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -3,7 +3,6 @@ package endpoint import ( "bufio" "encoding/xml" - "fmt" "io" "log" "os" @@ -68,6 +67,11 @@ type flowWalker interface { stop() } +type nilFlowWalker struct{} + +func (n *nilFlowWalker) stop() {} +func (n *nilFlowWalker) walkFlows(f func(flow)) {} + // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. type conntrackWalker struct { @@ -81,9 +85,12 @@ type conntrackWalker struct { } // newConntracker creates and starts a new conntracker. -func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, error) { +func newConntrackFlowWalker(useConntrack, existingConns bool, args ...string) flowWalker { if !ConntrackModulePresent() { - return nil, fmt.Errorf("No conntrack module") + log.Printf("Not using conntrack: module not present") + return &nilFlowWalker{} + } else if !useConntrack { + return &nilFlowWalker{} } result := &conntrackWalker{ activeFlows: map[int64]flow{}, @@ -91,7 +98,7 @@ func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, err args: args, } go result.loop() - return result, nil + return result } // ConntrackModulePresent returns true if the kernel has the conntrack module diff --git a/probe/endpoint/conntrack_internal_test.go b/probe/endpoint/conntrack_internal_test.go index fed6ca281..a88eb2aa0 100644 --- a/probe/endpoint/conntrack_internal_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -83,11 +83,7 @@ func TestConntracker(t *testing.T) { return testexec.NewMockCmd(reader) } - flowWalker, err := newConntrackFlowWalker(false) - if err != nil { - t.Fatal(err) - } - + flowWalker := newConntrackFlowWalker(true, false) bw := bufio.NewWriter(writer) if _, err := bw.WriteString(xmlHeader); err != nil { t.Fatal(err) diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 016dced6a..205379477 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -49,9 +49,6 @@ func toMapping(f flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - if n.flowWalker == nil { // TODO(pb) - return - } n.flowWalker.walkFlows(func(f flow) { var ( mapping = toMapping(f) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 4080dd0bc..191bb7ea5 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,7 +1,6 @@ package endpoint import ( - "log" "strconv" "time" @@ -27,7 +26,7 @@ type Reporter struct { includeProcesses bool includeNAT bool flowWalker flowWalker // interface - natMapper *natMapper + natMapper natMapper reverseResolver *reverseResolver } @@ -49,42 +48,20 @@ var SpyDuration = prometheus.NewSummaryVec( // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { - var ( - flowWalker flowWalker - natMapper *natMapper - ) - if ConntrackModulePresent() { // TODO(pb) - if useConntrack { - var err error - if flowWalker, err = newConntrackFlowWalker(true); err != nil { - log.Printf("Failed to start conntracker for endpoint reporter: %v", err) - } - } - if natmapperFlowWalker, err := newConntrackFlowWalker(true, "--any-nat"); err == nil { - m := makeNATMapper(natmapperFlowWalker) - natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper - } else { - log.Printf("Failed to start conntracker for NAT mapper: %v", err) - } - } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - flowWalker: flowWalker, - natMapper: natMapper, + flowWalker: newConntrackFlowWalker(useConntrack, true), + natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, true, "--any-nat")), reverseResolver: newReverseResolver(), } } // Stop stop stop func (r *Reporter) Stop() { - if r.flowWalker != nil { // TODO(pb): this should never be nil (implies interface) - r.flowWalker.stop() - } - if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface) - r.natMapper.stop() - } + r.flowWalker.stop() + r.natMapper.stop() r.reverseResolver.stop() } @@ -123,7 +100,8 @@ func (r *Reporter) Report() (report.Report, error) { } } - if r.flowWalker != nil { + // Consult the flowWalker for short-live connections + { extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{ Conntracked: "true", }) @@ -138,10 +116,7 @@ func (r *Reporter) Report() (report.Report, error) { }) } - if r.natMapper != nil { // TODO(pb): should never be nil - r.natMapper.applyNAT(rpt, r.hostID) - } - + r.natMapper.applyNAT(rpt, r.hostID) return rpt, nil } From 8b03814cb7c96ec2ba9300c0ad443cb1c66d5784 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 27 Oct 2015 11:30:40 +0000 Subject: [PATCH 08/10] Fixup failing tests due to fixture move. --- app/api_topologies_test.go | 4 +- render/expected/expected.go | 90 ++++++++++++++++++------------------- render/topologies_test.go | 18 ++++---- 3 files changed, 56 insertions(+), 56 deletions(-) diff --git a/app/api_topologies_test.go b/app/api_topologies_test.go index 3587232f0..c75206d91 100644 --- a/app/api_topologies_test.go +++ b/app/api_topologies_test.go @@ -11,7 +11,7 @@ import ( "github.com/weaveworks/scope/probe/kubernetes" "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/test/fixture" ) func TestAPITopology(t *testing.T) { @@ -62,7 +62,7 @@ func TestAPITopologyAddsKubernetes(t *testing.T) { // Enable the kubernetes topologies rpt := report.MakeReport() rpt.Pod = report.MakeTopology() - rpt.Pod.Nodes[test.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{ + rpt.Pod.Nodes[fixture.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pong-a", Namespace: "ping", diff --git a/render/expected/expected.go b/render/expected/expected.go index 28ea83cfa..f977fcf70 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -377,14 +377,14 @@ var ( Rank: "ping/pong-a", Pseudo: false, Origins: report.MakeIDList( - test.Client54001NodeID, - test.Client54002NodeID, - test.ClientProcess1NodeID, - test.ClientProcess2NodeID, - test.ClientHostNodeID, - test.ClientContainerNodeID, - test.ClientContainerImageNodeID, - test.ClientPodNodeID, + fixture.Client54001NodeID, + fixture.Client54002NodeID, + fixture.ClientProcess1NodeID, + fixture.ClientProcess2NodeID, + fixture.ClientHostNodeID, + fixture.ClientContainerNodeID, + fixture.ClientContainerImageNodeID, + fixture.ClientPodNodeID, ), Node: report.MakeNode().WithAdjacent("ping/pong-b"), EdgeMetadata: report.EdgeMetadata{ @@ -399,12 +399,12 @@ var ( Rank: "ping/pong-b", Pseudo: false, Origins: report.MakeIDList( - test.Server80NodeID, - test.ServerPodNodeID, - test.ServerProcessNodeID, - test.ServerContainerNodeID, - test.ServerHostNodeID, - test.ServerContainerImageNodeID, + fixture.Server80NodeID, + fixture.ServerPodNodeID, + fixture.ServerProcessNodeID, + fixture.ServerContainerNodeID, + fixture.ServerHostNodeID, + fixture.ServerContainerImageNodeID, ), Node: report.MakeNode(), EdgeMetadata: report.EdgeMetadata{ @@ -415,13 +415,13 @@ var ( uncontainedServerID: { ID: uncontainedServerID, LabelMajor: render.UncontainedMajor, - LabelMinor: test.ServerHostName, + LabelMinor: fixture.ServerHostName, Rank: "", Pseudo: true, Origins: report.MakeIDList( - test.ServerHostNodeID, - test.NonContainerProcessNodeID, - test.NonContainerNodeID, + fixture.ServerHostNodeID, + fixture.NonContainerProcessNodeID, + fixture.NonContainerNodeID, ), Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, @@ -436,37 +436,37 @@ var ( EgressByteCount: newu64(600), }, Origins: report.MakeIDList( - test.RandomClientNodeID, - test.GoogleEndpointNodeID, + fixture.RandomClientNodeID, + fixture.GoogleEndpointNodeID, ), }, }).Prune() RenderedPodServices = (render.RenderableNodes{ "ping/pongservice": { - ID: test.ServiceID, + ID: fixture.ServiceID, LabelMajor: "pongservice", LabelMinor: "2 pods", - Rank: test.ServiceID, + Rank: fixture.ServiceID, Pseudo: false, Origins: report.MakeIDList( - test.Client54001NodeID, - test.Client54002NodeID, - test.ClientProcess1NodeID, - test.ClientProcess2NodeID, - test.ClientHostNodeID, - test.ClientContainerNodeID, - test.ClientContainerImageNodeID, - test.ClientPodNodeID, - test.Server80NodeID, - test.ServerPodNodeID, - test.ServiceNodeID, - test.ServerProcessNodeID, - test.ServerContainerNodeID, - test.ServerHostNodeID, - test.ServerContainerImageNodeID, + fixture.Client54001NodeID, + fixture.Client54002NodeID, + fixture.ClientProcess1NodeID, + fixture.ClientProcess2NodeID, + fixture.ClientHostNodeID, + fixture.ClientContainerNodeID, + fixture.ClientContainerImageNodeID, + fixture.ClientPodNodeID, + fixture.Server80NodeID, + fixture.ServerPodNodeID, + fixture.ServiceNodeID, + fixture.ServerProcessNodeID, + fixture.ServerContainerNodeID, + fixture.ServerHostNodeID, + fixture.ServerContainerImageNodeID, ), - Node: report.MakeNode().WithAdjacent(test.ServiceID), // ?? Shouldn't be adjacent to itself? + Node: report.MakeNode().WithAdjacent(fixture.ServiceID), // ?? Shouldn't be adjacent to itself? EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(30), EgressByteCount: newu64(300), @@ -477,13 +477,13 @@ var ( uncontainedServerID: { ID: uncontainedServerID, LabelMajor: render.UncontainedMajor, - LabelMinor: test.ServerHostName, + LabelMinor: fixture.ServerHostName, Rank: "", Pseudo: true, Origins: report.MakeIDList( - test.ServerHostNodeID, - test.NonContainerProcessNodeID, - test.NonContainerNodeID, + fixture.ServerHostNodeID, + fixture.NonContainerProcessNodeID, + fixture.NonContainerNodeID, ), Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, @@ -492,14 +492,14 @@ var ( ID: render.TheInternetID, LabelMajor: render.TheInternetMajor, Pseudo: true, - Node: report.MakeNode().WithAdjacent(test.ServiceID), + Node: report.MakeNode().WithAdjacent(fixture.ServiceID), EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(60), EgressByteCount: newu64(600), }, Origins: report.MakeIDList( - test.RandomClientNodeID, - test.GoogleEndpointNodeID, + fixture.RandomClientNodeID, + fixture.GoogleEndpointNodeID, ), }, }).Prune() diff --git a/render/topologies_test.go b/render/topologies_test.go index a80e70968..2a70ce68f 100644 --- a/render/topologies_test.go +++ b/render/topologies_test.go @@ -66,7 +66,7 @@ func TestHostRenderer(t *testing.T) { } func TestPodRenderer(t *testing.T) { - have := render.PodRenderer.Render(test.Report).Prune() + have := render.PodRenderer.Render(fixture.Report).Prune() want := expected.RenderedPods if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -76,22 +76,22 @@ func TestPodRenderer(t *testing.T) { func TestPodFilterRenderer(t *testing.T) { // tag on containers or pod namespace in the topology and ensure // it is filtered out correctly. - input := test.Report.Copy() - input.Pod.Nodes[test.ClientPodNodeID].Metadata[kubernetes.PodID] = "kube-system/foo" - input.Pod.Nodes[test.ClientPodNodeID].Metadata[kubernetes.Namespace] = "kube-system" - input.Pod.Nodes[test.ClientPodNodeID].Metadata[kubernetes.PodName] = "foo" - input.Container.Nodes[test.ClientContainerNodeID].Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"] = "kube-system/foo" + input := fixture.Report.Copy() + input.Pod.Nodes[fixture.ClientPodNodeID].Metadata[kubernetes.PodID] = "kube-system/foo" + input.Pod.Nodes[fixture.ClientPodNodeID].Metadata[kubernetes.Namespace] = "kube-system" + input.Pod.Nodes[fixture.ClientPodNodeID].Metadata[kubernetes.PodName] = "foo" + input.Container.Nodes[fixture.ClientContainerNodeID].Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"] = "kube-system/foo" have := render.FilterSystem(render.PodRenderer).Render(input).Prune() want := expected.RenderedPods.Copy() - delete(want, test.ClientPodID) - delete(want, test.ClientContainerID) + delete(want, fixture.ClientPodID) + delete(want, fixture.ClientContainerID) if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } func TestPodServiceRenderer(t *testing.T) { - have := render.PodServiceRenderer.Render(test.Report).Prune() + have := render.PodServiceRenderer.Render(fixture.Report).Prune() want := expected.RenderedPodServices if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) From 07af26e05bc9557433c4716e074d6729aae82800 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 27 Oct 2015 11:39:38 +0000 Subject: [PATCH 09/10] Review feedback --- probe/endpoint/conntrack.go | 36 ++++++++++------------- probe/endpoint/conntrack_internal_test.go | 27 ++++++++++++++++- probe/endpoint/reporter.go | 4 +-- 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 2efb1f9cb..86fe814a0 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -69,8 +69,8 @@ type flowWalker interface { type nilFlowWalker struct{} -func (n *nilFlowWalker) stop() {} -func (n *nilFlowWalker) walkFlows(f func(flow)) {} +func (n nilFlowWalker) stop() {} +func (n nilFlowWalker) walkFlows(f func(flow)) {} // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. @@ -79,23 +79,21 @@ type conntrackWalker struct { cmd exec.Cmd activeFlows map[int64]flow // active flows in state != TIME_WAIT bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here - existingConns bool args []string quit chan struct{} } // newConntracker creates and starts a new conntracker. -func newConntrackFlowWalker(useConntrack, existingConns bool, args ...string) flowWalker { +func newConntrackFlowWalker(useConntrack bool, args ...string) flowWalker { if !ConntrackModulePresent() { log.Printf("Not using conntrack: module not present") - return &nilFlowWalker{} + return nilFlowWalker{} } else if !useConntrack { - return &nilFlowWalker{} + return nilFlowWalker{} } result := &conntrackWalker{ - activeFlows: map[int64]flow{}, - existingConns: existingConns, - args: args, + activeFlows: map[int64]flow{}, + args: args, } go result.loop() return result @@ -165,17 +163,15 @@ func logPipe(prefix string, reader io.Reader) { } func (c *conntrackWalker) run() { - if c.existingConns { - // Fork another conntrack, just to capture existing connections - // for which we don't get events - existingFlows, err := c.existingConnections() - if err != nil { - log.Printf("conntrack existingConnections error: %v", err) - return - } - for _, flow := range existingFlows { - c.handleFlow(flow, true) - } + // Fork another conntrack, just to capture existing connections + // for which we don't get events + existingFlows, err := c.existingConnections() + if err != nil { + log.Printf("conntrack existingConnections error: %v", err) + return + } + for _, flow := range existingFlows { + c.handleFlow(flow, true) } args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...) diff --git a/probe/endpoint/conntrack_internal_test.go b/probe/endpoint/conntrack_internal_test.go index a88eb2aa0..278b1ce30 100644 --- a/probe/endpoint/conntrack_internal_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -12,6 +12,8 @@ import ( testexec "github.com/weaveworks/scope/test/exec" ) +const conntrackCloseTag = "\n" + func makeFlow(ty string) flow { return flow{ XMLName: xml.Name{ @@ -78,12 +80,35 @@ func TestConntracker(t *testing.T) { return true } + first := true + existingConnectionsReader, existingConnectionsWriter := io.Pipe() reader, writer := io.Pipe() exec.Command = func(name string, args ...string) exec.Cmd { + if first { + first = false + return testexec.NewMockCmd(existingConnectionsReader) + } return testexec.NewMockCmd(reader) } - flowWalker := newConntrackFlowWalker(true, false) + flowWalker := newConntrackFlowWalker(true) + + // First write out some empty xml for the existing connections + ecbw := bufio.NewWriter(existingConnectionsWriter) + if _, err := ecbw.WriteString(xmlHeader); err != nil { + t.Fatal(err) + } + if _, err := ecbw.WriteString(conntrackOpenTag); err != nil { + t.Fatal(err) + } + if _, err := ecbw.WriteString(conntrackCloseTag); err != nil { + t.Fatal(err) + } + if err := ecbw.Flush(); err != nil { + t.Fatal(err) + } + + // Then write out eventa bw := bufio.NewWriter(writer) if _, err := bw.WriteString(xmlHeader); err != nil { t.Fatal(err) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 191bb7ea5..24ad580eb 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -52,8 +52,8 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - flowWalker: newConntrackFlowWalker(useConntrack, true), - natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, true, "--any-nat")), + flowWalker: newConntrackFlowWalker(useConntrack), + natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")), reverseResolver: newReverseResolver(), } } From 5b32aa68b714b61558e454001e80009319005ea2 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 27 Oct 2015 13:17:35 +0000 Subject: [PATCH 10/10] Squashed 'tools/' changes from 4fa0c68..d6ea3ad d6ea3ad Use git diff to detect changes in rebuild-image git-subtree-dir: tools git-subtree-split: d6ea3ad3799222fe91266e8ec0789e8284017ed6 --- rebuild-image | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebuild-image b/rebuild-image index 0e582b0ba..0eb3cff3c 100755 --- a/rebuild-image +++ b/rebuild-image @@ -29,7 +29,7 @@ cached_image_rev() { has_changes() { local rev1=$1 local rev2=$2 - local changes=$(git log --oneline $rev1..$rev2 -- $INPUTFILES | wc -l) + local changes=$(git diff --oneline $rev1..$rev2 -- $INPUTFILES | wc -l) [ "$changes" -gt 0 ] }