Merge pull request #2518 from kinvolk/alban/fdinstall

handle fdinstall events from tcptracer-bpf (aka "accept before kretprobe" issue)
This commit is contained in:
Alfonso Acosta
2017-05-19 15:52:54 +02:00
committed by GitHub
28 changed files with 420 additions and 93 deletions

View File

@@ -0,0 +1,39 @@
#! /bin/bash

# Integration test for the "accept before kretprobe" issue: a server that is
# already blocked in accept() when Scope starts would otherwise be missed by
# the eBPF kretprobe on accept. See the linked tcptracer-bpf issue.

# shellcheck disable=SC1091
. ./config.sh

start_suite "Test accept before kretprobe, see https://github.com/weaveworks/tcptracer-bpf/issues/10"

weave_on "$HOST1" launch

# Launch the server before Scope to make sure it calls accept() before Scope's
# kretprobe on the accept function is installed. We use busybox' nc instead of
# Alpine's nc so that it blocks on the accept() syscall.
weave_on "$HOST1" run -d --name server busybox /bin/sh -c "while true; do \
date ;
sleep 1 ;
done | nc -l -p 8080"

# Only now start Scope, with the eBPF connection tracker enabled.
scope_on "$HOST1" launch --probe.ebpf.connections=true

wait_for_containers "$HOST1" 60 server

has_container "$HOST1" server

# Start a client that connects to the already-listening server, so a
# connection is established after Scope launched.
weave_on "$HOST1" run -d --name client busybox /bin/sh -c "ping -c 5 server.weave.local; \
while true; do \
date ;
sleep 1 ;
done | nc server.weave.local 8080"

wait_for_containers "$HOST1" 60 server client

has_container "$HOST1" client
list_containers "$HOST1"
list_connections "$HOST1"

# The client->server edge must be present and reported via eBPF.
has_connection containers "$HOST1" client server
endpoints_have_ebpf "$HOST1"

scope_end_suite

View File

@@ -43,14 +43,15 @@ scope_end_suite() {
list_containers() {
local host=$1
echo "Listing containers on ${host}:"
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("metadata")) | .metadata[] | select(.id == "docker_image_name") | .value'
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("metadata")) | { "image": .metadata[] | select(.id == "docker_image_name") | .value, "label": .label, "id": .id} | .id + " (" + .image + ", " + .label + ")"'
echo
}
list_connections() {
local host=$1
echo "Listing connections on ${host}:"
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("adjacency")) | { "from": .id, "to": .adjacency[]} | .from + " -> " + .to'
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("adjacency")) | { "from_name": .label, "from_id": .id, "to": .adjacency[]} | .from_id + " (" + .from_name+ ") -> " + .to'
echo
}
# this checks we have a named node in the given view

View File

@@ -26,6 +26,7 @@ setup_host() {
echo Prefetching Images on "$HOST"
docker_on "$HOST" pull peterbourgon/tns-db
docker_on "$HOST" pull alpine
docker_on "$HOST" pull busybox
docker_on "$HOST" pull nginx
}

View File

@@ -171,7 +171,8 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(t.conf.ProcRoot))
walker := process.NewWalker(t.conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()
scanner := procspy.NewSyncConnectionScanner(processCache)
@@ -194,7 +195,14 @@ func (t *connectionTracker) getInitialState() {
}
scanner.Stop()
t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID))
processesWaitingInAccept := []int{}
processCache.Walk(func(p, prev process.Process) {
if p.IsWaitingInAccept {
processesWaitingInAccept = append(processesWaitingInAccept, p.PID)
}
})
t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID))
}
func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {

View File

@@ -1,14 +1,18 @@
package endpoint
import (
"bytes"
"fmt"
"regexp"
"strconv"
"sync"
"syscall"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/common/fs"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)
@@ -23,7 +27,7 @@ type ebpfConnection struct {
type eventTracker interface {
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
walkConnections(f func(ebpfConnection))
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string)
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string)
isReadyToHandleConnections() bool
isDead() bool
stop()
@@ -111,8 +115,12 @@ func tcpEventCbV4(e tracer.TcpV4) {
lastTimestampV4 = e.Timestamp
tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
if e.Type == tracer.EventFdInstall {
ebpfTracker.handleFdInstall(e.Type, int(e.Pid), int(e.Fd))
} else {
tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
}
}
func tcpEventCbV6(e tracer.TcpV6) {
@@ -125,6 +133,73 @@ func lostCb(count uint64) {
ebpfTracker.stop()
}
// tupleFromPidFd resolves the TCP 4-tuple and network namespace of the
// socket behind file descriptor fd of process pid, by cross-referencing
// /proc/$pid/fd/$fd's inode against /proc/$pid/net/tcp{,6}.
// ok is false if the process/fd disappeared, the fd is not a socket, or
// no matching connection entry was found (the proc files are racy by
// nature, so callers must tolerate failure).
func tupleFromPidFd(pid int, fd int) (tuple fourTuple, netns string, ok bool) {
	// read /proc/$pid/ns/net
	//
	// probe/endpoint/procspy/proc_linux.go supports Linux < 3.8 but we
	// don't need that here since ebpf-enabled kernels will be > 3.8
	netnsIno, err := procspy.ReadNetnsFromPID(pid)
	if err != nil {
		log.Debugf("netns proc file for pid %d disappeared before we could read it: %v", pid, err)
		return fourTuple{}, "", false
	}
	netns = fmt.Sprintf("%d", netnsIno)

	// find /proc/$pid/fd/$fd's ino
	fdFilename := fmt.Sprintf("/proc/%d/fd/%d", pid, fd)
	var statFdFile syscall.Stat_t
	if err := fs.Stat(fdFilename, &statFdFile); err != nil {
		log.Debugf("proc file %q disappeared before we could read it", fdFilename)
		return fourTuple{}, "", false
	}
	// S_IFMT masks the file-type bits; anything but S_IFSOCK means the fd
	// was not a socket (or was reused for something else meanwhile)
	if statFdFile.Mode&syscall.S_IFMT != syscall.S_IFSOCK {
		log.Errorf("file %q is not a socket", fdFilename)
		return fourTuple{}, "", false
	}
	ino := statFdFile.Ino

	// read both /proc/pid/net/{tcp,tcp6}
	buf := bytes.NewBuffer(make([]byte, 0, 5000))
	if _, err := procspy.ReadTCPFiles(pid, buf); err != nil {
		log.Debugf("TCP proc file for pid %d disappeared before we could read it: %v", pid, err)
		return fourTuple{}, "", false
	}

	// find /proc/$pid/fd/$fd's ino in /proc/pid/net/tcp
	pn := procspy.NewProcNet(buf.Bytes())
	for {
		n := pn.Next()
		if n == nil {
			log.Debugf("connection for proc file %q not found. buf=%q", fdFilename, buf.String())
			break
		}
		if n.Inode == ino {
			return fourTuple{n.LocalAddress.String(), n.RemoteAddress.String(), n.LocalPort, n.RemotePort}, netns, true
		}
	}
	return fourTuple{}, "", false
}
// handleFdInstall processes an fd-install event from the eBPF tracer: it
// resolves the (pid, fd) pair to a TCP 4-tuple via /proc and records it as
// an open incoming connection. Once the process is no longer blocked in
// accept(), the fd-install watcher for that pid is removed.
func (t *EbpfTracker) handleFdInstall(ev tracer.EventType, pid int, fd int) {
	tuple, netns, ok := tupleFromPidFd(pid, fd)
	log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d -> tuple=%s netns=%s ok=%v", pid, fd, tuple, netns, ok)
	if !ok {
		return
	}
	conn := ebpfConnection{
		// fd-install events are only generated for processes that were
		// found blocked in accept(), so this is the server side
		incoming:         true,
		tuple:            tuple,
		pid:              pid,
		networkNamespace: netns,
	}

	// openConnections is shared with walkConnections/handleConnection, which
	// guard it with t.Lock(); this callback runs on the tracer's event
	// goroutine, so we must take the lock here too.
	t.Lock()
	t.openConnections[tuple.String()] = conn
	t.Unlock()

	// Stop watching this pid once it is no longer blocked in accept();
	// done outside the lock since it updates a BPF map, not tracker state.
	if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
		t.tracer.RemoveFdInstallWatcher(uint32(pid))
	}
}
func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
t.Lock()
defer t.Unlock()
@@ -160,6 +235,8 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
} else {
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
}
default:
log.Debugf("EbpfTracker: unknown event: %s (%d)", ev, ev)
}
}
@@ -178,7 +255,7 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
t.closedConnections = t.closedConnections[:0]
}
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) {
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) {
t.readyToHandleConnections = true
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
@@ -204,6 +281,10 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
}
}
for _, p := range processesWaitingInAccept {
t.tracer.AddFdInstallWatcher(uint32(p))
log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p)
}
}
func (t *EbpfTracker) isReadyToHandleConnections() bool {

View File

@@ -0,0 +1,16 @@
package procspy
import (
"bytes"
"fmt"
)
// ReadTCPFiles reads the proc files tcp and tcp6 for a pid
func ReadTCPFiles(pid int, buf *bytes.Buffer) (int64, error) {
return 0, fmt.Errorf("not supported on non-Linux systems")
}
// ReadNetnsFromPID gets the netns inode of the specified pid
func ReadNetnsFromPID(pid int) (uint64, error) {
return 0, fmt.Errorf("not supported on non-Linux systems")
}

View File

@@ -62,7 +62,7 @@ func TestWalkProcPid(t *testing.T) {
defer fs_hook.Restore()
buf := bytes.Buffer{}
walker := process.NewWalker(procRoot)
walker := process.NewWalker(procRoot, false)
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
pWalker := newPidWalker(walker, ticker.C, 1)

View File

@@ -25,11 +25,10 @@ var (
)
type pidWalker struct {
walker process.Walker
tickc <-chan time.Time // Rate-limit clock. Sets the pace when traversing namespaces and /proc/PID/fd/* files.
stopc chan struct{} // Abort walk
fdBlockSize uint64 // Maximum number of /proc/PID/fd/* files to stat() per tick
netNamespacePathSuffix string
walker process.Walker
tickc <-chan time.Time // Rate-limit clock. Sets the pace when traversing namespaces and /proc/PID/fd/* files.
stopc chan struct{} // Abort walk
fdBlockSize uint64 // Maximum number of /proc/PID/fd/* files to stat() per tick
}
func newPidWalker(walker process.Walker, tickc <-chan time.Time, fdBlockSize uint64) pidWalker {
@@ -38,7 +37,6 @@ func newPidWalker(walker process.Walker, tickc <-chan time.Time, fdBlockSize uin
tickc: tickc,
fdBlockSize: fdBlockSize,
stopc: make(chan struct{}),
netNamespacePathSuffix: getNetNamespacePathSuffix(),
}
return w
}
@@ -91,10 +89,8 @@ func getNetNamespacePathSuffix() string {
return netNamespacePathSuffix
}
// Read the connections for a group of processes living in the same namespace,
// which are found (identically) in /proc/PID/net/tcp{,6} for any of the
// processes.
func readProcessConnections(buf *bytes.Buffer, namespaceProcs []*process.Process) (bool, error) {
// ReadTCPFiles reads the proc files tcp and tcp6 for a pid
func ReadTCPFiles(pid int, buf *bytes.Buffer) (int64, error) {
var (
errRead error
errRead6 error
@@ -102,31 +98,42 @@ func readProcessConnections(buf *bytes.Buffer, namespaceProcs []*process.Process
read6 int64
)
// even for tcp4 connections, we need to read the "tcp6" file because of IPv4-Mapped IPv6 Addresses
dirName := strconv.Itoa(pid)
read, errRead = readFile(filepath.Join(procRoot, dirName, "/net/tcp"), buf)
read6, errRead6 = readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf)
if errRead != nil {
return read + read6, errRead
}
return read + read6, errRead6
}
// Read the connections for a group of processes living in the same namespace,
// which are found (identically) in /proc/PID/net/tcp{,6} for any of the
// processes.
func readProcessConnections(buf *bytes.Buffer, namespaceProcs []*process.Process) (bool, error) {
var (
read int64
err error
)
for _, p := range namespaceProcs {
dirName := strconv.Itoa(p.PID)
read, errRead = readFile(filepath.Join(procRoot, dirName, "/net/tcp"), buf)
read6, errRead6 = readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf)
if errRead != nil || errRead6 != nil {
read, err = ReadTCPFiles(p.PID, buf)
if err != nil {
// try next process
continue
}
// Return after succeeding on any process
// (proc/PID/net/tcp and proc/PID/net/tcp6 are identical for all the processes in the same namespace)
return read+read6 > 0, nil
return read > 0, nil
}
// It would be cool to have an "or" error combinator
if errRead != nil {
return false, errRead
}
if errRead6 != nil {
return false, errRead6
if err != nil {
return false, err
}
return false, nil
}
// walkNamespace does the work of walk for a single namespace
@@ -199,6 +206,19 @@ func (w pidWalker) walkNamespace(namespaceID uint64, buf *bytes.Buffer, sockets
return nil
}
// ReadNetnsFromPID gets the netns inode of the specified pid
func ReadNetnsFromPID(pid int) (uint64, error) {
var statT syscall.Stat_t
dirName := strconv.Itoa(pid)
netNamespacePath := filepath.Join(procRoot, dirName, getNetNamespacePathSuffix())
if err := fs.Stat(netNamespacePath, &statT); err != nil {
return 0, err
}
return statT.Ino, nil
}
// walk walks over all numerical (PID) /proc entries. It reads
// /proc/PID/net/tcp{,6} for each namespace and sees if the ./fd/* files of each
// process in that namespace are symlinks to sockets. Returns a map from socket
@@ -207,7 +227,6 @@ func (w pidWalker) walk(buf *bytes.Buffer) (map[uint64]*Proc, error) {
var (
sockets = map[uint64]*Proc{} // map socket inode -> process
namespaces = map[uint64][]*process.Process{} // map network namespace id -> processes
statT syscall.Stat_t
)
// We do two process traversals: One to group processes by namespace and
@@ -219,14 +238,11 @@ func (w pidWalker) walk(buf *bytes.Buffer) (map[uint64]*Proc, error) {
// the processes living in that namespace.
w.walker.Walk(func(p, _ process.Process) {
dirName := strconv.Itoa(p.PID)
netNamespacePath := filepath.Join(procRoot, dirName, w.netNamespacePathSuffix)
if err := fs.Stat(netNamespacePath, &statT); err != nil {
namespaceID, err := ReadNetnsFromPID(p.PID)
if err != nil {
return
}
namespaceID := statT.Ino
namespaces[namespaceID] = append(namespaces[namespaceID], &p)
})

View File

@@ -63,12 +63,12 @@ again:
p.c.LocalAddress, p.c.LocalPort = scanAddressNA(local, &p.bytesLocal)
p.c.RemoteAddress, p.c.RemotePort = scanAddressNA(remote, &p.bytesRemote)
p.c.inode = parseDec(inode)
p.c.Inode = parseDec(inode)
p.b = nextLine(b)
if _, alreadySeen := p.seen[p.c.inode]; alreadySeen {
if _, alreadySeen := p.seen[p.c.Inode]; alreadySeen {
goto again
}
p.seen[p.c.inode] = struct{}{}
p.seen[p.c.Inode] = struct{}{}
return &p.c
}

View File

@@ -20,28 +20,28 @@ func TestProcNet(t *testing.T) {
LocalPort: 0xa6c0,
RemoteAddress: net.IP([]byte{0, 0, 0, 0}),
RemotePort: 0x0,
inode: 5107,
Inode: 5107,
},
{
LocalAddress: net.IP([]byte{0, 0, 0, 0}),
LocalPort: 0x006f,
RemoteAddress: net.IP([]byte{0, 0, 0, 0}),
RemotePort: 0x0,
inode: 5084,
Inode: 5084,
},
{
LocalAddress: net.IP([]byte{0x7f, 0x0, 0x0, 0x01}),
LocalPort: 0x0019,
RemoteAddress: net.IP([]byte{0, 0, 0, 0}),
RemotePort: 0x0,
inode: 10550,
Inode: 10550,
},
{
LocalAddress: net.IP([]byte{0x2e, 0xf6, 0x2c, 0xa1}),
LocalPort: 0xe4d7,
RemoteAddress: net.IP([]byte{0xc0, 0x1e, 0xfc, 0x57}),
RemotePort: 0x01bb,
inode: 639474,
Inode: 639474,
},
}
for i := 0; i < 4; i++ {
@@ -73,7 +73,7 @@ func TestTransport6(t *testing.T) {
RemoteAddress: net.IP(make([]byte, 16)),
RemotePort: 0x0,
// uid: 0,
inode: 23661201,
Inode: 23661201,
},
{
// state: 1,
@@ -92,7 +92,7 @@ func TestTransport6(t *testing.T) {
}),
RemotePort: 0x01bb,
// uid: 1000,
inode: 36856710,
Inode: 36856710,
},
}
@@ -148,7 +148,7 @@ func TestProcNetFiltersDuplicates(t *testing.T) {
LocalPort: 0xa6c0,
RemoteAddress: net.IP([]byte{0, 0, 0, 0}),
RemotePort: 0x0,
inode: 5107,
Inode: 5107,
}
have := p.Next()
want := expected

View File

@@ -22,7 +22,7 @@ type Connection struct {
LocalPort uint16
RemoteAddress net.IP
RemotePort uint16
inode uint64
Inode uint64
Proc Proc
}

View File

@@ -26,7 +26,7 @@ func (c *pnConnIter) Next() *Connection {
bufPool.Put(c.buf)
return nil
}
if proc, ok := c.procs[n.inode]; ok {
if proc, ok := c.procs[n.Inode]; ok {
n.Proc = *proc
}
return n

View File

@@ -14,7 +14,7 @@ import (
func TestLinuxConnections(t *testing.T) {
fs_hook.Mock(mockFS)
defer fs_hook.Restore()
scanner := NewConnectionScanner(process.NewWalker("/proc"))
scanner := NewConnectionScanner(process.NewWalker("/proc", false))
defer scanner.Stop()
// let the background scanner finish its first pass
@@ -30,7 +30,7 @@ func TestLinuxConnections(t *testing.T) {
LocalPort: 42688,
RemoteAddress: net.ParseIP("0.0.0.0").To4(),
RemotePort: 0,
inode: 5107,
Inode: 5107,
Proc: Proc{
PID: 1,
Name: "foo",

View File

@@ -4,15 +4,16 @@ import "sync"
// Process represents a single process.
type Process struct {
PID, PPID int
Name string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
RSSBytesLimit uint64
OpenFilesCount int
OpenFilesLimit uint64
PID, PPID int
Name string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
RSSBytesLimit uint64
OpenFilesCount int
OpenFilesLimit uint64
IsWaitingInAccept bool
}
// Walker is something that walks the /proc directory

View File

@@ -8,7 +8,7 @@ import (
)
// NewWalker returns a Darwin (lsof-based) walker.
func NewWalker(_ string) Walker {
func NewWalker(_ string, _ bool) Walker {
return &walker{}
}
@@ -22,6 +22,13 @@ const (
// These functions copied from procspy.
// IsProcInAccept returns true if the process has at least one thread
// blocked on the accept() system call.
func IsProcInAccept(procRoot, pid string) (ret bool) {
	// Not implemented on darwin: there is no /proc to inspect, so we
	// conservatively report that nothing is blocked in accept().
	return false
}
func (walker) Walk(f func(Process, Process)) error {
output, err := exec.Command(
lsofBinary,

View File

@@ -17,7 +17,8 @@ import (
)
type walker struct {
procRoot string
procRoot string
gatheringWaitingInAccept bool
}
var (
@@ -38,8 +39,11 @@ const (
)
// NewWalker creates a new process Walker.
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
func NewWalker(procRoot string, gatheringWaitingInAccept bool) Walker {
return &walker{
procRoot: procRoot,
gatheringWaitingInAccept: gatheringWaitingInAccept,
}
}
// skipNSpaces skips nSpaces in buf and updates the cursor 'pos'
@@ -166,6 +170,30 @@ func (w *walker) readCmdline(filename string) (cmdline, name string) {
return
}
// IsProcInAccept returns true if the process has at least one thread
// blocked on the accept() system call, determined by reading
// /proc/<pid>/task/<tid>/wchan for each of its threads.
func IsProcInAccept(procRoot, pid string) bool {
	tasks, err := fs.ReadDirNames(path.Join(procRoot, pid, "task"))
	if err != nil {
		// if the process has terminated, it is obviously not blocking
		// on the accept system call
		return false
	}
	for _, tid := range tasks {
		// wchan names the kernel function the thread is sleeping in;
		// inet_csk_accept is where a blocking accept() waits
		buf, err := fs.ReadFile(path.Join(procRoot, pid, "task", tid, "wchan"))
		if err != nil {
			// if a thread has terminated, it is obviously not
			// blocking on the accept system call
			continue
		}
		if strings.TrimSpace(string(buf)) == "inet_csk_accept" {
			return true
		}
	}
	return false
}
// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
@@ -215,17 +243,23 @@ func (w *walker) Walk(f func(Process, Process)) error {
cmdlineCache.Set([]byte(filename), []byte(fmt.Sprintf("%s\x00%s", cmdline, name)), cmdlineCacheTimeout)
}
isWaitingInAccept := false
if w.gatheringWaitingInAccept {
isWaitingInAccept = IsProcInAccept(w.procRoot, filename)
}
f(Process{
PID: pid,
PPID: ppid,
Name: name,
Cmdline: cmdline,
Threads: threads,
Jiffies: jiffies,
RSSBytes: rss,
RSSBytesLimit: rssLimit,
OpenFilesCount: openFilesCount,
OpenFilesLimit: openFilesLimit,
PID: pid,
PPID: ppid,
Name: name,
Cmdline: cmdline,
Threads: threads,
Jiffies: jiffies,
RSSBytes: rss,
RSSBytesLimit: rssLimit,
OpenFilesCount: openFilesCount,
OpenFilesLimit: openFilesLimit,
IsWaitingInAccept: isWaitingInAccept,
}, Process{})
}

View File

@@ -88,7 +88,7 @@ func TestWalker(t *testing.T) {
}
have := map[int]process.Process{}
walker := process.NewWalker("/proc")
walker := process.NewWalker("/proc", false)
err := walker.Walk(func(p, _ process.Process) {
have[p.PID] = p
})

View File

@@ -13,7 +13,7 @@ func TestBasicWalk(t *testing.T) {
procRoot = "/proc"
procFunc = func(process.Process, process.Process) {}
)
if err := process.NewWalker(procRoot).Walk(procFunc); err != nil {
if err := process.NewWalker(procRoot, false).Walk(procFunc); err != nil {
t.Fatal(err)
}
}

View File

@@ -158,7 +158,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
var processCache *process.CachingWalker
if flags.procEnabled {
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot))
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot, false))
p.AddTicker(processCache)
p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies, flags.noCommandLineArguments))
}

View File

@@ -32,6 +32,7 @@ func tcpV4ToGo(data *[]byte) (ret TcpV4) {
ret.SPort = uint16(eventC.sport)
ret.DPort = uint16(eventC.dport)
ret.NetNS = uint32(eventC.netns)
ret.Fd = uint32(eventC.fd)
return
}
@@ -64,6 +65,7 @@ func tcpV6ToGo(data *[]byte) (ret TcpV6) {
ret.SPort = uint16(eventC.sport)
ret.DPort = uint16(eventC.dport)
ret.NetNS = uint32(eventC.netns)
ret.Fd = uint32(eventC.fd)
return
}

View File

@@ -8,9 +8,10 @@ type EventType uint32
// These constants should be in sync with the equivalent definitions in the ebpf program.
const (
EventConnect EventType = 1
EventAccept = 2
EventClose = 3
EventConnect EventType = 1
EventAccept = 2
EventClose = 3
EventFdInstall = 4
)
func (e EventType) String() string {
@@ -21,6 +22,8 @@ func (e EventType) String() string {
return "accept"
case EventClose:
return "close"
case EventFdInstall:
return "fdinstall"
default:
return "unknown"
}
@@ -38,6 +41,7 @@ type TcpV4 struct {
SPort uint16 // Local TCP port
DPort uint16 // Remote TCP port
NetNS uint32 // Network namespace ID (as in /proc/$pid/ns/net)
Fd uint32 // File descriptor for fd_install events
}
// TcpV6 represents a TCP event (connect, accept or close) on IPv6
@@ -52,4 +56,5 @@ type TcpV6 struct {
SPort uint16 // Local TCP port
DPort uint16 // Remote TCP port
NetNS uint32 // Network namespace ID (as in /proc/$pid/ns/net)
Fd uint32 // File descriptor for fd_install events
}

File diff suppressed because one or more lines are too long

View File

@@ -5,6 +5,7 @@ package tracer
import (
"bytes"
"fmt"
"unsafe"
bpflib "github.com/iovisor/gobpf/elf"
)
@@ -111,6 +112,19 @@ func NewTracer(tcpEventCbV4 func(TcpV4), tcpEventCbV6 func(TcpV6), lostCb func(l
}, nil
}
// AddFdInstallWatcher flags the given pid in the "fdinstall_pids" eBPF map
// so that the kernel-side program starts reporting fd-install events for it.
func (t *Tracer) AddFdInstallWatcher(pid uint32) (err error) {
	var enabled uint32 = 1
	pidsMap := t.m.Map("fdinstall_pids")
	return t.m.UpdateElement(pidsMap, unsafe.Pointer(&pid), unsafe.Pointer(&enabled), 0)
}
// RemoveFdInstallWatcher deletes the given pid from the "fdinstall_pids"
// eBPF map, stopping fd-install event reporting for it.
func (t *Tracer) RemoveFdInstallWatcher(pid uint32) (err error) {
	pidsMap := t.m.Map("fdinstall_pids")
	return t.m.DeleteElement(pidsMap, unsafe.Pointer(&pid))
}
func (t *Tracer) Stop() {
close(t.stopChan)
t.perfMapIPV4.PollStop()

View File

@@ -15,6 +15,12 @@ func TracerAsset() ([]byte, error) {
func NewTracer(tcpEventCbV4 func(TcpV4), tcpEventCbV6 func(TcpV6), lostCb func(lost uint64)) (*Tracer, error) {
return nil, fmt.Errorf("not supported on non-Linux systems")
}
// AddFdInstallWatcher is a stub: fd-install watching requires eBPF,
// which is only available on Linux.
func (t *Tracer) AddFdInstallWatcher(pid uint32) (err error) {
	return fmt.Errorf("not supported on non-Linux systems")
}

// RemoveFdInstallWatcher is a stub: fd-install watching requires eBPF,
// which is only available on Linux.
func (t *Tracer) RemoveFdInstallWatcher(pid uint32) (err error) {
	return fmt.Errorf("not supported on non-Linux systems")
}
func (t *Tracer) Stop() {
}

View File

@@ -78,6 +78,26 @@ struct bpf_map_def SEC("maps/tuplepid_ipv6") tuplepid_ipv6 = {
.max_entries = 1024,
};
/* This is a key/value store with the keys being the 64-bit pid_tgid
 * (as returned by bpf_get_current_pid_tgid()) and the values being the
 * fd (unsigned int) recorded by kprobe__fd_install.
 */
struct bpf_map_def SEC("maps/fdinstall_ret") fdinstall_ret = {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u64),
.value_size = sizeof(unsigned int),
.max_entries = 1024,
};
/* This is a key/value store with the keys being a pid (tgid)
* and the values being a boolean.
*/
struct bpf_map_def SEC("maps/fdinstall_pids") fdinstall_pids = {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u32),
.value_size = sizeof(__u32),
.max_entries = 1024,
};
/* http://stackoverflow.com/questions/1001307/detecting-endianness-programmatically-in-a-c-program */
__attribute__((always_inline))
static bool is_big_endian(void)
@@ -821,6 +841,48 @@ int kretprobe__inet_csk_accept(struct pt_regs *ctx)
return 0;
}
SEC("kprobe/fd_install")
// Entry probe on fd_install(): fires when a file descriptor is placed into
// a process's fd table (which happens at the end of accept()). For watched
// tgids, stash the fd so the return probe below can emit the event.
int kprobe__fd_install(struct pt_regs *ctx)
{
	// pid_tgid: upper 32 bits are the tgid (userspace pid), lower 32 the tid
	u64 pid = bpf_get_current_pid_tgid();
	u32 tgid = pid >> 32;
	unsigned long fd = (unsigned long) PT_REGS_PARM1(ctx);
	u32 *exists = NULL;

	// Only act for processes userspace registered in fdinstall_pids
	exists = bpf_map_lookup_elem(&fdinstall_pids, &tgid);
	if (exists == NULL || !*exists)
		return 0;

	// Remember the fd, keyed by pid_tgid, for the kretprobe
	bpf_map_update_elem(&fdinstall_ret, &pid, &fd, BPF_ANY);
	return 0;
}

SEC("kretprobe/fd_install")
// Return probe on fd_install(): retrieves the fd recorded at entry and
// emits a TCP_EVENT_TYPE_FD_INSTALL event carrying pid, fd and comm
// (no addresses; userspace resolves the socket's tuple from /proc).
int kretprobe__fd_install(struct pt_regs *ctx)
{
	u64 pid = bpf_get_current_pid_tgid();
	unsigned long *fd;

	fd = bpf_map_lookup_elem(&fdinstall_ret, &pid);
	if (fd == NULL) {
		return 0; // missed entry
	}
	bpf_map_delete_elem(&fdinstall_ret, &pid);

	u32 cpu = bpf_get_smp_processor_id();
	struct tcp_ipv4_event_t evt = {
		.timestamp = bpf_ktime_get_ns(),
		.cpu = cpu,
		.type = TCP_EVENT_TYPE_FD_INSTALL,
	};
	evt.pid = pid >> 32;
	evt.fd = *(__u32*)fd;
	bpf_get_current_comm(&evt.comm, sizeof(evt.comm));
	// Deliver to userspace on the IPv4 perf ring (fd-install events are
	// address-less, so a single ring suffices)
	bpf_perf_event_output(ctx, &tcp_event_ipv4, cpu, &evt, sizeof(evt));
	return 0;
}
char _license[] SEC("license") = "GPL";
// this number will be interpreted by gobpf-elf-loader to set the current
// running kernel version

View File

@@ -3,9 +3,10 @@
#include <linux/types.h>
#define TCP_EVENT_TYPE_CONNECT 1
#define TCP_EVENT_TYPE_ACCEPT 2
#define TCP_EVENT_TYPE_CLOSE 3
#define TCP_EVENT_TYPE_CONNECT 1
#define TCP_EVENT_TYPE_ACCEPT 2
#define TCP_EVENT_TYPE_CLOSE 3
#define TCP_EVENT_TYPE_FD_INSTALL 4
#define GUESS_SADDR 0
#define GUESS_DADDR 1
@@ -30,6 +31,8 @@ struct tcp_ipv4_event_t {
__u16 sport;
__u16 dport;
__u32 netns;
__u32 fd;
__u32 dummy;
};
struct tcp_ipv6_event_t {
@@ -46,6 +49,8 @@ struct tcp_ipv6_event_t {
__u16 sport;
__u16 dport;
__u32 netns;
__u32 fd;
__u32 dummy;
};
// tcp_set_state doesn't run in the context of the process that initiated the

View File

@@ -1,19 +1,28 @@
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)
var watchFdInstallPids string
var lastTimestampV4 uint64
var lastTimestampV6 uint64
func tcpEventCbV4(e tracer.TcpV4) {
fmt.Printf("%v cpu#%d %s %v %s %v:%v %v:%v %v\n",
e.Timestamp, e.CPU, e.Type, e.Pid, e.Comm, e.SAddr, e.SPort, e.DAddr, e.DPort, e.NetNS)
if e.Type == tracer.EventFdInstall {
fmt.Printf("%v cpu#%d %s %v %s %v\n",
e.Timestamp, e.CPU, e.Type, e.Pid, e.Comm, e.Fd)
} else {
fmt.Printf("%v cpu#%d %s %v %s %v:%v %v:%v %v\n",
e.Timestamp, e.CPU, e.Type, e.Pid, e.Comm, e.SAddr, e.SPort, e.DAddr, e.DPort, e.NetNS)
}
if lastTimestampV4 > e.Timestamp {
fmt.Printf("ERROR: late event!\n")
@@ -40,9 +49,15 @@ func lostCb(count uint64) {
os.Exit(1)
}
func init() {
flag.StringVar(&watchFdInstallPids, "monitor-fdinstall-pids", "", "a comma-separated list of pids that need to be monitored for fdinstall events")
flag.Parse()
}
func main() {
if len(os.Args) != 1 {
fmt.Fprintf(os.Stderr, "Usage: %s\n", os.Args[0])
if flag.NArg() > 1 {
flag.Usage()
os.Exit(1)
}
@@ -52,6 +67,20 @@ func main() {
os.Exit(1)
}
for _, p := range strings.Split(watchFdInstallPids, ",") {
if p == "" {
continue
}
pid, err := strconv.ParseUint(p, 10, 32)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid pid: %v\n", err)
os.Exit(1)
}
fmt.Printf("Monitor fdinstall events for pid %d\n", pid)
t.AddFdInstallWatcher(uint32(pid))
}
fmt.Printf("Ready\n")
sig := make(chan os.Signal, 1)

2
vendor/manifest vendored
View File

@@ -1462,7 +1462,7 @@
"importpath": "github.com/weaveworks/tcptracer-bpf",
"repository": "https://github.com/weaveworks/tcptracer-bpf",
"vcs": "git",
"revision": "a82fffdbfee2ffe2c469279dbfeb3734cf7de1f2",
"revision": "783f088bbe3e91d4d23cf2f48072f80de2fd03fc",
"branch": "master",
"notests": true
},