Files
weave-scope/probe/process/walker.go
Alban Crequy d715ccc391 ebpf: handle fd_install events from tcptracer-bpf
Since https://github.com/weaveworks/tcptracer-bpf/pull/39, tcptracer-bpf
can generate "fd_install" events when a process installs a new file
descriptor in its fd table. Those events must be requested explicitely
on a per-pid basis with tracer.AddFdInstallWatcher(pid).

This is useful to know about "accept" events that would otherwise be
missed because kretprobes are not triggered for functions that were
called before the installation of the kretprobe.

This patch find all the processes that are currently blocked on an
accept() syscall during the EbpfTracker initialization.
feedInitialConnections() will use tracer.AddFdInstallWatcher() to
subscribe to fd_install  events. When a fd_install event is received,
synthesise an accept event with the connection tuple and the network
namespace (from /proc).
2017-05-19 14:49:38 +02:00

68 lines
1.5 KiB
Go

package process
import "sync"
// Process represents a single process.
type Process struct {
PID, PPID int
Name string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
RSSBytesLimit uint64
OpenFilesCount int
OpenFilesLimit uint64
IsWaitingInAccept bool
}
// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(Process, Process)) error
}
// CachingWalker is a walker than caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache map[int]Process
previousByPID map[int]Process
cacheLock sync.RWMutex
source Walker
}
// NewCachingWalker returns a new CachingWalker
func NewCachingWalker(source Walker) *CachingWalker {
return &CachingWalker{source: source}
}
// Name of this ticker, for metrics gathering
func (*CachingWalker) Name() string { return "Process" }
// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(Process, Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()
for _, p := range c.cache {
f(p, c.previousByPID[p.PID])
}
return nil
}
// Tick updates cached copy of process list
func (c *CachingWalker) Tick() error {
newCache := map[int]Process{}
err := c.source.Walk(func(p, _ Process) {
newCache[p.PID] = p
})
if err != nil {
return err
}
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.previousByPID = c.cache
c.cache = newCache
return nil
}