Merge pull request #3712 from weaveworks/3711-restart-sync

fix(probe/ebpf): feed initial connections synchronously on restart
This commit is contained in:
Bryan Boreham
2019-10-22 12:09:49 +01:00
committed by GitHub

View File

@@ -34,7 +34,7 @@ func newConnectionTracker(conf ReporterConfig) connectionTracker {
et, err := newEbpfTracker()
if err == nil {
ct.ebpfTracker = et
go ct.getInitialState()
go feedEBPFInitialState(conf, et)
return ct
}
log.Warnf("Error setting up the eBPF tracker, falling back to proc scanning: %v", err)
@@ -86,7 +86,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
log.Warnf("ebpf tracker died, restarting it")
err := t.ebpfTracker.restart()
if err == nil {
go t.getInitialState()
feedEBPFInitialState(t.conf, t.ebpfTracker)
t.performEbpfTrack(rpt, hostNodeID)
return
}
@@ -108,13 +108,13 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
}
}
func (t *connectionTracker) existingFlows() map[string]fourTuple {
func existingFlowsFromConntrack(conf ReporterConfig) map[string]fourTuple {
seenTuples := map[string]fourTuple{}
if !t.conf.UseConntrack {
if !conf.UseConntrack {
// log.Warnf("Not using conntrack: disabled")
} else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil {
} else if err := IsConntrackSupported(conf.ProcRoot); err != nil {
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
} else if existingFlows, err := conntrack.ConnectionsSize(t.conf.BufferSize); err != nil {
} else if existingFlows, err := conntrack.ConnectionsSize(conf.BufferSize); err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
} else {
for _, f := range existingFlows {
@@ -147,18 +147,20 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
return nil
}
// getInitialState runs conntrack and proc parsing synchronously only
// feedEBPFInitialState runs conntrack and proc parsing synchronously only
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
// This is run on a background goroutine during initial setup, so does
// not take *connectionTracker which could change under it
func feedEBPFInitialState(conf ReporterConfig, ebpfTracker *EbpfTracker) {
var processCache *process.CachingWalker
walker := process.NewWalker(t.conf.ProcRoot, true)
walker := process.NewWalker(conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()
scanner := procspy.NewSyncConnectionScanner(processCache, t.conf.SpyProcs)
scanner := procspy.NewSyncConnectionScanner(processCache, conf.SpyProcs)
// Consult conntrack to get the initial state
seenTuples := t.existingFlows()
seenTuples := existingFlowsFromConntrack(conf)
conns, err := scanner.Connections()
if err != nil {
@@ -173,7 +175,7 @@ func (t *connectionTracker) getInitialState() {
}
})
t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID))
ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(conf.HostID))
}
func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {