diff --git a/probe/endpoint/connection_tracker.go b/probe/endpoint/connection_tracker.go index 7d6f7ea2c..511bb280d 100644 --- a/probe/endpoint/connection_tracker.go +++ b/probe/endpoint/connection_tracker.go @@ -27,7 +27,7 @@ type connectionTrackerConfig struct { type connectionTracker struct { conf connectionTrackerConfig flowWalker flowWalker // Interface - ebpfTracker eventTracker + ebpfTracker *EbpfTracker reverseResolver *reverseResolver } @@ -91,16 +91,18 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) { t.useProcfs() } - // seenTuples contains information about connections seen by conntrack and it will be passed to the /proc parser - seenTuples := map[string]fourTuple{} - t.performFlowWalk(rpt, seenTuples) + // seenTuples contains information about connections seen by + // conntrack + seenTuples := t.performFlowWalk(rpt) + if t.conf.WalkProc && t.conf.Scanner != nil { t.performWalkProc(rpt, hostNodeID, seenTuples) } } -func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples map[string]fourTuple) { - // Consult the flowWalker for short-lived connections +// performFlowWalk consults the flowWalker for short-lived connections +func (t *connectionTracker) performFlowWalk(rpt *report.Report) map[string]fourTuple { + seenTuples := map[string]fourTuple{} extraNodeInfo := map[string]string{ Conntracked: "true", } @@ -109,6 +111,24 @@ func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples map[s seenTuples[tuple.key()] = tuple t.addConnection(rpt, tuple, "", extraNodeInfo, extraNodeInfo) }) + return seenTuples +} + +func (t *connectionTracker) existingFlows() map[string]fourTuple { + seenTuples := map[string]fourTuple{} + if !t.conf.UseConntrack { + // log.Warnf("Not using conntrack: disabled") + } else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil { + log.Warnf("Not using conntrack: not supported by the kernel: %s", err) + } else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil { + log.Errorf("conntrack existingConnections error: %v", err) + } else { + for _, f := range existingFlows { + tuple := flowToTuple(f) + seenTuples[tuple.key()] = tuple + } + } + return seenTuples } func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples map[string]fourTuple) error { @@ -144,18 +164,9 @@ func (t *connectionTracker) getInitialState() { processCache.Tick() scanner := procspy.NewSyncConnectionScanner(processCache, t.conf.SpyProcs) - seenTuples := map[string]fourTuple{} - // Consult the flowWalker to get the initial state - if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil { - log.Warnf("Not using conntrack: not supported by the kernel: %s", err) - } else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil { - log.Errorf("conntrack existingConnections error: %v", err) - } else { - for _, f := range existingFlows { - tuple := flowToTuple(f) - seenTuples[tuple.key()] = tuple - } - } + + // Consult conntrack to get the initial state + seenTuples := t.existingFlows() conns, err := scanner.Connections() if err != nil { diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go index 13e580925..f9d57dcec 100644 --- a/probe/endpoint/ebpf.go +++ b/probe/endpoint/ebpf.go @@ -24,15 +24,6 @@ type ebpfConnection struct { pid int } -type eventTracker interface { - handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) - walkConnections(f func(ebpfConnection)) - feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) - isReadyToHandleConnections() bool - isDead() bool - stop() -} - // EbpfTracker contains the sets of open and closed TCP connections. // Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. type EbpfTracker struct { @@ -40,8 +31,9 @@ type EbpfTracker struct { tracer *tracer.Tracer readyToHandleConnections bool dead bool + lastTimestampV4 uint64 - openConnections map[string]ebpfConnection + openConnections map[fourTuple]ebpfConnection closedConnections []ebpfConnection } @@ -79,13 +71,13 @@ func isKernelSupported() error { return nil } -func newEbpfTracker() (eventTracker, error) { +func newEbpfTracker() (*EbpfTracker, error) { if err := isKernelSupported(); err != nil { return nil, fmt.Errorf("kernel not supported: %v", err) } tracker := &EbpfTracker{ - openConnections: map[string]ebpfConnection{}, + openConnections: map[fourTuple]ebpfConnection{}, } tracer, err := tracer.NewTracer(tracker.tcpEventCbV4, tracker.tcpEventCbV6, tracker.lostCb) @@ -99,20 +91,17 @@ func newEbpfTracker() (eventTracker, error) { return tracker, nil } -var lastTimestampV4 uint64 - func (t *EbpfTracker) tcpEventCbV4(e tracer.TcpV4) { - if lastTimestampV4 > e.Timestamp { + if t.lastTimestampV4 > e.Timestamp { // A kernel bug can cause the timestamps to be wrong (e.g. on Ubuntu with Linux 4.4.0-47.68) // Upgrading the kernel will fix the problem. For further info see: // https://github.com/iovisor/bcc/issues/790#issuecomment-263704235 // https://github.com/weaveworks/scope/issues/2334 - log.Errorf("tcp tracer received event with timestamp %v even though the last timestamp was %v. Stopping the eBPF tracker.", e.Timestamp, lastTimestampV4) - t.dead = true + log.Errorf("tcp tracer received event with timestamp %v even though the last timestamp was %v. Stopping the eBPF tracker.", e.Timestamp, t.lastTimestampV4) t.stop() } - lastTimestampV4 = e.Timestamp + t.lastTimestampV4 = e.Timestamp if e.Type == tracer.EventFdInstall { t.handleFdInstall(e.Type, int(e.Pid), int(e.Fd)) @@ -128,7 +117,6 @@ func (t *EbpfTracker) tcpEventCbV6(e tracer.TcpV6) { func (t *EbpfTracker) lostCb(count uint64) { log.Errorf("tcp tracer lost %d events. Stopping the eBPF tracker", count) - t.dead = true t.stop() } @@ -182,21 +170,24 @@ func tupleFromPidFd(pid int, fd int) (tuple fourTuple, netns string, ok bool) { } func (t *EbpfTracker) handleFdInstall(ev tracer.EventType, pid int, fd int) { + if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) { + t.tracer.RemoveFdInstallWatcher(uint32(pid)) + } tuple, netns, ok := tupleFromPidFd(pid, fd) log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d -> tuple=%s netns=%s ok=%v", pid, fd, tuple, netns, ok) if !ok { return } - conn := ebpfConnection{ + + t.Lock() + defer t.Unlock() + + t.openConnections[tuple] = ebpfConnection{ incoming: true, tuple: tuple, pid: pid, networkNamespace: netns, } - t.openConnections[tuple.String()] = conn - if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) { - t.tracer.RemoveFdInstallWatcher(uint32(pid)) - } } func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) { @@ -212,27 +203,25 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid switch ev { case tracer.EventConnect: - conn := ebpfConnection{ + t.openConnections[tuple] = ebpfConnection{ incoming: false, tuple: tuple, pid: pid, networkNamespace: networkNamespace, } - t.openConnections[tuple.String()] = conn case tracer.EventAccept: - conn := ebpfConnection{ + t.openConnections[tuple] = ebpfConnection{ incoming: true, tuple: tuple, pid: pid, networkNamespace: networkNamespace, } - t.openConnections[tuple.String()] = conn case tracer.EventClose: - if deadConn, ok := t.openConnections[tuple.String()]; ok { - delete(t.openConnections, tuple.String()) + if deadConn, ok := t.openConnections[tuple]; ok { + delete(t.openConnections, tuple) t.closedConnections = append(t.closedConnections, deadConn) } else { - log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) + log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple, pid, networkNamespace) } default: log.Debugf("EbpfTracker: unknown event: %s (%d)", ev, ev) @@ -255,15 +244,19 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { } func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) { - t.readyToHandleConnections = true + t.Lock() for conn := conns.Next(); conn != nil; conn = conns.Next() { tuple, namespaceID, incoming := connectionTuple(conn, seenTuples) - if incoming { - t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID) - } else { - t.handleConnection(tracer.EventConnect, tuple, int(conn.Proc.PID), namespaceID) + t.openConnections[tuple] = ebpfConnection{ + incoming: incoming, + tuple: tuple, + pid: int(conn.Proc.PID), + networkNamespace: namespaceID, } } + t.readyToHandleConnections = true + t.Unlock() + for _, p := range processesWaitingInAccept { t.tracer.AddFdInstallWatcher(uint32(p)) log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p) @@ -275,12 +268,17 @@ func (t *EbpfTracker) isReadyToHandleConnections() bool { } func (t *EbpfTracker) isDead() bool { + t.Lock() + defer t.Unlock() return t.dead } func (t *EbpfTracker) stop() { - if t.tracer != nil { + t.Lock() + alreadyDead := t.dead + t.dead = true + t.Unlock() + if !alreadyDead && t.tracer != nil { t.tracer.Stop() } - t.dead = true } diff --git a/probe/endpoint/ebpf_test.go b/probe/endpoint/ebpf_test.go index e22d00acd..24d96aab5 100644 --- a/probe/endpoint/ebpf_test.go +++ b/probe/endpoint/ebpf_test.go @@ -96,37 +96,37 @@ func TestHandleConnection(t *testing.T) { readyToHandleConnections: true, dead: false, - openConnections: map[string]ebpfConnection{}, + openConnections: map[fourTuple]ebpfConnection{}, closedConnections: []ebpfConnection{}, } tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)} mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10)) - if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4ConnectEbpfConnection) { + if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple], IPv4ConnectEbpfConnection) { t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v", - IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple.String()]) + IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple]) } tuple = fourTuple{IPv4ConnectCloseEvent.SAddr.String(), IPv4ConnectCloseEvent.DAddr.String(), uint16(IPv4ConnectCloseEvent.SPort), uint16(IPv4ConnectCloseEvent.DPort)} mockEbpfTracker.handleConnection(IPv4ConnectCloseEvent.Type, tuple, int(IPv4ConnectCloseEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectCloseEvent.NetNS), 10)) if len(mockEbpfTracker.openConnections) != 0 { t.Errorf("Connection mismatch close event\nConnection to close:%v", - mockEbpfTracker.openConnections[tuple.String()]) + mockEbpfTracker.openConnections[tuple]) } mockEbpfTracker = &EbpfTracker{ readyToHandleConnections: true, dead: false, - openConnections: map[string]ebpfConnection{}, + openConnections: map[fourTuple]ebpfConnection{}, closedConnections: []ebpfConnection{}, } tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), uint16(IPv4AcceptEvent.DPort)} mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10)) - if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4AcceptEbpfConnection) { + if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple], IPv4AcceptEbpfConnection) { t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v", - IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple.String()]) + IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple]) } tuple = fourTuple{IPv4AcceptCloseEvent.SAddr.String(), IPv4AcceptCloseEvent.DAddr.String(), uint16(IPv4AcceptCloseEvent.SPort), uint16(IPv4AcceptCloseEvent.DPort)} @@ -158,8 +158,8 @@ func TestWalkConnections(t *testing.T) { mockEbpfTracker := &EbpfTracker{ readyToHandleConnections: true, dead: false, - openConnections: map[string]ebpfConnection{ - activeTuple.String(): { + openConnections: map[fourTuple]ebpfConnection{ + activeTuple: { tuple: activeTuple, networkNamespace: "12345", incoming: true, @@ -207,7 +207,7 @@ func TestInvalidTimeStampDead(t *testing.T) { mockEbpfTracker := &EbpfTracker{ readyToHandleConnections: true, dead: false, - openConnections: map[string]ebpfConnection{}, + openConnections: map[fourTuple]ebpfConnection{}, } event.Timestamp = 0 mockEbpfTracker.tcpEventCbV4(event)