From a3c53aadf55c43360675f77965f890cd9a6ebcdb Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 27 Oct 2015 10:53:23 +0000 Subject: [PATCH] No more nil flow workers --- probe/endpoint/conntrack.go | 15 ++++++--- probe/endpoint/conntrack_internal_test.go | 6 +--- probe/endpoint/nat.go | 3 -- probe/endpoint/reporter.go | 41 +++++------------------ 4 files changed, 20 insertions(+), 45 deletions(-) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 2671c2c3a..2efb1f9cb 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -3,7 +3,6 @@ package endpoint import ( "bufio" "encoding/xml" - "fmt" "io" "log" "os" @@ -68,6 +67,11 @@ type flowWalker interface { stop() } +type nilFlowWalker struct{} + +func (n *nilFlowWalker) stop() {} +func (n *nilFlowWalker) walkFlows(f func(flow)) {} + // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. type conntrackWalker struct { @@ -81,9 +85,12 @@ type conntrackWalker struct { } // newConntracker creates and starts a new conntracker. -func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, error) { +func newConntrackFlowWalker(useConntrack, existingConns bool, args ...string) flowWalker { if !ConntrackModulePresent() { - return nil, fmt.Errorf("No conntrack module") + log.Printf("Not using conntrack: module not present") + return &nilFlowWalker{} + } else if !useConntrack { + return &nilFlowWalker{} } result := &conntrackWalker{ activeFlows: map[int64]flow{}, @@ -91,7 +98,7 @@ func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, err args: args, } go result.loop() - return result, nil + return result } // ConntrackModulePresent returns true if the kernel has the conntrack module diff --git a/probe/endpoint/conntrack_internal_test.go b/probe/endpoint/conntrack_internal_test.go index fed6ca281..a88eb2aa0 100644 --- a/probe/endpoint/conntrack_internal_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -83,11 +83,7 @@ func TestConntracker(t *testing.T) { return testexec.NewMockCmd(reader) } - flowWalker, err := newConntrackFlowWalker(false) - if err != nil { - t.Fatal(err) - } - + flowWalker := newConntrackFlowWalker(true, false) bw := bufio.NewWriter(writer) if _, err := bw.WriteString(xmlHeader); err != nil { t.Fatal(err) diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 016dced6a..205379477 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -49,9 +49,6 @@ func toMapping(f flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - if n.flowWalker == nil { // TODO(pb) - return - } n.flowWalker.walkFlows(func(f flow) { var ( mapping = toMapping(f) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 4080dd0bc..191bb7ea5 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,7 +1,6 @@ package endpoint import ( - "log" "strconv" "time" @@ -27,7 +26,7 @@ type Reporter struct { includeProcesses bool includeNAT bool flowWalker flowWalker // interface - natMapper *natMapper + natMapper natMapper reverseResolver *reverseResolver } @@ -49,42 +48,20 @@ var SpyDuration = prometheus.NewSummaryVec( // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { - var ( - flowWalker flowWalker - natMapper *natMapper - ) - if ConntrackModulePresent() { // TODO(pb) - if useConntrack { - var err error - if flowWalker, err = newConntrackFlowWalker(true); err != nil { - log.Printf("Failed to start conntracker for endpoint reporter: %v", err) - } - } - if natmapperFlowWalker, err := newConntrackFlowWalker(true, "--any-nat"); err == nil { - m := makeNATMapper(natmapperFlowWalker) - natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper - } else { - log.Printf("Failed to start conntracker for NAT mapper: %v", err) - } - } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - flowWalker: flowWalker, - natMapper: natMapper, + flowWalker: newConntrackFlowWalker(useConntrack, true), + natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, true, "--any-nat")), reverseResolver: newReverseResolver(), } } // Stop stop stop func (r *Reporter) Stop() { - if r.flowWalker != nil { // TODO(pb): this should never be nil (implies interface) - r.flowWalker.stop() - } - if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface) - r.natMapper.stop() - } + r.flowWalker.stop() + r.natMapper.stop() r.reverseResolver.stop() } @@ -123,7 +100,8 @@ func (r *Reporter) Report() (report.Report, error) { } } - if r.flowWalker != nil { + // Consult the flowWalker for short-live connections + { extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{ Conntracked: "true", }) @@ -138,10 +116,7 @@ func (r *Reporter) Report() (report.Report, error) { }) } - if r.natMapper != nil { // TODO(pb): should never be nil - r.natMapper.applyNAT(rpt, r.hostID) - } - + r.natMapper.applyNAT(rpt, r.hostID) return rpt, nil }