Add eBPF connection tracking without dependencies on kernel headers

Based on work by Lorenzo; updated by Iago, Alban, Alessandro, and
Michael.

This PR adds connection tracking using eBPF. The feature is not enabled by
default; for now, you can enable it by launching Scope with the following command:

```
sudo ./scope launch --probe.ebpf.connections=true
```

This patch lets Scope be notified of every connection event without
relying on parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/*, thereby
improving performance.

We vendor https://github.com/iovisor/gobpf into Scope to load the
pre-compiled eBPF program, and https://github.com/weaveworks/tcptracer-bpf
to guess the offsets of the kernel structures we need. This way we don't
need a different pre-compiled eBPF object file per kernel version. The
pre-compiled eBPF program itself is included in the vendored tcptracer-bpf.
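
To illustrate, here is a minimal sketch of how the vendored tracer is wired
up, based on the calls visible in probe/endpoint/ebpf.go below (the handler
names and bodies are ours, not the probe's):

```
package main

import (
	"fmt"

	"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)

// onTCPv4 is called for every IPv4 TCP event; each event carries the
// connection tuple plus the pid and netns of the process involved.
func onTCPv4(e tracer.TcpV4) {
	fmt.Printf("%v %v:%d -> %v:%d pid=%d netns=%d\n",
		e.Type, e.SAddr, e.SPort, e.DAddr, e.DPort, e.Pid, e.NetNS)
}

// An IPv6 callback is required by the API, but Scope ignores these events.
func onTCPv6(e tracer.TcpV6) {}

func main() {
	// NewTracer loads the pre-compiled eBPF object shipped in
	// tcptracer-bpf, guesses the kernel struct offsets, and attaches
	// the kprobes -- no kernel headers required at runtime.
	t, err := tracer.NewTracer(onTCPv4, onTCPv6)
	if err != nil {
		fmt.Println("tracer failed to start:", err)
		return
	}
	_ = t
	select {} // block forever while events are delivered
}
```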

The eBPF program attaches kprobes/kretprobes to the following kernel functions:
- tcp_v4_connect
- tcp_v6_connect
- tcp_set_state
- inet_csk_accept
- tcp_close

It generates "connect", "accept", and "close" events containing the
connection tuple as well as the pid and netns.
Note: IPv6 events are not supported in Scope and are therefore not passed on.
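
Condensed from tcpEventCbV4/handleConnection in the diff below, this is
roughly what the probe does with each IPv4 event (the helper name is ours):

```
// handleTCPv4 sketches how an event becomes a tracked connection.
func handleTCPv4(e tracer.TcpV4) {
	tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
	netns := strconv.Itoa(int(e.NetNS))

	switch e.Type {
	case tracer.EventConnect: // outgoing: stored as open, incoming=false
	case tracer.EventAccept: // incoming: stored as open, incoming=true
	case tracer.EventClose: // moved to closedConnections for one last report
	}
	_, _ = tuple, netns
}
```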

probe/endpoint/ebpf.go maintains the list of connections. As with
conntrack, it keeps dead connections around for one iteration so that
short-lived connections are still reported.
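
The one-iteration grace period boils down to draining a slice of closed
connections after every walk; condensed from EbpfTracker.walkConnections in
the diff below:

```
// walkConnections visits all open connections plus those that closed
// since the previous walk, then forgets the closed ones, so each
// short-lived connection shows up in exactly one report.
func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
	t.Lock()
	defer t.Unlock()
	for _, c := range t.openConnections {
		f(c)
	}
	for _, c := range t.closedConnections {
		f(c)
	}
	t.closedConnections = t.closedConnections[:0]
}
```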

The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still
there and is still used at start-up, because eBPF only delivers events,
not the initial state. However, the /proc parsing for the initial state is
now done in the foreground instead of the background, via
newForegroundReader().
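
A condensed sketch of that start-up path, from getInitialState in the diff
below (conntrack seeding and some error handling elided):

```
func (t *connectionTracker) getInitialState() {
	// Synchronous scanner: Connections() only returns after the
	// first full /proc walk, so the snapshot is complete.
	scanner := procspy.NewSyncConnectionScanner(t.processCache)
	defer scanner.Stop()

	seenTuples := map[string]fourTuple{} // normally seeded from conntrack
	conns, err := scanner.Connections(t.conf.SpyProcs)
	if err != nil {
		log.Errorf("initial /proc scan failed: %s", err)
	}
	// From here on, eBPF events keep the state current.
	t.ebpfTracker.feedInitialConnections(conns, seenTuples,
		report.MakeHostNodeID(t.conf.HostID))
}
```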

NAT resolution on connections from eBPF works the same way as it did on
connections from /proc: by using conntrack. One of the two conntrack
instances is started only to fetch the initial state and is then stopped,
since eBPF detects short-lived connections by itself.
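
When conntrack seeds the initial state, DNATed flows are normalized so the
reported tuple matches what the client actually dialed; condensed from
flowToTuple in the diff below:

```
func flowToTuple(f flow) fourTuple {
	// Default: use the original direction of the flow.
	ft := fourTuple{
		f.Original.Layer3.SrcIP, f.Original.Layer3.DstIP,
		uint16(f.Original.Layer4.SrcPort), uint16(f.Original.Layer4.DstPort),
	}
	// DNAT: the reply does not originate from the original destination,
	// so build the tuple from the reply direction instead.
	if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP {
		ft = fourTuple{
			f.Reply.Layer3.DstIP, f.Reply.Layer3.SrcIP,
			uint16(f.Reply.Layer4.DstPort), uint16(f.Reply.Layer4.SrcPort),
		}
	}
	return ft
}
```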

The Scope Docker image size comparison:
- weaveworks/scope in current master:  22 MB (compressed),  68 MB
  (uncompressed)
- weaveworks/scope with this patchset: 23 MB (compressed), 69 MB
  (uncompressed)

Fixes #1168 (walking /proc to obtain connections is very expensive)

Fixes #1260 (Short-lived connections not tracked for containers in
shared networking namespaces)

Fixes #1962 (Port ebpf tracker to Go)

Fixes #1961 (Remove runtime kernel header dependency from ebpf tracker)
Author: Iago López Galeiras
Date: 2017-03-08 15:24:37 +01:00
Committed by: Alban Crequy
Parent: 6ede5e5c91
Commit: 9920c4ea48
18 changed files with 878 additions and 191 deletions


@@ -24,13 +24,24 @@ RM=--rm
RUN_FLAGS=-ti
BUILD_IN_CONTAINER=true
GO_ENV=GOGC=off
GO=env $(GO_ENV) go
NO_CROSS_COMP=unset GOOS GOARCH
GO_HOST=$(NO_CROSS_COMP); $(GO)
WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV)
GO_BUILD_INSTALL_DEPS=-i
GO_BUILD_TAGS='netgo unsafe'
GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-extldflags \"-static\" -X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS)
GOOS=$(shell go tool dist env | grep GOOS | sed -e 's/GOOS="\(.*\)"/\1/')
ifeq ($(GOOS),linux)
GO_ENV+=CGO_ENABLED=1
endif
ifeq ($(GOARCH),arm)
ARM_CC=CC=/usr/bin/arm-linux-gnueabihf-gcc
endif
GO=env $(GO_ENV) $(ARM_CC) go
NO_CROSS_COMP=unset GOOS GOARCH
GO_HOST=$(NO_CROSS_COMP); env $(GO_ENV) go
WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV)
IMAGE_TAG=$(shell ./tools/image-tag)
all: $(SCOPE_EXPORT)


@@ -1,6 +1,9 @@
FROM golang:1.7.4
FROM ubuntu:yakkety
ENV GOPATH /go
ENV GOVERSION 1.7
ENV PATH /go/bin:/usr/lib/go-${GOVERSION}/bin:/usr/bin:/bin:/usr/sbin:/sbin
RUN apt-get update && \
apt-get install -y libpcap-dev python-requests time file shellcheck && \
apt-get install -y libpcap-dev python-requests time file shellcheck golang-${GOVERSION} git gcc-arm-linux-gnueabihf && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN go clean -i net && \
go install -tags netgo std && \
@@ -13,7 +16,7 @@ RUN go get -tags netgo \
github.com/fatih/hclfmt \
github.com/mjibson/esc \
github.com/client9/misspell/cmd/misspell && \
chmod a+wr --recursive /usr/local/go/pkg && \
chmod a+wr --recursive /usr/lib/go-${GOVERSION}/pkg && \
rm -rf /go/pkg/ /go/src/
COPY build.sh /
ENTRYPOINT ["/build.sh"]


@@ -0,0 +1,250 @@
package endpoint
import (
"strconv"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
)
// connectionTrackerConfig are the config options for the endpoint tracker.
type connectionTrackerConfig struct {
HostID string
HostName string
SpyProcs bool
UseConntrack bool
WalkProc bool
UseEbpfConn bool
ProcRoot string
BufferSize int
Scanner procspy.ConnectionScanner
DNSSnooper *DNSSnooper
}
type connectionTracker struct {
conf connectionTrackerConfig
flowWalker flowWalker // Interface
ebpfTracker eventTracker
reverseResolver *reverseResolver
processCache *process.CachingWalker
}
func newConnectionTracker(conf connectionTrackerConfig) connectionTracker {
if !conf.UseEbpfConn {
// ebpf OFF, use flowWalker
return connectionTracker{
conf: conf,
flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat"),
ebpfTracker: nil,
reverseResolver: newReverseResolver(),
}
}
// When ebpf becomes active by default, check that it starts correctly and otherwise fall back to the flowWalker
et, err := newEbpfTracker(conf.UseEbpfConn)
if err != nil {
// TODO: fallback to flowWalker, when ebpf is enabled by default
log.Errorf("Error setting up the ebpfTracker, connections will not be reported: %s", err)
noopConnectionTracker := connectionTracker{
conf: conf,
flowWalker: nil,
ebpfTracker: nil,
reverseResolver: nil,
}
return noopConnectionTracker
}
var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(conf.ProcRoot))
processCache.Tick()
ct := connectionTracker{
conf: conf,
flowWalker: nil,
ebpfTracker: et,
reverseResolver: newReverseResolver(),
processCache: processCache,
}
go ct.getInitialState()
return ct
}
func flowToTuple(f flow) (ft fourTuple) {
ft = fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
}
// Handle DNAT-ed connections in the initial state
if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP {
ft = fourTuple{
f.Reply.Layer3.DstIP,
f.Reply.Layer3.SrcIP,
uint16(f.Reply.Layer4.DstPort),
uint16(f.Reply.Layer4.SrcPort),
}
}
return ft
}
// ReportConnections calls trackers according to the configuration.
// When ebpf is enabled, only performEbpfTrack() is called
func (t *connectionTracker) ReportConnections(rpt *report.Report) {
hostNodeID := report.MakeHostNodeID(t.conf.HostID)
if t.ebpfTracker != nil {
t.performEbpfTrack(rpt, hostNodeID)
return
}
// seenTuples contains information about connections seen by conntrack and it will be passed to the /proc parser
seenTuples := map[string]fourTuple{}
if t.flowWalker != nil {
t.performFlowWalk(rpt, &seenTuples)
}
// if eBPF was enabled but failed to initialize, Scanner will be nil.
// We can't recover from this, so don't walk proc in that case.
// TODO: implement fallback
if t.conf.WalkProc && t.conf.Scanner != nil {
t.performWalkProc(rpt, hostNodeID, &seenTuples)
}
}
func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples *map[string]fourTuple) {
// Consult the flowWalker for short-lived connections
extraNodeInfo := map[string]string{
Conntracked: "true",
}
t.flowWalker.walkFlows(func(f flow, alive bool) {
tuple := flowToTuple(f)
(*seenTuples)[tuple.key()] = tuple
t.addConnection(rpt, tuple, "", extraNodeInfo, extraNodeInfo)
})
}
func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples *map[string]fourTuple) error {
conns, err := t.conf.Scanner.Connections(t.conf.SpyProcs)
if err != nil {
return err
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
namespaceID string
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
toNodeInfo = map[string]string{Procspied: "true"}
fromNodeInfo = map[string]string{Procspied: "true"}
)
if conn.Proc.PID > 0 {
fromNodeInfo[process.PID] = strconv.FormatUint(uint64(conn.Proc.PID), 10)
fromNodeInfo[report.HostNodeID] = hostNodeID
}
if conn.Proc.NetNamespaceID > 0 {
namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10)
}
// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
canonical, ok := (*seenTuples)[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
tuple.reverse()
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
t.addConnection(rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo)
}
return nil
}
func (t *connectionTracker) getInitialState() {
scanner := procspy.NewSyncConnectionScanner(t.processCache)
// Run conntrack and proc parsing synchronously only once to initialize ebpfTracker
seenTuples := map[string]fourTuple{}
// Consult the flowWalker to get the initial state
if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil {
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
} else {
for _, f := range existingFlows {
tuple := flowToTuple(f)
seenTuples[tuple.key()] = tuple
}
}
conns, err := scanner.Connections(t.conf.SpyProcs)
if err != nil {
log.Errorf("Error initializing ebpfTracker while scanning /proc, continuing without initial connections: %s", err)
}
scanner.Stop()
t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID))
}
func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
t.ebpfTracker.walkConnections(func(e ebpfConnection) {
fromNodeInfo := map[string]string{
EBPF: "true",
}
toNodeInfo := map[string]string{
EBPF: "true",
}
if e.pid > 0 {
fromNodeInfo[process.PID] = strconv.Itoa(e.pid)
fromNodeInfo[report.HostNodeID] = hostNodeID
}
if e.incoming {
t.addConnection(rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo)
} else {
t.addConnection(rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo)
}
})
return nil
}
func (t *connectionTracker) addConnection(rpt *report.Report, ft fourTuple, namespaceID string, extraFromNode, extraToNode map[string]string) {
var (
fromNode = t.makeEndpointNode(namespaceID, ft.fromAddr, ft.fromPort, extraFromNode)
toNode = t.makeEndpointNode(namespaceID, ft.toAddr, ft.toPort, extraToNode)
)
rpt.Endpoint = rpt.Endpoint.AddNode(fromNode.WithEdge(toNode.ID, report.EdgeMetadata{}))
rpt.Endpoint = rpt.Endpoint.AddNode(toNode)
}
func (t *connectionTracker) makeEndpointNode(namespaceID string, addr string, port uint16, extra map[string]string) report.Node {
portStr := strconv.Itoa(int(port))
node := report.MakeNodeWith(
report.MakeEndpointNodeID(t.conf.HostID, namespaceID, addr, portStr),
map[string]string{Addr: addr, Port: portStr})
if names := t.conf.DNSSnooper.CachedNamesForIP(addr); len(names) > 0 {
node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...))
}
if names, err := t.reverseResolver.get(addr); err == nil && len(names) > 0 {
node = node.WithSet(ReverseDNSNames, report.MakeStringSet(names...))
}
if extra != nil {
node = node.WithLatests(extra)
}
return node
}
func (t *connectionTracker) Stop() error {
if t.ebpfTracker != nil {
t.ebpfTracker.stop()
}
if t.flowWalker != nil {
t.flowWalker.stop()
}
t.reverseResolver.stop()
return nil
}


@@ -65,14 +65,14 @@ type conntrack struct {
// flowWalker is something that maintains flows, and provides an accessor
// method to walk them.
type flowWalker interface {
walkFlows(f func(flow))
walkFlows(f func(f flow, active bool))
stop()
}
type nilFlowWalker struct{}
func (n nilFlowWalker) stop() {}
func (n nilFlowWalker) walkFlows(f func(flow)) {}
func (n nilFlowWalker) stop() {}
func (n nilFlowWalker) walkFlows(f func(flow, bool)) {}
// conntrackWalker uses the conntrack command to track network connections and
// implements flowWalker.
@@ -160,7 +160,7 @@ func logPipe(prefix string, reader io.Reader) {
func (c *conntrackWalker) run() {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections()
existingFlows, err := existingConnections(c.args)
if err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
return
@@ -354,8 +354,8 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
return f, nil
}
func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...)
func existingConnections(conntrackWalkerArgs []string) ([]flow, error) {
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, conntrackWalkerArgs...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
@@ -463,14 +463,14 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
// walkFlows calls f with all active flows and flows that have come and gone
// since the last call to walkFlows
func (c *conntrackWalker) walkFlows(f func(flow)) {
func (c *conntrackWalker) walkFlows(f func(flow, bool)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {
f(flow)
f(flow, true)
}
for _, flow := range c.bufferedFlows {
f(flow)
f(flow, false)
}
c.bufferedFlows = c.bufferedFlows[:0]
}

probe/endpoint/ebpf.go (new file, 217 lines)

@@ -0,0 +1,217 @@
package endpoint
import (
"errors"
"fmt"
"regexp"
"strconv"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)
// An ebpfConnection represents a TCP connection
type ebpfConnection struct {
tuple fourTuple
networkNamespace string
incoming bool
pid int
}
type eventTracker interface {
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
walkConnections(f func(ebpfConnection))
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string)
isReadyToHandleConnections() bool
stop()
}
var ebpfTracker *EbpfTracker
// EbpfTracker contains the sets of open and closed TCP connections.
// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`.
type EbpfTracker struct {
sync.Mutex
tracer *tracer.Tracer
readyToHandleConnections bool
dead bool
openConnections map[string]ebpfConnection
closedConnections []ebpfConnection
}
var releaseRegex = regexp.MustCompile(`^(\d+)\.(\d+).*$`)
func isKernelSupported() error {
release, _, err := host.GetKernelReleaseAndVersion()
if err != nil {
return err
}
releaseParts := releaseRegex.FindStringSubmatch(release)
if len(releaseParts) != 3 {
return fmt.Errorf("got invalid release version %q (expected format '4.4[.2-1]')", release)
}
major, err := strconv.Atoi(releaseParts[1])
if err != nil {
return err
}
minor, err := strconv.Atoi(releaseParts[2])
if err != nil {
return err
}
if major > 4 {
return nil
}
if major < 4 || minor < 4 {
return fmt.Errorf("got kernel %s but need kernel >=4.4", release)
}
return nil
}
func newEbpfTracker(useEbpfConn bool) (eventTracker, error) {
if !useEbpfConn {
return nil, errors.New("ebpf tracker not enabled")
}
if err := isKernelSupported(); err != nil {
return nil, fmt.Errorf("kernel not supported: %v", err)
}
t, err := tracer.NewTracer(tcpEventCbV4, tcpEventCbV6)
if err != nil {
return nil, err
}
tracker := &EbpfTracker{
openConnections: map[string]ebpfConnection{},
tracer: t,
}
ebpfTracker = tracker
return tracker, nil
}
var lastTimestampV4 uint64
func tcpEventCbV4(e tracer.TcpV4) {
if lastTimestampV4 > e.Timestamp {
log.Errorf("ERROR: late event!\n")
}
lastTimestampV4 = e.Timestamp
tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
}
func tcpEventCbV6(e tracer.TcpV6) {
// TODO: IPv6 not supported in Scope
}
func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
t.Lock()
defer t.Unlock()
if !t.isReadyToHandleConnections() {
return
}
log.Debugf("handleConnection(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v)",
ev, tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, networkNamespace)
switch ev {
case tracer.EventConnect:
conn := ebpfConnection{
incoming: false,
tuple: tuple,
pid: pid,
networkNamespace: networkNamespace,
}
t.openConnections[tuple.String()] = conn
case tracer.EventAccept:
conn := ebpfConnection{
incoming: true,
tuple: tuple,
pid: pid,
networkNamespace: networkNamespace,
}
t.openConnections[tuple.String()] = conn
case tracer.EventClose:
if deadConn, ok := t.openConnections[tuple.String()]; ok {
delete(t.openConnections, tuple.String())
t.closedConnections = append(t.closedConnections, deadConn)
} else {
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
}
}
}
// walkConnections calls f with all open connections and connections that have come and gone
// since the last call to walkConnections
func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
t.Lock()
defer t.Unlock()
for _, connection := range t.openConnections {
f(connection)
}
for _, connection := range t.closedConnections {
f(connection)
}
t.closedConnections = t.closedConnections[:0]
}
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) {
t.readyToHandleConnections = true
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
namespaceID string
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
)
if conn.Proc.NetNamespaceID > 0 {
namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10)
}
// We can use a port-heuristic to guess the direction.
// We assume that tuple.fromPort < tuple.toPort is a connect event (outgoing)
canonical, ok := seenTuples[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
t.handleConnection(tracer.EventConnect, tuple, int(conn.Proc.PID), namespaceID)
} else {
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
}
}
}
func (t *EbpfTracker) isReadyToHandleConnections() bool {
return t.readyToHandleConnections
}
func (t *EbpfTracker) stop() {
// TODO: implement proper stopping logic
//
// Even if we stop the go routine, it's not enough since we disabled the
// async proc parser. We leave this unimplemented for now because:
//
// * Ebpf parsing is optional (need to be enabled explicitly with
// --probe.ebpf.connections=true), if a user enables it we assume they
// check on the logs whether it works or not
//
// * It's unlikely that the ebpf tracker stops working if it started
// successfully and if it does, we probably want it to fail hard
}

probe/endpoint/ebpf_test.go (new file, 184 lines)

@@ -0,0 +1,184 @@
package endpoint
import (
"net"
"reflect"
"strconv"
"testing"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)
func TestHandleConnection(t *testing.T) {
var (
ServerPid uint32 = 42
ClientPid uint32 = 43
ServerIP = net.IP("127.0.0.1")
ClientIP = net.IP("127.0.0.2")
ServerPort uint16 = 12345
ClientPort uint16 = 6789
NetNS uint32 = 123456789
IPv4ConnectEvent = tracer.TcpV4{
CPU: 0,
Type: tracer.EventConnect,
Pid: ClientPid,
Comm: "cmd",
SAddr: ClientIP,
DAddr: ServerIP,
SPort: ClientPort,
DPort: ServerPort,
NetNS: NetNS,
}
IPv4ConnectEbpfConnection = ebpfConnection{
tuple: fourTuple{
fromAddr: ClientIP.String(),
toAddr: ServerIP.String(),
fromPort: ClientPort,
toPort: ServerPort,
},
networkNamespace: strconv.Itoa(int(NetNS)),
incoming: false,
pid: int(ClientPid),
}
IPv4ConnectCloseEvent = tracer.TcpV4{
CPU: 0,
Type: tracer.EventClose,
Pid: ClientPid,
Comm: "cmd",
SAddr: ClientIP,
DAddr: ServerIP,
SPort: ClientPort,
DPort: ServerPort,
NetNS: NetNS,
}
IPv4AcceptEvent = tracer.TcpV4{
CPU: 0,
Type: tracer.EventAccept,
Pid: ServerPid,
Comm: "cmd",
SAddr: ServerIP,
DAddr: ClientIP,
SPort: ServerPort,
DPort: ClientPort,
NetNS: NetNS,
}
IPv4AcceptEbpfConnection = ebpfConnection{
tuple: fourTuple{
fromAddr: ServerIP.String(),
toAddr: ClientIP.String(),
fromPort: ServerPort,
toPort: ClientPort,
},
networkNamespace: strconv.Itoa(int(NetNS)),
incoming: true,
pid: int(ServerPid),
}
IPv4AcceptCloseEvent = tracer.TcpV4{
CPU: 0,
Type: tracer.EventClose,
Pid: ClientPid,
Comm: "cmd",
SAddr: ServerIP,
DAddr: ClientIP,
SPort: ServerPort,
DPort: ClientPort,
NetNS: NetNS,
}
)
mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[string]ebpfConnection{},
closedConnections: []ebpfConnection{},
}
tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10))
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4ConnectEbpfConnection) {
t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v",
IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple.String()])
}
tuple = fourTuple{IPv4ConnectCloseEvent.SAddr.String(), IPv4ConnectCloseEvent.DAddr.String(), uint16(IPv4ConnectCloseEvent.SPort), uint16(IPv4ConnectCloseEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4ConnectCloseEvent.Type, tuple, int(IPv4ConnectCloseEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectCloseEvent.NetNS), 10))
if len(mockEbpfTracker.openConnections) != 0 {
t.Errorf("Connection mismatch close event\nConnection to close:%v",
mockEbpfTracker.openConnections[tuple.String()])
}
mockEbpfTracker = &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[string]ebpfConnection{},
closedConnections: []ebpfConnection{},
}
tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), uint16(IPv4AcceptEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10))
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4AcceptEbpfConnection) {
t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v",
IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple.String()])
}
tuple = fourTuple{IPv4AcceptCloseEvent.SAddr.String(), IPv4AcceptCloseEvent.DAddr.String(), uint16(IPv4AcceptCloseEvent.SPort), uint16(IPv4AcceptCloseEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4AcceptCloseEvent.Type, tuple, int(IPv4AcceptCloseEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptCloseEvent.NetNS), 10))
if len(mockEbpfTracker.openConnections) != 0 {
t.Errorf("Connection mismatch close event\nConnection to close:%v",
mockEbpfTracker.openConnections)
}
}
func TestWalkConnections(t *testing.T) {
var (
cnt int
activeTuple = fourTuple{
fromAddr: "",
toAddr: "",
fromPort: 0,
toPort: 0,
}
inactiveTuple = fourTuple{
fromAddr: "",
toAddr: "",
fromPort: 0,
toPort: 0,
}
)
mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[string]ebpfConnection{
activeTuple.String(): {
tuple: activeTuple,
networkNamespace: "12345",
incoming: true,
pid: 0,
},
},
closedConnections: []ebpfConnection{
{
tuple: inactiveTuple,
networkNamespace: "12345",
incoming: false,
pid: 0,
},
},
}
mockEbpfTracker.walkConnections(func(e ebpfConnection) {
cnt++
})
if cnt != 2 {
t.Errorf("walkConnetions found %v instead of 2 connections", cnt)
}
}


@@ -0,0 +1,45 @@
package endpoint
import (
"fmt"
"sort"
"strings"
)
// fourTuple is an (IP, port, IP, port) tuple, representing a connection
type fourTuple struct {
fromAddr, toAddr string
fromPort, toPort uint16
}
func (t fourTuple) String() string {
return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort)
}
// key is a sortable direction-independent key for tuples, used to look up a
// fourTuple when you are unsure of its direction.
func (t fourTuple) key() string {
key := []string{
fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
}
sort.Strings(key)
return strings.Join(key, " ")
}
// reverse flips the direction of the tuple
func (t *fourTuple) reverse() {
t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
}
// reverse flips the direction of a tuple, without side effects
func reverse(tuple fourTuple) fourTuple {
return fourTuple{
fromAddr: tuple.toAddr,
toAddr: tuple.fromAddr,
fromPort: tuple.toPort,
toPort: tuple.fromPort,
}
}


@@ -49,7 +49,7 @@ func toMapping(f flow) *endpointMapping {
// applyNAT duplicates Nodes in the endpoint topology of a report, based on
// the NAT table.
func (n natMapper) applyNAT(rpt report.Report, scope string) {
n.flowWalker.walkFlows(func(f flow) {
n.flowWalker.walkFlows(func(f flow, active bool) {
mapping := toMapping(f)
realEndpointPort := strconv.Itoa(mapping.originalPort)


@@ -13,9 +13,9 @@ type mockFlowWalker struct {
flows []flow
}
func (m *mockFlowWalker) walkFlows(f func(flow)) {
func (m *mockFlowWalker) walkFlows(f func(f flow, active bool)) {
for _, flow := range m.flows {
f(flow)
f(flow, true)
}
}


@@ -20,6 +20,11 @@ const (
targetWalkTime = 10 * time.Second // Aim at walking all files in 10 seconds
)
type reader interface {
getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error)
stop()
}
type backgroundReader struct {
stopc chan struct{}
mtx sync.Mutex
@@ -29,7 +34,7 @@ type backgroundReader struct {
// starts a rate-limited background goroutine to read the expensive files from
// proc.
func newBackgroundReader(walker process.Walker) *backgroundReader {
func newBackgroundReader(walker process.Walker) reader {
br := &backgroundReader{
stopc: make(chan struct{}),
latestSockets: map[uint64]*Proc{},
@@ -54,28 +59,6 @@ func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Pro
return br.latestSockets, err
}
type walkResult struct {
buf *bytes.Buffer
sockets map[uint64]*Proc
}
func performWalk(w pidWalker, c chan<- walkResult) {
var (
err error
result = walkResult{
buf: bytes.NewBuffer(make([]byte, 0, 5000)),
}
)
result.sockets, err = w.walk(result.buf)
if err != nil {
log.Errorf("background /proc reader: error walking /proc: %s", err)
result.buf.Reset()
result.sockets = nil
}
c <- result
}
func (br *backgroundReader) loop(walker process.Walker) {
var (
begin time.Time // when we started the last performWalk
@@ -120,6 +103,71 @@ func (br *backgroundReader) loop(walker process.Walker) {
}
}
type foregroundReader struct {
stopc chan struct{}
latestBuf *bytes.Buffer
latestSockets map[uint64]*Proc
ticker *time.Ticker
}
// newForegroundReader synchronously reads files from /proc
func newForegroundReader(walker process.Walker) reader {
fr := &foregroundReader{
stopc: make(chan struct{}),
latestSockets: map[uint64]*Proc{},
}
var (
walkc = make(chan walkResult)
ticker = time.NewTicker(time.Millisecond) // fire every millisecond
pWalker = newPidWalker(walker, ticker.C, fdBlockSize)
)
go performWalk(pWalker, walkc)
result := <-walkc
fr.latestBuf = result.buf
fr.latestSockets = result.sockets
fr.ticker = ticker
return fr
}
func (fr *foregroundReader) stop() {
fr.ticker.Stop()
close(fr.stopc)
}
func (fr *foregroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) {
// Don't read latestBuf directly but through a fresh reader, so the
// buffer is not drained and can be copied again on the next call of
// getWalkedProcPid.
_, err := io.Copy(buf, bytes.NewReader(fr.latestBuf.Bytes()))
return fr.latestSockets, err
}
type walkResult struct {
buf *bytes.Buffer
sockets map[uint64]*Proc
}
func performWalk(w pidWalker, c chan<- walkResult) {
var (
err error
result = walkResult{
buf: bytes.NewBuffer(make([]byte, 0, 5000)),
}
)
result.sockets, err = w.walk(result.buf)
if err != nil {
log.Errorf("background /proc reader: error walking /proc: %s", err)
result.buf.Reset()
result.sockets = nil
}
c <- result
}
// Adjust rate limit for next walk and calculate when it should be started
func scheduleNextWalk(rateLimitPeriod time.Duration, took time.Duration) (newRateLimitPeriod time.Duration, restInterval time.Duration) {
log.Debugf("background /proc reader: full pass took %s", took)


@@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner {
return &darwinScanner{}
}
// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner
func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner {
return &darwinScanner{}
}
type darwinScanner struct{}
// Connections returns all established (TCP) connections. No need to be root


@@ -38,8 +38,14 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner {
return &linuxScanner{br}
}
// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner
func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner {
fr := newForegroundReader(walker)
return &linuxScanner{fr}
}
type linuxScanner struct {
br *backgroundReader
r reader
}
func (s *linuxScanner) Connections(processes bool) (ConnIter, error) {
@@ -50,7 +56,7 @@ func (s *linuxScanner) Connections(processes bool) (ConnIter, error) {
var procs map[uint64]*Proc
if processes {
var err error
if procs, err = s.br.getWalkedProcPid(buf); err != nil {
if procs, err = s.r.getWalkedProcPid(buf); err != nil {
return nil, err
}
}
@@ -68,5 +74,5 @@ func (s *linuxScanner) Connections(processes bool) (ConnIter, error) {
}
func (s *linuxScanner) Stop() {
s.br.stop()
s.r.stop()
}


@@ -1,16 +1,10 @@
package endpoint
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
)
@@ -19,6 +13,7 @@ const (
Addr = "addr" // typically IPv4
Port = "port"
Conntracked = "conntracked"
EBPF = "eBPF"
Procspied = "procspied"
ReverseDNSNames = "reverse_dns_names"
SnoopedDNSNames = "snooped_dns_names"
@@ -31,6 +26,7 @@ type ReporterConfig struct {
SpyProcs bool
UseConntrack bool
WalkProc bool
UseEbpfConn bool
ProcRoot string
BufferSize int
Scanner procspy.ConnectionScanner
@@ -39,10 +35,9 @@ type ReporterConfig struct {
// Reporter generates Reports containing the Endpoint topology.
type Reporter struct {
conf ReporterConfig
flowWalker flowWalker // interface
natMapper natMapper
reverseResolver *reverseResolver
conf ReporterConfig
connectionTracker connectionTracker
natMapper natMapper
}
// SpyDuration is an exported prometheus metric
@@ -64,10 +59,20 @@ var SpyDuration = prometheus.NewSummaryVec(
// with process (PID) information.
func NewReporter(conf ReporterConfig) *Reporter {
return &Reporter{
conf: conf,
flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize),
natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")),
reverseResolver: newReverseResolver(),
conf: conf,
connectionTracker: newConnectionTracker(connectionTrackerConfig{
HostID: conf.HostID,
HostName: conf.HostName,
SpyProcs: conf.SpyProcs,
UseConntrack: conf.UseConntrack,
WalkProc: conf.WalkProc,
UseEbpfConn: conf.UseEbpfConn,
ProcRoot: conf.ProcRoot,
BufferSize: conf.BufferSize,
Scanner: conf.Scanner,
DNSSnooper: conf.DNSSnooper,
}),
natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")),
}
}
@@ -76,141 +81,20 @@ func (Reporter) Name() string { return "Endpoint" }
// Stop stop stop
func (r *Reporter) Stop() {
r.flowWalker.stop()
r.connectionTracker.Stop()
r.natMapper.stop()
r.reverseResolver.stop()
r.conf.Scanner.Stop()
}
type fourTuple struct {
fromAddr, toAddr string
fromPort, toPort uint16
}
// key is a sortable direction-independent key for tuples, used to look up a
// fourTuple, when you are unsure of it's direction.
func (t fourTuple) key() string {
key := []string{
fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
}
sort.Strings(key)
return strings.Join(key, " ")
}
// reverse flips the direction of the tuple
func (t *fourTuple) reverse() {
t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
}
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
SpyDuration.WithLabelValues().Observe(time.Since(begin).Seconds())
}(time.Now())
hostNodeID := report.MakeHostNodeID(r.conf.HostID)
rpt := report.MakeReport()
seenTuples := map[string]fourTuple{}
// Consult the flowWalker for short-lived connections
{
extraNodeInfo := map[string]string{
Conntracked: "true",
}
r.flowWalker.walkFlows(func(f flow) {
tuple := fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
}
// Handle DNAT-ed short-lived connections.
// The NAT mapper won't help since it only runs periodically,
// missing the short-lived connections.
if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP {
tuple = fourTuple{
f.Reply.Layer3.DstIP,
f.Reply.Layer3.SrcIP,
uint16(f.Reply.Layer4.DstPort),
uint16(f.Reply.Layer4.SrcPort),
}
}
seenTuples[tuple.key()] = tuple
r.addConnection(&rpt, tuple, "", extraNodeInfo, extraNodeInfo)
})
}
if r.conf.WalkProc {
conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs)
if err != nil {
return rpt, err
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
namespaceID string
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
toNodeInfo = map[string]string{Procspied: "true"}
fromNodeInfo = map[string]string{Procspied: "true"}
)
if conn.Proc.PID > 0 {
fromNodeInfo[process.PID] = strconv.FormatUint(uint64(conn.Proc.PID), 10)
fromNodeInfo[report.HostNodeID] = hostNodeID
}
if conn.Proc.NetNamespaceID > 0 {
namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10)
}
// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
canonical, ok := seenTuples[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
tuple.reverse()
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo)
}
}
r.connectionTracker.ReportConnections(&rpt)
r.natMapper.applyNAT(rpt, r.conf.HostID)
return rpt, nil
}
func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, namespaceID string, extraFromNode, extraToNode map[string]string) {
var (
fromNode = r.makeEndpointNode(namespaceID, t.fromAddr, t.fromPort, extraFromNode)
toNode = r.makeEndpointNode(namespaceID, t.toAddr, t.toPort, extraToNode)
)
rpt.Endpoint = rpt.Endpoint.AddNode(fromNode.WithEdge(toNode.ID, report.EdgeMetadata{}))
rpt.Endpoint = rpt.Endpoint.AddNode(toNode)
}
func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16, extra map[string]string) report.Node {
portStr := strconv.Itoa(int(port))
node := report.MakeNodeWith(
report.MakeEndpointNodeID(r.conf.HostID, namespaceID, addr, portStr),
map[string]string{Addr: addr, Port: portStr})
if names := r.conf.DNSSnooper.CachedNamesForIP(addr); len(names) > 0 {
node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...))
}
if names, err := r.reverseResolver.get(addr); err == nil && len(names) > 0 {
node = node.WithSet(ReverseDNSNames, report.MakeStringSet(names...))
}
if extra != nil {
node = node.WithLatests(extra)
}
return node
}
func newu64(i uint64) *uint64 {
return &i
}


@@ -99,6 +99,7 @@ type probeFlags struct {
spyProcs bool // Associate endpoints with processes (must be root)
procEnabled bool // Produce process topology & process nodes in endpoint
useEbpfConn bool // Enable connection tracking with eBPF
procRoot string
dockerEnabled bool
@@ -283,6 +284,7 @@ func main() {
flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)")
flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem")
flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections")
flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", false, "enable connection tracking with eBPF")
// Docker
flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes")


@@ -161,7 +161,10 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
var scanner procspy.ConnectionScanner
if flags.procEnabled {
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot))
scanner = procspy.NewConnectionScanner(processCache)
// The eBPF tracker finds connections itself and does not need the connection scanner
if !flags.useEbpfConn {
scanner = procspy.NewConnectionScanner(processCache)
}
p.AddTicker(processCache)
p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies, flags.noCommandLineArguments))
}
@@ -179,6 +182,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
SpyProcs: flags.spyProcs,
UseConntrack: flags.useConntrack,
WalkProc: flags.procEnabled,
UseEbpfConn: flags.useEbpfConn,
ProcRoot: flags.procRoot,
BufferSize: flags.conntrackBufferSize,
Scanner: scanner,


@@ -82,6 +82,18 @@ func ColorConnected(r Renderer) Renderer {
// FilterFunc is the function type used by Filters
type FilterFunc func(report.Node) bool
// AnyFilterFunc checks if any of the filterfuncs matches.
func AnyFilterFunc(fs ...FilterFunc) FilterFunc {
return func(n report.Node) bool {
for _, f := range fs {
if f(n) {
return true
}
}
return false
}
}
// ComposeFilterFuncs composes filterfuncs into a single FilterFunc checking all.
func ComposeFilterFuncs(fs ...FilterFunc) FilterFunc {
return func(n report.Node) bool {
@@ -224,15 +236,30 @@ func IsRunning(n report.Node) bool {
// IsStopped checks if the node is *not* a running docker container
var IsStopped = Complement(IsRunning)
func nonProcspiedFilter(node report.Node) bool {
_, ok := node.Latest.Lookup(endpoint.Procspied)
return ok
}
func nonEBPFFilter(node report.Node) bool {
_, ok := node.Latest.Lookup(endpoint.EBPF)
return ok
}
// FilterNonProcspied removes endpoints which were not found in procspy.
func FilterNonProcspied(r Renderer) Renderer {
return MakeFilter(
func(node report.Node) bool {
_, ok := node.Latest.Lookup(endpoint.Procspied)
return ok
},
r,
)
return MakeFilter(nonProcspiedFilter, r)
}
// FilterNonEBPF removes endpoints which were not found via eBPF.
func FilterNonEBPF(r Renderer) Renderer {
return MakeFilter(nonEBPFFilter, r)
}
// FilterNonProcspiedNorEBPF removes endpoints which were not found in procspy
// nor via eBPF.
func FilterNonProcspiedNorEBPF(r Renderer) Renderer {
return MakeFilter(AnyFilterFunc(nonProcspiedFilter, nonEBPFFilter), r)
}
// IsApplication checks if the node is an "application" node


@@ -23,7 +23,7 @@ func renderProcesses(rpt report.Report) bool {
}
// EndpointRenderer is a Renderer which produces a renderable endpoint graph.
var EndpointRenderer = FilterNonProcspied(SelectEndpoint)
var EndpointRenderer = FilterNonProcspiedNorEBPF(SelectEndpoint)
// ProcessRenderer is a Renderer which produces a renderable process
// graph by merging the endpoint graph and the process topology.

scope (1 addition)

@@ -170,6 +170,7 @@ launch_command() {
echo docker run --privileged $USERNS_HOST -d --name="$SCOPE_CONTAINER_NAME" --net=host --pid=host \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /var/run/scope/plugins:/var/run/scope/plugins \
-v /sys/kernel/debug:/sys/kernel/debug \
-e CHECKPOINT_DISABLE \
$WEAVESCOPE_DOCKER_ARGS "$SCOPE_IMAGE" --probe.docker=true
}