Files
weave-scope/probe/endpoint/procspy/reader_linux.go
Iago López Galeiras 9920c4ea48 Add eBPF connection tracking without dependencies on kernel headers
Based on work from Lorenzo, updated by Iago, Alban, Alessandro and
Michael.

This PR adds connection tracking using eBPF. This feature is not enabled by default.
For now, you can enable it by launching scope with the following command:

```
sudo ./scope launch --probe.ebpf.connections=true
```

This patch allows Scope to be notified of every connection event
without relying on parsing /proc/$pid/net/tcp{,6} and
/proc/$pid/fd/*, which improves performance.

We vendor https://github.com/iovisor/gobpf in Scope to load the
pre-compiled ebpf program and https://github.com/weaveworks/tcptracer-bpf
to guess the offsets of the structures we need in the kernel. In this
way we don't need a different pre-compiled ebpf object file per kernel.
The pre-compiled ebpf program is included in the vendoring of
tcptracer-bpf.

The ebpf program uses kprobes/kretprobes on the following kernel functions:
- tcp_v4_connect
- tcp_v6_connect
- tcp_set_state
- inet_csk_accept
- tcp_close

It generates "connect", "accept" and "close" events containing the
connection tuple but also pid and netns.
Note: IPv6 events are not supported in Scope and are therefore not passed on.

probe/endpoint/ebpf.go maintains the list of connections. Like
conntrack, it keeps dead connections for one extra iteration in order
to report short-lived connections.

The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still
there and is still used at start-up, because eBPF only delivers events,
not the initial state. However, the /proc parsing for the initial
state is now done in the foreground instead of in the background, via
newForegroundReader().

NAT resolution on connections from eBPF works in the same way as it did
on connections from /proc: by using conntrack. One of the two conntrack
instances is started only to obtain the initial state and is then
stopped, since eBPF already detects short-lived connections.

The Scope Docker image size comparison:
- weaveworks/scope in current master:  22 MB (compressed),  68 MB
  (uncompressed)
- weaveworks/scope with this patchset: 23 MB (compressed), 69 MB
  (uncompressed)

Fixes #1168 (walking /proc to obtain connections is very expensive)

Fixes #1260 (Short-lived connections not tracked for containers in
shared networking namespaces)

Fixes #1962 (Port ebpf tracker to Go)

Fixes #1961 (Remove runtime kernel header dependency from ebpf tracker)
2017-03-08 22:11:12 +01:00

193 lines
5.5 KiB
Go

package procspy
import (
"bytes"
"io"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/process"
)
// Tuning knobs for the rate-limited walk over /proc/PID/fd/*.
//
// Each rate-limit period allows one block of fdBlockSize file descriptors
// to be stat-ed per namespace, so the period bounds the walker's speed.
const (
	// fdBlockSize is the maximum number of /proc/PID/fd/* files to stat per
	// rate-limit period. As a rule of thumb, going through each block should
	// be more expensive than reading /proc/PID/tcp{,6}.
	fdBlockSize uint64 = 300

	// initialRateLimitPeriod reads 20 * fdBlockSize file descriptors
	// (/proc/PID/fd/*) per namespace per second.
	initialRateLimitPeriod = 50 * time.Millisecond
	// minRateLimitPeriod is the fastest allowed pace; it equals the initial one.
	minRateLimitPeriod = initialRateLimitPeriod
	// maxRateLimitPeriod still reads at least 2 * fdBlockSize file
	// descriptors per namespace per second.
	maxRateLimitPeriod = 500 * time.Millisecond

	// targetWalkTime is how long walking all files should take.
	targetWalkTime = 10 * time.Second
)
// reader hands out the results of walking /proc.
type reader interface {
	// stop releases whatever resources the reader holds.
	stop()

	// getWalkedProcPid copies the data gathered by the most recent walk
	// into buf and returns the sockets that walk found.
	// NOTE(review): the uint64 key is presumably the socket inode — confirm.
	getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error)
}
// backgroundReader serves the results of /proc walks performed periodically
// by its loop goroutine. mtx guards latestBuf and latestSockets, which loop
// replaces after every completed walk.
type backgroundReader struct {
	stopc chan struct{} // closed by stop() to terminate loop

	mtx           sync.Mutex
	latestBuf     *bytes.Buffer // nil until the first walk completes
	latestSockets map[uint64]*Proc
}
// newBackgroundReader returns a reader whose results are refreshed by a
// goroutine that walks /proc at a rate-limited pace until stop is called.
// Until the first walk finishes, the reader reports an empty socket map.
func newBackgroundReader(walker process.Walker) reader {
	r := new(backgroundReader)
	r.stopc = make(chan struct{})
	r.latestSockets = make(map[uint64]*Proc)
	go r.loop(walker)
	return r
}
// stop tells the loop goroutine to terminate by closing stopc. Call it at
// most once: closing an already-closed channel panics.
func (br *backgroundReader) stop() {
	stopc := br.stopc
	close(stopc)
}
// getWalkedProcPid copies the data gathered by the most recent completed
// walk into buf and returns the sockets found by that walk.
//
// Before the first walk has finished there is nothing to copy: buf is left
// untouched and the current (initially empty) socket map is returned.
func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) {
	br.mtx.Lock()
	defer br.mtx.Unlock()

	// latestBuf is only assigned by loop once a walk completes; calling
	// Bytes() on a nil *bytes.Buffer would panic.
	if br.latestBuf == nil {
		return br.latestSockets, nil
	}
	// Don't access latestBuf directly but create a reader. In this way,
	// the buffer will not be empty in the next call of getWalkedProcPid
	// and it can be copied again.
	_, err := io.Copy(buf, bytes.NewReader(br.latestBuf.Bytes()))
	return br.latestSockets, err
}
// loop drives the periodic /proc walks for br until stop closes br.stopc.
//
// It alternates between two states and uses nil channels to switch select
// cases off:
//   - resting: tickc is armed and walkc is nil. When tickc fires, a walk is
//     started in its own goroutine.
//   - walking: walkc is armed and tickc is nil. When the walk's result
//     arrives, it is published under br.mtx. Then the rate limit is re-tuned
//     via scheduleNextWalk, and tickc is re-armed to fire after the returned
//     rest interval.
//
// The ticker's channel is handed to the pidWalker.
// NOTE(review): presumably the walker uses it to pace fd-block reads —
// confirm in pidWalker.
func (br *backgroundReader) loop(walker process.Walker) {
	var (
		begin           time.Time                      // when we started the last performWalk
		tickc           = time.After(time.Millisecond) // fire (almost) immediately: first walk after 1ms
		walkc           chan walkResult                // initially nil, i.e. off
		rateLimitPeriod = initialRateLimitPeriod
		restInterval    time.Duration
		ticker          = time.NewTicker(rateLimitPeriod)
		pWalker         = newPidWalker(walker, ticker.C, fdBlockSize)
	)
	for {
		select {
		case <-tickc:
			tickc = nil                      // turn off until the next loop
			walkc = make(chan walkResult, 1) // turn on (need buffered so we don't leak performWalk)
			begin = time.Now()               // reset counter
			go performWalk(pWalker, walkc)   // do work
		case result := <-walkc:
			// Expose results
			br.mtx.Lock()
			br.latestBuf = result.buf
			br.latestSockets = result.sockets
			br.mtx.Unlock()
			// Schedule next walk and adjust its rate limit
			walkTime := time.Since(begin)
			rateLimitPeriod, restInterval = scheduleNextWalk(rateLimitPeriod, walkTime)
			// Replace the ticker so the next walk runs at the new rate.
			ticker.Stop()
			ticker = time.NewTicker(rateLimitPeriod)
			// performWalk takes the pidWalker by value, so updating this
			// copy takes effect when the next walk is started.
			pWalker.tickc = ticker.C
			walkc = nil                      // turn off until the next loop
			tickc = time.After(restInterval) // turn on (fires at once if the walk overran)
		case <-br.stopc:
			// A walk still in flight can deliver into the buffered walkc
			// and exit, so its goroutine does not leak.
			pWalker.stop()
			ticker.Stop()
			return // abort
		}
	}
}
// foregroundReader holds the result of the single /proc walk performed
// synchronously by newForegroundReader. In this file its fields are only
// assigned during construction, so reads need no locking.
type foregroundReader struct {
	stopc         chan struct{}    // closed by stop()
	latestBuf     *bytes.Buffer    // data produced by the walk
	latestSockets map[uint64]*Proc // sockets found by the walk
	ticker        *time.Ticker     // paced the walker; stopped by stop()
}
// newForegroundReader walks /proc synchronously and returns a reader that
// serves the result of that single walk. The walker is paced by a
// one-millisecond ticker, which is stopped as soon as the walk returns.
func newForegroundReader(walker process.Walker) reader {
	fr := &foregroundReader{
		stopc:         make(chan struct{}),
		latestSockets: map[uint64]*Proc{},
	}
	var (
		walkc   = make(chan walkResult)
		ticker  = time.NewTicker(time.Millisecond) // fire every millisecond
		pWalker = newPidWalker(walker, ticker.C, fdBlockSize)
	)
	go performWalk(pWalker, walkc)
	result := <-walkc

	// The one and only walk has returned, so the ticker has no further use.
	// Stop it now rather than letting it fire 1000 times a second until
	// stop() is called. Ticker.Stop may safely be called again from stop().
	ticker.Stop()

	fr.latestBuf = result.buf
	fr.latestSockets = result.sockets
	fr.ticker = ticker
	return fr
}
// stop halts the ticker and closes stopc. Call it at most once: a second
// call would close an already-closed channel and panic.
func (fr *foregroundReader) stop() {
	t, done := fr.ticker, fr.stopc
	t.Stop()
	close(done)
}
// getWalkedProcPid copies the data from the one-off walk into buf and
// returns the sockets that walk found. latestBuf is read through a separate
// bytes.Reader, so it is never drained and every call copies the same data.
func (fr *foregroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) {
	src := bytes.NewReader(fr.latestBuf.Bytes())
	if _, err := io.Copy(buf, src); err != nil {
		return fr.latestSockets, err
	}
	return fr.latestSockets, nil
}
// walkResult carries the outcome of one performWalk call.
type walkResult struct {
	buf     *bytes.Buffer    // data written by the walker; emptied on error
	sockets map[uint64]*Proc // sockets found by the walk; nil on error
}
// performWalk runs one full walk with w and sends the outcome on c. If the
// walk fails, the error is logged and an empty result (reset buffer, nil
// sockets) is sent instead, so the receiver always gets exactly one value.
func performWalk(w pidWalker, c chan<- walkResult) {
	buf := bytes.NewBuffer(make([]byte, 0, 5000))
	sockets, err := w.walk(buf)
	if err != nil {
		log.Errorf("background /proc reader: error walking /proc: %s", err)
		buf.Reset()
		sockets = nil
	}
	c <- walkResult{buf: buf, sockets: sockets}
}
// scheduleNextWalk adjusts the rate-limit period for the next walk and
// computes how long to rest before starting it.
//
// rateLimitPeriod is the period used by the walk that just finished, and
// took is how long that walk lasted. The period is scaled by
// targetWalkTime/took so the next walk should take about targetWalkTime.
// The result is clamped to [minRateLimitPeriod, maxRateLimitPeriod].
//
// The rest interval is targetWalkTime - took. It is negative when the walk
// overran; a timer with a non-positive duration fires immediately.
func scheduleNextWalk(rateLimitPeriod time.Duration, took time.Duration) (newRateLimitPeriod time.Duration, restInterval time.Duration) {
	log.Debugf("background /proc reader: full pass took %s", took)
	if float64(took)/float64(targetWalkTime) > 1.5 {
		log.Warnf(
			"background /proc reader: full pass took %s: 50%% more than expected (%s)",
			took,
			targetWalkTime,
		)
	}

	// Adjust rate limit to more-accurately meet the target walk time in the
	// next iteration.
	if took <= 0 {
		// A walk that took no measurable time was as fast as possible, so
		// slow down as far as allowed. Dividing by a non-positive duration
		// would give ±Inf, and converting that to a Duration is
		// implementation-defined (e.g. MinInt64 on amd64, which would
		// wrongly clamp to the *minimum* period).
		newRateLimitPeriod = maxRateLimitPeriod
	} else {
		// Clamp in float space before converting, so a huge ratio can never
		// overflow int64.
		scaled := float64(targetWalkTime) / float64(took) * float64(rateLimitPeriod)
		switch {
		case scaled > float64(maxRateLimitPeriod):
			newRateLimitPeriod = maxRateLimitPeriod
		case scaled < float64(minRateLimitPeriod):
			newRateLimitPeriod = minRateLimitPeriod
		default:
			newRateLimitPeriod = time.Duration(scaled)
		}
	}
	log.Debugf("background /proc reader: new rate limit period %s", newRateLimitPeriod)
	return newRateLimitPeriod, targetWalkTime - took
}