mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-04 10:41:14 +00:00
Merge pull request #2699 from weaveworks/ebpf-refactor
EbpfTracker refactoring / cleanup
This commit is contained in:
@@ -27,7 +27,7 @@ type connectionTrackerConfig struct {
|
||||
type connectionTracker struct {
|
||||
conf connectionTrackerConfig
|
||||
flowWalker flowWalker // Interface
|
||||
ebpfTracker eventTracker
|
||||
ebpfTracker *EbpfTracker
|
||||
reverseResolver *reverseResolver
|
||||
}
|
||||
|
||||
@@ -91,16 +91,18 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
|
||||
t.useProcfs()
|
||||
}
|
||||
|
||||
// seenTuples contains information about connections seen by conntrack and it will be passed to the /proc parser
|
||||
seenTuples := map[string]fourTuple{}
|
||||
t.performFlowWalk(rpt, seenTuples)
|
||||
// seenTuples contains information about connections seen by
|
||||
// conntrack
|
||||
seenTuples := t.performFlowWalk(rpt)
|
||||
|
||||
if t.conf.WalkProc && t.conf.Scanner != nil {
|
||||
t.performWalkProc(rpt, hostNodeID, seenTuples)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples map[string]fourTuple) {
|
||||
// Consult the flowWalker for short-lived connections
|
||||
// performFlowWalk consults the flowWalker for short-lived connections
|
||||
func (t *connectionTracker) performFlowWalk(rpt *report.Report) map[string]fourTuple {
|
||||
seenTuples := map[string]fourTuple{}
|
||||
extraNodeInfo := map[string]string{
|
||||
Conntracked: "true",
|
||||
}
|
||||
@@ -109,6 +111,24 @@ func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples map[s
|
||||
seenTuples[tuple.key()] = tuple
|
||||
t.addConnection(rpt, tuple, "", extraNodeInfo, extraNodeInfo)
|
||||
})
|
||||
return seenTuples
|
||||
}
|
||||
|
||||
func (t *connectionTracker) existingFlows() map[string]fourTuple {
|
||||
seenTuples := map[string]fourTuple{}
|
||||
if !t.conf.UseConntrack {
|
||||
// log.Warnf("Not using conntrack: disabled")
|
||||
} else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil {
|
||||
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
|
||||
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
|
||||
log.Errorf("conntrack existingConnections error: %v", err)
|
||||
} else {
|
||||
for _, f := range existingFlows {
|
||||
tuple := flowToTuple(f)
|
||||
seenTuples[tuple.key()] = tuple
|
||||
}
|
||||
}
|
||||
return seenTuples
|
||||
}
|
||||
|
||||
func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples map[string]fourTuple) error {
|
||||
@@ -144,18 +164,9 @@ func (t *connectionTracker) getInitialState() {
|
||||
processCache.Tick()
|
||||
|
||||
scanner := procspy.NewSyncConnectionScanner(processCache, t.conf.SpyProcs)
|
||||
seenTuples := map[string]fourTuple{}
|
||||
// Consult the flowWalker to get the initial state
|
||||
if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil {
|
||||
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
|
||||
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
|
||||
log.Errorf("conntrack existingConnections error: %v", err)
|
||||
} else {
|
||||
for _, f := range existingFlows {
|
||||
tuple := flowToTuple(f)
|
||||
seenTuples[tuple.key()] = tuple
|
||||
}
|
||||
}
|
||||
|
||||
// Consult conntrack to get the initial state
|
||||
seenTuples := t.existingFlows()
|
||||
|
||||
conns, err := scanner.Connections()
|
||||
if err != nil {
|
||||
|
||||
@@ -24,15 +24,6 @@ type ebpfConnection struct {
|
||||
pid int
|
||||
}
|
||||
|
||||
type eventTracker interface {
|
||||
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
|
||||
walkConnections(f func(ebpfConnection))
|
||||
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string)
|
||||
isReadyToHandleConnections() bool
|
||||
isDead() bool
|
||||
stop()
|
||||
}
|
||||
|
||||
// EbpfTracker contains the sets of open and closed TCP connections.
|
||||
// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`.
|
||||
type EbpfTracker struct {
|
||||
@@ -40,8 +31,9 @@ type EbpfTracker struct {
|
||||
tracer *tracer.Tracer
|
||||
readyToHandleConnections bool
|
||||
dead bool
|
||||
lastTimestampV4 uint64
|
||||
|
||||
openConnections map[string]ebpfConnection
|
||||
openConnections map[fourTuple]ebpfConnection
|
||||
closedConnections []ebpfConnection
|
||||
}
|
||||
|
||||
@@ -79,13 +71,13 @@ func isKernelSupported() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newEbpfTracker() (eventTracker, error) {
|
||||
func newEbpfTracker() (*EbpfTracker, error) {
|
||||
if err := isKernelSupported(); err != nil {
|
||||
return nil, fmt.Errorf("kernel not supported: %v", err)
|
||||
}
|
||||
|
||||
tracker := &EbpfTracker{
|
||||
openConnections: map[string]ebpfConnection{},
|
||||
openConnections: map[fourTuple]ebpfConnection{},
|
||||
}
|
||||
|
||||
tracer, err := tracer.NewTracer(tracker.tcpEventCbV4, tracker.tcpEventCbV6, tracker.lostCb)
|
||||
@@ -99,20 +91,17 @@ func newEbpfTracker() (eventTracker, error) {
|
||||
return tracker, nil
|
||||
}
|
||||
|
||||
var lastTimestampV4 uint64
|
||||
|
||||
func (t *EbpfTracker) tcpEventCbV4(e tracer.TcpV4) {
|
||||
if lastTimestampV4 > e.Timestamp {
|
||||
if t.lastTimestampV4 > e.Timestamp {
|
||||
// A kernel bug can cause the timestamps to be wrong (e.g. on Ubuntu with Linux 4.4.0-47.68)
|
||||
// Upgrading the kernel will fix the problem. For further info see:
|
||||
// https://github.com/iovisor/bcc/issues/790#issuecomment-263704235
|
||||
// https://github.com/weaveworks/scope/issues/2334
|
||||
log.Errorf("tcp tracer received event with timestamp %v even though the last timestamp was %v. Stopping the eBPF tracker.", e.Timestamp, lastTimestampV4)
|
||||
t.dead = true
|
||||
log.Errorf("tcp tracer received event with timestamp %v even though the last timestamp was %v. Stopping the eBPF tracker.", e.Timestamp, t.lastTimestampV4)
|
||||
t.stop()
|
||||
}
|
||||
|
||||
lastTimestampV4 = e.Timestamp
|
||||
t.lastTimestampV4 = e.Timestamp
|
||||
|
||||
if e.Type == tracer.EventFdInstall {
|
||||
t.handleFdInstall(e.Type, int(e.Pid), int(e.Fd))
|
||||
@@ -128,7 +117,6 @@ func (t *EbpfTracker) tcpEventCbV6(e tracer.TcpV6) {
|
||||
|
||||
func (t *EbpfTracker) lostCb(count uint64) {
|
||||
log.Errorf("tcp tracer lost %d events. Stopping the eBPF tracker", count)
|
||||
t.dead = true
|
||||
t.stop()
|
||||
}
|
||||
|
||||
@@ -182,21 +170,24 @@ func tupleFromPidFd(pid int, fd int) (tuple fourTuple, netns string, ok bool) {
|
||||
}
|
||||
|
||||
func (t *EbpfTracker) handleFdInstall(ev tracer.EventType, pid int, fd int) {
|
||||
if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
|
||||
t.tracer.RemoveFdInstallWatcher(uint32(pid))
|
||||
}
|
||||
tuple, netns, ok := tupleFromPidFd(pid, fd)
|
||||
log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d -> tuple=%s netns=%s ok=%v", pid, fd, tuple, netns, ok)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
conn := ebpfConnection{
|
||||
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
t.openConnections[tuple] = ebpfConnection{
|
||||
incoming: true,
|
||||
tuple: tuple,
|
||||
pid: pid,
|
||||
networkNamespace: netns,
|
||||
}
|
||||
t.openConnections[tuple.String()] = conn
|
||||
if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
|
||||
t.tracer.RemoveFdInstallWatcher(uint32(pid))
|
||||
}
|
||||
}
|
||||
|
||||
func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
|
||||
@@ -212,27 +203,25 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
|
||||
|
||||
switch ev {
|
||||
case tracer.EventConnect:
|
||||
conn := ebpfConnection{
|
||||
t.openConnections[tuple] = ebpfConnection{
|
||||
incoming: false,
|
||||
tuple: tuple,
|
||||
pid: pid,
|
||||
networkNamespace: networkNamespace,
|
||||
}
|
||||
t.openConnections[tuple.String()] = conn
|
||||
case tracer.EventAccept:
|
||||
conn := ebpfConnection{
|
||||
t.openConnections[tuple] = ebpfConnection{
|
||||
incoming: true,
|
||||
tuple: tuple,
|
||||
pid: pid,
|
||||
networkNamespace: networkNamespace,
|
||||
}
|
||||
t.openConnections[tuple.String()] = conn
|
||||
case tracer.EventClose:
|
||||
if deadConn, ok := t.openConnections[tuple.String()]; ok {
|
||||
delete(t.openConnections, tuple.String())
|
||||
if deadConn, ok := t.openConnections[tuple]; ok {
|
||||
delete(t.openConnections, tuple)
|
||||
t.closedConnections = append(t.closedConnections, deadConn)
|
||||
} else {
|
||||
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
|
||||
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple, pid, networkNamespace)
|
||||
}
|
||||
default:
|
||||
log.Debugf("EbpfTracker: unknown event: %s (%d)", ev, ev)
|
||||
@@ -255,15 +244,19 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
|
||||
}
|
||||
|
||||
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) {
|
||||
t.readyToHandleConnections = true
|
||||
t.Lock()
|
||||
for conn := conns.Next(); conn != nil; conn = conns.Next() {
|
||||
tuple, namespaceID, incoming := connectionTuple(conn, seenTuples)
|
||||
if incoming {
|
||||
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
|
||||
} else {
|
||||
t.handleConnection(tracer.EventConnect, tuple, int(conn.Proc.PID), namespaceID)
|
||||
t.openConnections[tuple] = ebpfConnection{
|
||||
incoming: incoming,
|
||||
tuple: tuple,
|
||||
pid: int(conn.Proc.PID),
|
||||
networkNamespace: namespaceID,
|
||||
}
|
||||
}
|
||||
t.readyToHandleConnections = true
|
||||
t.Unlock()
|
||||
|
||||
for _, p := range processesWaitingInAccept {
|
||||
t.tracer.AddFdInstallWatcher(uint32(p))
|
||||
log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p)
|
||||
@@ -275,12 +268,17 @@ func (t *EbpfTracker) isReadyToHandleConnections() bool {
|
||||
}
|
||||
|
||||
func (t *EbpfTracker) isDead() bool {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
return t.dead
|
||||
}
|
||||
|
||||
func (t *EbpfTracker) stop() {
|
||||
if t.tracer != nil {
|
||||
t.Lock()
|
||||
alreadyDead := t.dead
|
||||
t.dead = true
|
||||
t.Unlock()
|
||||
if !alreadyDead && t.tracer != nil {
|
||||
t.tracer.Stop()
|
||||
}
|
||||
t.dead = true
|
||||
}
|
||||
|
||||
@@ -96,37 +96,37 @@ func TestHandleConnection(t *testing.T) {
|
||||
readyToHandleConnections: true,
|
||||
dead: false,
|
||||
|
||||
openConnections: map[string]ebpfConnection{},
|
||||
openConnections: map[fourTuple]ebpfConnection{},
|
||||
closedConnections: []ebpfConnection{},
|
||||
}
|
||||
|
||||
tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)}
|
||||
mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10))
|
||||
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4ConnectEbpfConnection) {
|
||||
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple], IPv4ConnectEbpfConnection) {
|
||||
t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v",
|
||||
IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple.String()])
|
||||
IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple])
|
||||
}
|
||||
|
||||
tuple = fourTuple{IPv4ConnectCloseEvent.SAddr.String(), IPv4ConnectCloseEvent.DAddr.String(), uint16(IPv4ConnectCloseEvent.SPort), uint16(IPv4ConnectCloseEvent.DPort)}
|
||||
mockEbpfTracker.handleConnection(IPv4ConnectCloseEvent.Type, tuple, int(IPv4ConnectCloseEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectCloseEvent.NetNS), 10))
|
||||
if len(mockEbpfTracker.openConnections) != 0 {
|
||||
t.Errorf("Connection mismatch close event\nConnection to close:%v",
|
||||
mockEbpfTracker.openConnections[tuple.String()])
|
||||
mockEbpfTracker.openConnections[tuple])
|
||||
}
|
||||
|
||||
mockEbpfTracker = &EbpfTracker{
|
||||
readyToHandleConnections: true,
|
||||
dead: false,
|
||||
|
||||
openConnections: map[string]ebpfConnection{},
|
||||
openConnections: map[fourTuple]ebpfConnection{},
|
||||
closedConnections: []ebpfConnection{},
|
||||
}
|
||||
|
||||
tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), uint16(IPv4AcceptEvent.DPort)}
|
||||
mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10))
|
||||
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4AcceptEbpfConnection) {
|
||||
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple], IPv4AcceptEbpfConnection) {
|
||||
t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v",
|
||||
IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple.String()])
|
||||
IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple])
|
||||
}
|
||||
|
||||
tuple = fourTuple{IPv4AcceptCloseEvent.SAddr.String(), IPv4AcceptCloseEvent.DAddr.String(), uint16(IPv4AcceptCloseEvent.SPort), uint16(IPv4AcceptCloseEvent.DPort)}
|
||||
@@ -158,8 +158,8 @@ func TestWalkConnections(t *testing.T) {
|
||||
mockEbpfTracker := &EbpfTracker{
|
||||
readyToHandleConnections: true,
|
||||
dead: false,
|
||||
openConnections: map[string]ebpfConnection{
|
||||
activeTuple.String(): {
|
||||
openConnections: map[fourTuple]ebpfConnection{
|
||||
activeTuple: {
|
||||
tuple: activeTuple,
|
||||
networkNamespace: "12345",
|
||||
incoming: true,
|
||||
@@ -207,7 +207,7 @@ func TestInvalidTimeStampDead(t *testing.T) {
|
||||
mockEbpfTracker := &EbpfTracker{
|
||||
readyToHandleConnections: true,
|
||||
dead: false,
|
||||
openConnections: map[string]ebpfConnection{},
|
||||
openConnections: map[fourTuple]ebpfConnection{},
|
||||
}
|
||||
event.Timestamp = 0
|
||||
mockEbpfTracker.tcpEventCbV4(event)
|
||||
|
||||
Reference in New Issue
Block a user