From ae83c6545e31e8cc1a691adf0e65906507420e9a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 14 Oct 2019 08:22:21 +0000 Subject: [PATCH 1/2] fix(probe/ebpf): feed initial connections synchronously on restart If we run `getInitialState()` async there is some chance we will see another ebpf failure and call `useProcfs()` before `getInitialState()` gets to the last line, whereupon it will crash on nil pointer. Also it seems pointless to call `performEbpfTrack()` without waiting for something to feed in, so I suspect this is what the original author had in mind. It will slow down this one `Report()` on machines with a lot of processes or connections, but ebpfTracker restart is supposed to be a rare event. --- probe/endpoint/connection_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/probe/endpoint/connection_tracker.go b/probe/endpoint/connection_tracker.go index ce39ced15..92fe84b58 100644 --- a/probe/endpoint/connection_tracker.go +++ b/probe/endpoint/connection_tracker.go @@ -86,7 +86,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) { log.Warnf("ebpf tracker died, restarting it") err := t.ebpfTracker.restart() if err == nil { - go t.getInitialState() + t.getInitialState() t.performEbpfTrack(rpt, hostNodeID) return } From b9f10e9b73d652226b8ac81b6784f9b61228bff6 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 14 Oct 2019 11:22:54 +0000 Subject: [PATCH 2/2] refactor(probe/ebpf): make ebpf setup safer It was possible for `t.ebpfTracker` to change underneath this code while running on a background goroutine, so change it to take `ebpfTracker` as a parameter. While we're here, rename the functions to better match what they do. --- probe/endpoint/connection_tracker.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/probe/endpoint/connection_tracker.go b/probe/endpoint/connection_tracker.go index 92fe84b58..bc62476f1 100644 --- a/probe/endpoint/connection_tracker.go +++ b/probe/endpoint/connection_tracker.go @@ -34,7 +34,7 @@ func newConnectionTracker(conf ReporterConfig) connectionTracker { et, err := newEbpfTracker() if err == nil { ct.ebpfTracker = et - go ct.getInitialState() + go feedEBPFInitialState(conf, et) return ct } log.Warnf("Error setting up the eBPF tracker, falling back to proc scanning: %v", err) @@ -86,7 +86,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) { log.Warnf("ebpf tracker died, restarting it") err := t.ebpfTracker.restart() if err == nil { - t.getInitialState() + feedEBPFInitialState(t.conf, t.ebpfTracker) t.performEbpfTrack(rpt, hostNodeID) return } @@ -108,13 +108,13 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) { } } -func (t *connectionTracker) existingFlows() map[string]fourTuple { +func existingFlowsFromConntrack(conf ReporterConfig) map[string]fourTuple { seenTuples := map[string]fourTuple{} - if !t.conf.UseConntrack { + if !conf.UseConntrack { // log.Warnf("Not using conntrack: disabled") - } else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil { + } else if err := IsConntrackSupported(conf.ProcRoot); err != nil { log.Warnf("Not using conntrack: not supported by the kernel: %s", err) - } else if existingFlows, err := conntrack.ConnectionsSize(t.conf.BufferSize); err != nil { + } else if existingFlows, err := conntrack.ConnectionsSize(conf.BufferSize); err != nil { log.Errorf("conntrack existingConnections error: %v", err) } else { for _, f := range existingFlows { @@ -147,18 +147,20 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin return nil } -// getInitialState runs conntrack and proc parsing synchronously only +// feedEBPFInitialState runs conntrack and proc parsing synchronously only // once to initialize ebpfTracker -func (t *connectionTracker) getInitialState() { +// This is run on a background goroutine during initial setup, so does +// not take *connectionTracker which could change under it +func feedEBPFInitialState(conf ReporterConfig, ebpfTracker *EbpfTracker) { var processCache *process.CachingWalker - walker := process.NewWalker(t.conf.ProcRoot, true) + walker := process.NewWalker(conf.ProcRoot, true) processCache = process.NewCachingWalker(walker) processCache.Tick() - scanner := procspy.NewSyncConnectionScanner(processCache, t.conf.SpyProcs) + scanner := procspy.NewSyncConnectionScanner(processCache, conf.SpyProcs) // Consult conntrack to get the initial state - seenTuples := t.existingFlows() + seenTuples := existingFlowsFromConntrack(conf) conns, err := scanner.Connections() if err != nil { @@ -173,7 +175,7 @@ func (t *connectionTracker) getInitialState() { } }) - t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID)) + ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(conf.HostID)) } func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {