From 766d8772d76c16613cf987c57f50d3b62bec6763 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 7 Sep 2015 21:26:59 +0000 Subject: [PATCH] Do a conntrack -L before -E to capture existing connections and NAT mappings. --- probe/endpoint/conntrack.go | 65 ++++++++++++++++++++++++++++---- probe/endpoint/conntrack_test.go | 2 +- probe/endpoint/nat.go | 2 +- probe/endpoint/reporter.go | 2 +- 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index e853855bc..5bf61ae5c 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/xml" "fmt" + "io" "log" "os" "strings" @@ -59,21 +60,28 @@ type Flow struct { Original, Reply, Independent *Meta `xml:"-"` } +type conntrack struct { + XMLName xml.Name `xml:"conntrack"` + Flows []Flow `xml:"flow"` +} + // Conntracker uses the conntrack command to track network connections type Conntracker struct { sync.Mutex cmd exec.Cmd activeFlows map[int64]Flow // active flows in state != TIME_WAIT bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here + existingConns bool } // NewConntracker creates and starts a new Conntracter -func NewConntracker(args ...string) (*Conntracker, error) { +func NewConntracker(existingConns bool, args ...string) (*Conntracker, error) { if !ConntrackModulePresent() { return nil, fmt.Errorf("No conntrack module") } result := &Conntracker{ - activeFlows: map[int64]Flow{}, + activeFlows: map[int64]Flow{}, + existingConns: existingConns, } go result.run(args...) return result, nil @@ -105,6 +113,19 @@ var ConntrackModulePresent = func() bool { // NB this is not re-entrant! func (c *Conntracker) run(args ...string) { + if c.existingConns { + // Fork another conntrack, just to capture existing connections + // for which we don't get events + existingFlows, err := c.existingConnections(args...) + if err != nil { + log.Printf("conntrack existingConnections error: %v", err) + return + } + for _, flow := range existingFlows { + c.handleFlow(flow, true) + } + } + args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() @@ -143,17 +164,47 @@ func (c *Conntracker) run(args ...string) { return } + defer func() { + log.Printf("contrack exiting") + }() + // Now loop on the output stream decoder := xml.NewDecoder(reader) for { var f Flow if err := decoder.Decode(&f); err != nil { log.Printf("conntrack error: %v", err) + return } - c.handleFlow(f) + c.handleFlow(f, false) } } +func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) { + var conntrack conntrack + args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...) + cmd := exec.Command("conntrack", args...) + stdout, err := cmd.StdoutPipe() + if err != nil { + return conntrack.Flows, err + } + if err := cmd.Start(); err != nil { + return conntrack.Flows, err + } + defer func() { + if err := cmd.Wait(); err != nil { + log.Printf("conntrack existingConnections exit error: %v", err) + } + }() + if err := xml.NewDecoder(stdout).Decode(&conntrack); err != nil { + if err == io.EOF { + return conntrack.Flows, err + } + return conntrack.Flows, err + } + return conntrack.Flows, nil +} + // Stop stop stop func (c *Conntracker) Stop() { c.Lock() @@ -167,7 +218,7 @@ func (c *Conntracker) Stop() { } } -func (c *Conntracker) handleFlow(f Flow) { +func (c *Conntracker) handleFlow(f Flow, forceAdd bool) { // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this // host) and the 'reply' 4 tuple, which is what it has been rewritten to. // This code finds those metas, which are identified by a Direction @@ -194,15 +245,15 @@ func (c *Conntracker) handleFlow(f Flow) { c.Lock() defer c.Unlock() - switch f.Type { - case New, Update: + switch { + case forceAdd || f.Type == New || f.Type == Update: if f.Independent.State != TimeWait { c.activeFlows[f.Independent.ID] = f } else if _, ok := c.activeFlows[f.Independent.ID]; ok { delete(c.activeFlows, f.Independent.ID) c.bufferedFlows = append(c.bufferedFlows, f) } - case Destroy: + case f.Type == Destroy: if _, ok := c.activeFlows[f.Independent.ID]; ok { delete(c.activeFlows, f.Independent.ID) c.bufferedFlows = append(c.bufferedFlows, f) diff --git a/probe/endpoint/conntrack_test.go b/probe/endpoint/conntrack_test.go index daf96f59d..6b5b03f09 100644 --- a/probe/endpoint/conntrack_test.go +++ b/probe/endpoint/conntrack_test.go @@ -76,7 +76,7 @@ func TestConntracker(t *testing.T) { return testExec.NewMockCmd(reader) } - conntracker, err := NewConntracker() + conntracker, err := NewConntracker(false) if err != nil { t.Fatal(err) } diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 746fcb7a7..d3531f438 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -21,7 +21,7 @@ type natmapper struct { } func newNATMapper() (*natmapper, error) { - ct, err := NewConntracker("--any-nat") + ct, err := NewConntracker(true, "--any-nat") if err != nil { return nil, err } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 6df5eb229..0b52f772f 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -55,7 +55,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo err error ) if conntrackModulePresent && useConntrack { - conntracker, err = NewConntracker() + conntracker, err = NewConntracker(true) if err != nil { log.Printf("Failed to start conntracker: %v", err) }