From cb40ad3a908bb7814867369fd6cc0d8088a39385 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 2 Oct 2015 18:38:04 +0200 Subject: [PATCH] Continued un-exporting of symbols; renames - Unexport consts, types, vars, etc. - Rename Conntracker (interface) to FlowWalker, to match its definition. - Rename conntracker (type) to conntrackWalker, to match the interface. - Move conntrack_test.go to conntrack_internal_test.go and package endpoint --- probe/endpoint/conntrack.go | 106 +++++++++--------- ...ack_test.go => conntrack_internal_test.go} | 63 +++++------ probe/endpoint/nat.go | 12 +- probe/endpoint/nat_internal_test.go | 94 +++------------- probe/endpoint/reporter.go | 24 ++-- 5 files changed, 116 insertions(+), 183 deletions(-) rename probe/endpoint/{conntrack_test.go => conntrack_internal_test.go} (65%) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 531e000cb..2671c2c3a 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -14,82 +14,79 @@ import ( "github.com/weaveworks/scope/common/exec" ) -// Constants exported for testing const ( modules = "/proc/modules" conntrackModule = "nf_conntrack" - XMLHeader = "\n" - ConntrackOpenTag = "\n" - TimeWait = "TIME_WAIT" - TCP = "tcp" - New = "new" - Update = "update" - Destroy = "destroy" + xmlHeader = "\n" + conntrackOpenTag = "\n" + timeWait = "TIME_WAIT" + tcpProto = "tcp" + newType = "new" + updateType = "update" + destroyType = "destroy" ) -// Layer3 - these structs are for the parsed conntrack output -type Layer3 struct { +type layer3 struct { XMLName xml.Name `xml:"layer3"` SrcIP string `xml:"src"` DstIP string `xml:"dst"` } -// Layer4 - these structs are for the parsed conntrack output -type Layer4 struct { +type layer4 struct { XMLName xml.Name `xml:"layer4"` SrcPort int `xml:"sport"` DstPort int `xml:"dport"` Proto string `xml:"protoname,attr"` } -// Meta - these structs are for the parsed conntrack output -type Meta struct { +type meta struct { XMLName xml.Name `xml:"meta"` Direction string `xml:"direction,attr"` - Layer3 Layer3 `xml:"layer3"` - Layer4 Layer4 `xml:"layer4"` + Layer3 layer3 `xml:"layer3"` + Layer4 layer4 `xml:"layer4"` ID int64 `xml:"id"` State string `xml:"state"` } -// Flow - these structs are for the parsed conntrack output -type Flow struct { +type flow struct { XMLName xml.Name `xml:"flow"` - Metas []Meta `xml:"meta"` + Metas []meta `xml:"meta"` Type string `xml:"type,attr"` - Original, Reply, Independent *Meta `xml:"-"` + Original, Reply, Independent *meta `xml:"-"` } type conntrack struct { XMLName xml.Name `xml:"conntrack"` - Flows []Flow `xml:"flow"` + Flows []flow `xml:"flow"` } -// Conntracker is something that tracks connections. -type Conntracker interface { - WalkFlows(f func(Flow)) - Stop() +// flowWalker is something that maintains flows, and provides an accessor +// method to walk them. +type flowWalker interface { + walkFlows(f func(flow)) + stop() } -// Conntracker uses the conntrack command to track network connections -type conntracker struct { +// conntrackWalker uses the conntrack command to track network connections and +// implement flowWalker. +type conntrackWalker struct { sync.Mutex cmd exec.Cmd - activeFlows map[int64]Flow // active flows in state != TIME_WAIT - bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here + activeFlows map[int64]flow // active flows in state != TIME_WAIT + bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here existingConns bool args []string quit chan struct{} } -// NewConntracker creates and starts a new Conntracter -func NewConntracker(existingConns bool, args ...string) (Conntracker, error) { +// newConntracker creates and starts a new conntracker. +func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, error) { if !ConntrackModulePresent() { return nil, fmt.Errorf("No conntrack module") } - result := &conntracker{ - activeFlows: map[int64]Flow{}, + result := &conntrackWalker{ + activeFlows: map[int64]flow{}, existingConns: existingConns, args: args, } @@ -121,7 +118,7 @@ var ConntrackModulePresent = func() bool { return false } -func (c *conntracker) loop() { +func (c *conntrackWalker) loop() { // conntrack can sometimes fail with ENOBUFS, when there is a particularly // high connection rate. In these cases just retry in a loop, so we can // survive the spike. For sustained loads this degrades nicely, as we @@ -139,7 +136,7 @@ func (c *conntracker) loop() { } } -func (c *conntracker) clearFlows() { +func (c *conntrackWalker) clearFlows() { c.Lock() defer c.Unlock() @@ -147,7 +144,7 @@ func (c *conntracker) clearFlows() { c.bufferedFlows = append(c.bufferedFlows, f) } - c.activeFlows = map[int64]Flow{} + c.activeFlows = map[int64]flow{} } func logPipe(prefix string, reader io.Reader) { @@ -160,7 +157,7 @@ func logPipe(prefix string, reader io.Reader) { } } -func (c *conntracker) run() { +func (c *conntrackWalker) run() { if c.existingConns { // Fork another conntrack, just to capture existing connections // for which we don't get events @@ -217,14 +214,14 @@ func (c *conntracker) run() { if line, err := reader.ReadString('\n'); err != nil { log.Printf("conntrack error: %v", err) return - } else if line != XMLHeader { + } else if line != xmlHeader { log.Printf("conntrack invalid output: '%s'", line) return } if line, err := reader.ReadString('\n'); err != nil { log.Printf("conntrack error: %v", err) return - } else if line != ConntrackOpenTag { + } else if line != conntrackOpenTag { log.Printf("conntrack invalid output: '%s'", line) return } @@ -234,7 +231,7 @@ func (c *conntracker) run() { // Now loop on the output stream decoder := xml.NewDecoder(reader) for { - var f Flow + var f flow if err := decoder.Decode(&f); err != nil { log.Printf("conntrack error: %v", err) return @@ -243,15 +240,15 @@ func (c *conntracker) run() { } } -func (c *conntracker) existingConnections() ([]Flow, error) { +func (c *conntrackWalker) existingConnections() ([]flow, error) { args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { - return []Flow{}, err + return []flow{}, err } if err := cmd.Start(); err != nil { - return []Flow{}, err + return []flow{}, err } defer func() { if err := cmd.Wait(); err != nil { @@ -260,15 +257,14 @@ func (c *conntracker) existingConnections() ([]Flow, error) { }() var result conntrack if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF { - return []Flow{}, nil + return []flow{}, nil } else if err != nil { - return []Flow{}, err + return []flow{}, err } return result.Flows, nil } -// Stop stop stop -func (c *conntracker) Stop() { +func (c *conntrackWalker) stop() { c.Lock() defer c.Unlock() close(c.quit) @@ -277,7 +273,7 @@ func (c *conntracker) Stop() { } } -func (c *conntracker) handleFlow(f Flow, forceAdd bool) { +func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this // host) and the 'reply' 4 tuple, which is what it has been rewritten to. // This code finds those metas, which are identified by a Direction @@ -297,7 +293,7 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) { // For not, I'm only interested in tcp connections - there is too much udp // traffic going on (every container talking to weave dns, for example) to // render nicely. TODO: revisit this. - if f.Original.Layer4.Proto != TCP { + if f.Original.Layer4.Proto != tcpProto { return } @@ -305,14 +301,14 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) { defer c.Unlock() switch { - case forceAdd || f.Type == New || f.Type == Update: - if f.Independent.State != TimeWait { + case forceAdd || f.Type == newType || f.Type == updateType: + if f.Independent.State != timeWait { c.activeFlows[f.Independent.ID] = f } else if _, ok := c.activeFlows[f.Independent.ID]; ok { delete(c.activeFlows, f.Independent.ID) c.bufferedFlows = append(c.bufferedFlows, f) } - case f.Type == Destroy: + case f.Type == destroyType: if _, ok := c.activeFlows[f.Independent.ID]; ok { delete(c.activeFlows, f.Independent.ID) c.bufferedFlows = append(c.bufferedFlows, f) @@ -320,9 +316,9 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) { } } -// WalkFlows calls f with all active flows and flows that have come and gone -// since the last call to WalkFlows -func (c *conntracker) WalkFlows(f func(Flow)) { +// walkFlows calls f with all active flows and flows that have come and gone +// since the last call to walkFlows +func (c *conntrackWalker) walkFlows(f func(flow)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { diff --git a/probe/endpoint/conntrack_test.go b/probe/endpoint/conntrack_internal_test.go similarity index 65% rename from probe/endpoint/conntrack_test.go rename to probe/endpoint/conntrack_internal_test.go index 16b8f8e6e..fed6ca281 100644 --- a/probe/endpoint/conntrack_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -1,4 +1,4 @@ -package endpoint_test +package endpoint import ( "bufio" @@ -8,13 +8,12 @@ import ( "time" "github.com/weaveworks/scope/common/exec" - . "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/test" - testExec "github.com/weaveworks/scope/test/exec" + testexec "github.com/weaveworks/scope/test/exec" ) -func makeFlow(ty string) Flow { - return Flow{ +func makeFlow(ty string) flow { + return flow{ XMLName: xml.Name{ Local: "flow", }, @@ -22,46 +21,46 @@ func makeFlow(ty string) Flow { } } -func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { - meta := Meta{ +func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta { + meta := meta{ XMLName: xml.Name{ Local: "meta", }, Direction: dir, - Layer3: Layer3{ + Layer3: layer3{ XMLName: xml.Name{ Local: "layer3", }, SrcIP: srcIP, DstIP: dstIP, }, - Layer4: Layer4{ + Layer4: layer4{ XMLName: xml.Name{ Local: "layer4", }, SrcPort: srcPort, DstPort: dstPort, - Proto: TCP, + Proto: tcpProto, }, } f.Metas = append(f.Metas, meta) return &meta } -func addIndependant(f *Flow, id int64, state string) *Meta { - meta := Meta{ +func addIndependant(f *flow, id int64, state string) *meta { + meta := meta{ XMLName: xml.Name{ Local: "meta", }, Direction: "independent", ID: id, State: state, - Layer3: Layer3{ + Layer3: layer3{ XMLName: xml.Name{ Local: "layer3", }, }, - Layer4: Layer4{ + Layer4: layer4{ XMLName: xml.Name{ Local: "layer4", }, @@ -81,19 +80,19 @@ func TestConntracker(t *testing.T) { reader, writer := io.Pipe() exec.Command = func(name string, args ...string) exec.Cmd { - return testExec.NewMockCmd(reader) + return testexec.NewMockCmd(reader) } - conntracker, err := NewConntracker(false) + flowWalker, err := newConntrackFlowWalker(false) if err != nil { t.Fatal(err) } bw := bufio.NewWriter(writer) - if _, err := bw.WriteString(XMLHeader); err != nil { + if _, err := bw.WriteString(xmlHeader); err != nil { t.Fatal(err) } - if _, err := bw.WriteString(ConntrackOpenTag); err != nil { + if _, err := bw.WriteString(conntrackOpenTag); err != nil { t.Fatal(err) } if err := bw.Flush(); err != nil { @@ -101,8 +100,8 @@ func TestConntracker(t *testing.T) { } have := func() interface{} { - result := []Flow{} - conntracker.WalkFlows(func(f Flow) { + result := []flow{} + flowWalker.walkFlows(func(f flow) { f.Original = nil f.Reply = nil f.Independent = nil @@ -113,11 +112,11 @@ func TestConntracker(t *testing.T) { ts := 100 * time.Millisecond // First, assert we have no flows - test.Poll(t, ts, []Flow{}, have) + test.Poll(t, ts, []flow{}, have) // Now add some flows xmlEncoder := xml.NewEncoder(bw) - writeFlow := func(f Flow) { + writeFlow := func(f flow) { if err := xmlEncoder.Encode(f); err != nil { t.Fatal(err) } @@ -129,25 +128,25 @@ func TestConntracker(t *testing.T) { } } - flow1 := makeFlow(New) + flow1 := makeFlow(newType) addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3) addIndependant(&flow1, 1, "") writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) + test.Poll(t, ts, []flow{flow1}, have) // Now check when we remove the flow, we still get it in the next Walk - flow1.Type = Destroy + flow1.Type = destroyType writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) - test.Poll(t, ts, []Flow{}, have) + test.Poll(t, ts, []flow{flow1}, have) + test.Poll(t, ts, []flow{}, have) // This time we're not going to remove it, but put it in state TIME_WAIT - flow1.Type = New + flow1.Type = newType writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) + test.Poll(t, ts, []flow{flow1}, have) - flow1.Metas[1].State = TimeWait + flow1.Metas[1].State = timeWait writeFlow(flow1) - test.Poll(t, ts, []Flow{flow1}, have) - test.Poll(t, ts, []Flow{}, have) + test.Poll(t, ts, []flow{flow1}, have) + test.Poll(t, ts, []flow{}, have) } diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index ab1b8f5f1..016dced6a 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -18,14 +18,14 @@ type endpointMapping struct { // natMapper rewrites a report to deal with NAT'd connections. type natMapper struct { - Conntracker + flowWalker } -func makeNATMapper(ct Conntracker) natMapper { - return natMapper{ct} +func makeNATMapper(fw flowWalker) natMapper { + return natMapper{fw} } -func toMapping(f Flow) *endpointMapping { +func toMapping(f flow) *endpointMapping { var mapping endpointMapping if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP { mapping = endpointMapping{ @@ -49,10 +49,10 @@ func toMapping(f Flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - if n.Conntracker == nil { // TODO(pb) + if n.flowWalker == nil { // TODO(pb) return } - n.Conntracker.WalkFlows(func(f Flow) { + n.flowWalker.walkFlows(func(f flow) { var ( mapping = toMapping(f) realEndpointID = report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort)) diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 81ebb051a..105666d36 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -1,7 +1,6 @@ package endpoint import ( - "encoding/xml" "reflect" "testing" @@ -9,78 +8,17 @@ import ( "github.com/weaveworks/scope/test" ) -type mockConntracker struct { - flows []Flow +type mockFlowWalker struct { + flows []flow } -func (m *mockConntracker) WalkFlows(f func(Flow)) { +func (m *mockFlowWalker) walkFlows(f func(flow)) { for _, flow := range m.flows { f(flow) } } -func (m *mockConntracker) Stop() {} - -// TODO(pb): dedupe later -func makeFlow(ty string) Flow { - return Flow{ - XMLName: xml.Name{ - Local: "flow", - }, - Type: ty, - } -} - -// TODO(pb): dedupe later -func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { - meta := Meta{ - XMLName: xml.Name{ - Local: "meta", - }, - Direction: dir, - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - SrcIP: srcIP, - DstIP: dstIP, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - SrcPort: srcPort, - DstPort: dstPort, - Proto: TCP, - }, - } - f.Metas = append(f.Metas, meta) - return &meta -} - -// TODO(pb): dedupe later -func addIndependant(f *Flow, id int64, state string) *Meta { - meta := Meta{ - XMLName: xml.Name{ - Local: "meta", - }, - Direction: "independent", - ID: id, - State: state, - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - }, - } - f.Metas = append(f.Metas, meta) - return &meta -} +func (m *mockFlowWalker) stop() {} func TestNat(t *testing.T) { // test that two containers, on the docker network, get their connections mapped @@ -92,12 +30,12 @@ func TestNat(t *testing.T) { // from the PoV of host1 { - flow := makeFlow("") - addIndependant(&flow, 1, "") - flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80) - flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) - ct := &mockConntracker{ - flows: []Flow{flow}, + f := makeFlow("") + addIndependant(&f, 1, "") + f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80) + f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) + ct := &mockFlowWalker{ + flows: []flow{f}, } have := report.MakeReport() @@ -124,12 +62,12 @@ func TestNat(t *testing.T) { // form the PoV of host2 { - flow := makeFlow("") - addIndependant(&flow, 2, "") - flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80) - flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) - ct := &mockConntracker{ - flows: []Flow{flow}, + f := makeFlow("") + addIndependant(&f, 2, "") + f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80) + f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) + ct := &mockFlowWalker{ + flows: []flow{f}, } have := report.MakeReport() diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 0311ad3a0..4080dd0bc 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -26,7 +26,7 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool - conntracker Conntracker + flowWalker flowWalker // interface natMapper *natMapper reverseResolver *reverseResolver } @@ -50,18 +50,18 @@ var SpyDuration = prometheus.NewSummaryVec( // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { var ( - conntracker Conntracker - natMapper *natMapper + flowWalker flowWalker + natMapper *natMapper ) if ConntrackModulePresent() { // TODO(pb) if useConntrack { var err error - if conntracker, err = NewConntracker(true); err != nil { + if flowWalker, err = newConntrackFlowWalker(true); err != nil { log.Printf("Failed to start conntracker for endpoint reporter: %v", err) } } - if natmapperConntracker, err := NewConntracker(true, "--any-nat"); err == nil { - m := makeNATMapper(natmapperConntracker) + if natmapperFlowWalker, err := newConntrackFlowWalker(true, "--any-nat"); err == nil { + m := makeNATMapper(natmapperFlowWalker) natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper } else { log.Printf("Failed to start conntracker for NAT mapper: %v", err) @@ -71,7 +71,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - conntracker: conntracker, + flowWalker: flowWalker, natMapper: natMapper, reverseResolver: newReverseResolver(), } @@ -79,11 +79,11 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo // Stop stop stop func (r *Reporter) Stop() { - if r.conntracker != nil { // TODO(pb): this should never be nil (implies interface) - r.conntracker.Stop() + if r.flowWalker != nil { // TODO(pb): this should never be nil (implies interface) + r.flowWalker.stop() } if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface) - r.natMapper.Stop() + r.natMapper.stop() } r.reverseResolver.stop() } @@ -123,11 +123,11 @@ func (r *Reporter) Report() (report.Report, error) { } } - if r.conntracker != nil { + if r.flowWalker != nil { extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{ Conntracked: "true", }) - r.conntracker.WalkFlows(func(f Flow) { + r.flowWalker.walkFlows(func(f flow) { var ( localPort = uint16(f.Original.Layer4.SrcPort) remotePort = uint16(f.Original.Layer4.DstPort)