From 7da599194d6f773e8f48d2a8bc8521abdfb82f6c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 23 Sep 2015 11:01:29 +0000 Subject: [PATCH] Test the nat mapper. --- probe/endpoint/conntrack.go | 22 +++++--- probe/endpoint/conntrack_test.go | 90 ++++++++++++++++++-------------- probe/endpoint/nat.go | 14 ++--- probe/endpoint/nat_test.go | 64 +++++++++++++++++++++++ probe/endpoint/reporter.go | 12 ++--- 5 files changed, 142 insertions(+), 60 deletions(-) create mode 100644 probe/endpoint/nat_test.go diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 1f3d87613..149f6896e 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -65,8 +65,14 @@ type conntrack struct { Flows []Flow `xml:"flow"` } +// Conntracker is somethin that tracks connections. +type Conntracker interface { + WalkFlows(f func(Flow)) + Stop() +} + // Conntracker uses the conntrack command to track network connections -type Conntracker struct { +type conntracker struct { sync.Mutex cmd exec.Cmd activeFlows map[int64]Flow // active flows in state != TIME_WAIT @@ -75,11 +81,11 @@ type Conntracker struct { } // NewConntracker creates and starts a new Conntracter -func NewConntracker(existingConns bool, args ...string) (*Conntracker, error) { +var NewConntracker = func(existingConns bool, args ...string) (Conntracker, error) { if !ConntrackModulePresent() { return nil, fmt.Errorf("No conntrack module") } - result := &Conntracker{ + result := &conntracker{ activeFlows: map[int64]Flow{}, existingConns: existingConns, } @@ -112,7 +118,7 @@ var ConntrackModulePresent = func() bool { } // NB this is not re-entrant! -func (c *Conntracker) run(args ...string) { +func (c *conntracker) run(args ...string) { if c.existingConns { // Fork another conntrack, just to capture existing connections // for which we don't get events @@ -178,7 +184,7 @@ func (c *Conntracker) run(args ...string) { } } -func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) { +func (c *conntracker) existingConnections(args ...string) ([]Flow, error) { args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() @@ -203,7 +209,7 @@ func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) { } // Stop stop stop -func (c *Conntracker) Stop() { +func (c *conntracker) Stop() { c.Lock() defer c.Unlock() if c.cmd == nil { @@ -215,7 +221,7 @@ func (c *Conntracker) Stop() { } } -func (c *Conntracker) handleFlow(f Flow, forceAdd bool) { +func (c *conntracker) handleFlow(f Flow, forceAdd bool) { // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this // host) and the 'reply' 4 tuple, which is what it has been rewritten to. // This code finds those metas, which are identified by a Direction @@ -260,7 +266,7 @@ func (c *Conntracker) handleFlow(f Flow, forceAdd bool) { // WalkFlows calls f with all active flows and flows that have come and gone // since the last call to WalkFlows -func (c *Conntracker) WalkFlows(f func(Flow)) { +func (c *conntracker) WalkFlows(f func(Flow)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { diff --git a/probe/endpoint/conntrack_test.go b/probe/endpoint/conntrack_test.go index 6b5b03f09..16b8f8e6e 100644 --- a/probe/endpoint/conntrack_test.go +++ b/probe/endpoint/conntrack_test.go @@ -13,54 +13,62 @@ import ( testExec "github.com/weaveworks/scope/test/exec" ) -func makeFlow(id int64, srcIP, dstIP string, srcPort, dstPort int, ty, state string) Flow { +func makeFlow(ty string) Flow { return Flow{ XMLName: xml.Name{ Local: "flow", }, Type: ty, - Metas: []Meta{ - { - XMLName: xml.Name{ - Local: "meta", - }, - Direction: "original", - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - SrcIP: srcIP, - DstIP: dstIP, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - SrcPort: srcPort, - DstPort: dstPort, - Proto: TCP, - }, + } +} + +func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { + meta := Meta{ + XMLName: xml.Name{ + Local: "meta", + }, + Direction: dir, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", }, - { - XMLName: xml.Name{ - Local: "meta", - }, - Direction: "independent", - ID: id, - State: state, - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - }, + SrcIP: srcIP, + DstIP: dstIP, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", + }, + SrcPort: srcPort, + DstPort: dstPort, + Proto: TCP, + }, + } + f.Metas = append(f.Metas, meta) + return &meta +} + +func addIndependant(f *Flow, id int64, state string) *Meta { + meta := Meta{ + XMLName: xml.Name{ + Local: "meta", + }, + Direction: "independent", + ID: id, + State: state, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", }, }, } + f.Metas = append(f.Metas, meta) + return &meta } func TestConntracker(t *testing.T) { @@ -121,7 +129,9 @@ func TestConntracker(t *testing.T) { } } - flow1 := makeFlow(1, "1.2.3.4", "2.3.4.5", 2, 3, New, "") + flow1 := makeFlow(New) + addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3) + addIndependant(&flow1, 1, "") writeFlow(flow1) test.Poll(t, ts, []Flow{flow1}, have) diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index d3531f438..023f0f4c1 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -16,16 +16,18 @@ type endpointMapping struct { rewrittenPort int } -type natmapper struct { - *Conntracker +// NATMapper rewrites a report to deal with NAT's connections +type NATMapper struct { + Conntracker } -func newNATMapper() (*natmapper, error) { +// NewNATMapper is exposed for testing +func NewNATMapper() (*NATMapper, error) { ct, err := NewConntracker(true, "--any-nat") if err != nil { return nil, err } - return &natmapper{ct}, nil + return &NATMapper{ct}, nil } func toMapping(f Flow) *endpointMapping { @@ -49,9 +51,9 @@ func toMapping(f Flow) *endpointMapping { return &mapping } -// applyNAT duplicates Nodes in the endpoint topology of a +// ApplyNAT duplicates Nodes in the endpoint topology of a // report, based on the NAT table as returns by natTable. -func (n *natmapper) applyNAT(rpt report.Report, scope string) { +func (n *NATMapper) ApplyNAT(rpt report.Report, scope string) { n.WalkFlows(func(f Flow) { var ( mapping = toMapping(f) diff --git a/probe/endpoint/nat_test.go b/probe/endpoint/nat_test.go new file mode 100644 index 000000000..8858b07c3 --- /dev/null +++ b/probe/endpoint/nat_test.go @@ -0,0 +1,64 @@ +package endpoint_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/probe/endpoint" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +type mockConntracker struct { + flows []endpoint.Flow +} + +func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) { + for _, flow := range m.flows { + f(flow) + } +} + +func (m *mockConntracker) Stop() {} + +func TestNat(t *testing.T) { + oldNewConntracker := endpoint.NewConntracker + defer func() { endpoint.NewConntracker = oldNewConntracker }() + + endpoint.NewConntracker = func(existingConns bool, args ...string) (endpoint.Conntracker, error) { + flow := makeFlow("") + addIndependant(&flow, 1, "") + flow.Original = addMeta(&flow, "original", "10.0.47.1", "2.3.4.5", 80, 22222) + flow.Reply = addMeta(&flow, "reply", "2.3.4.5", "1.2.3.4", 22222, 80) + + return &mockConntracker{ + flows: []endpoint.Flow{flow}, + }, nil + } + + have := report.MakeReport() + originalID := report.MakeEndpointNodeID("host1", "10.0.47.1", "80") + have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{ + endpoint.Addr: "10.0.47.1", + endpoint.Port: "80", + "foo": "bar", + })) + + want := have.Copy() + want.Endpoint.AddNode(report.MakeEndpointNodeID("host1", "1.2.3.4", "80"), report.MakeNodeWith(report.Metadata{ + endpoint.Addr: "1.2.3.4", + endpoint.Port: "80", + "copy_of": originalID, + "foo": "bar", + })) + + natmapper, err := endpoint.NewNATMapper() + if err != nil { + t.Fatal(err) + } + + natmapper.ApplyNAT(have, "host1") + if !reflect.DeepEqual(want, have) { + t.Fatal(test.Diff(want, have)) + } +} diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 8be759b8a..9cd45725d 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -26,8 +26,8 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool - conntracker *Conntracker - natmapper *natmapper + conntracker Conntracker + natmapper *NATMapper revResolver *ReverseResolver } @@ -51,8 +51,8 @@ var SpyDuration = prometheus.NewSummaryVec( func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { var ( conntrackModulePresent = ConntrackModulePresent() - conntracker *Conntracker - natmapper *natmapper + conntracker Conntracker + natmapper *NATMapper err error ) if conntrackModulePresent && useConntrack { @@ -62,7 +62,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo } } if conntrackModulePresent { - natmapper, err = newNATMapper() + natmapper, err = NewNATMapper() if err != nil { log.Printf("Failed to start natMapper: %v", err) } @@ -139,7 +139,7 @@ func (r *Reporter) Report() (report.Report, error) { } if r.natmapper != nil { - r.natmapper.applyNAT(rpt, r.hostID) + r.natmapper.ApplyNAT(rpt, r.hostID) } return rpt, nil