Prototype to trace incoming and outgoing connections from containers.

This commit is contained in:
Tom Wilkie
2015-05-14 15:42:21 +00:00
committed by Tom Wilkie
parent bc8f86a75f
commit e09774ba5a
16 changed files with 1264 additions and 0 deletions

2
.gitignore vendored
View File

@@ -50,6 +50,8 @@ experimental/genreport/genreport
experimental/graphviz/graphviz
experimental/oneshot/oneshot
experimental/_integration/_integration
experimental/tracer/main/main
experimental/tracer/tracer.tar
*sublime-project
*sublime-workspace
*npm-debug.log

3
experimental/tracer/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
main/main
tracer.tar
main/static.go

View File

@@ -0,0 +1,14 @@
tracer.tar: main/main main/Dockerfile
docker build -t weaveworks/tracer main/
docker save weaveworks/tracer:latest >$@
main/main: main/*.go main/static.go ptrace/*.go
go get -tags netgo ./$(@D)
go build -ldflags "-extldflags \"-static\" -X main.version $(SCOPE_VERSION)" -tags netgo -o $@ ./$(@D)
main/static.go: ui/*
esc -o main/static.go -prefix ui ui
clean:
go clean ./..
rm -f main/static.go tracer.tar main/main

View File

@@ -0,0 +1,9 @@
Run tracer:
- make
- ./tracer.sh start
TODO:
- need to stich traces together
- deal with persistant connections
- make it work for goroutines
- test with jvm based app

View File

@@ -0,0 +1,6 @@
FROM gliderlabs/alpine
MAINTAINER Weaveworks Inc <help@weave.works>
WORKDIR /home/weave
COPY ./main /home/weave/
EXPOSE 4050
ENTRYPOINT ["/home/weave/main"]

View File

@@ -0,0 +1,79 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"github.com/gorilla/mux"
dockerClient "github.com/fsouza/go-dockerclient"
"github.com/weaveworks/scope/probe/docker"
)
func respondWith(w http.ResponseWriter, code int, response interface{}) {
w.Header().Set("Content-Type", "application/json")
w.Header().Add("Cache-Control", "no-cache")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(response); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
log.Printf("Error handling http request: %v", err.Error())
}
}
func (t *tracer) http(port int) {
router := mux.NewRouter()
router.Methods("GET").Path("/containers").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
containers := []*dockerClient.Container{}
t.docker.WalkContainers(func(container docker.Container) {
containers = append(containers, container.Container())
})
respondWith(w, http.StatusOK, containers)
})
router.Methods("GET").Path("/pid").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, t.ptrace.AttachedPIDs())
})
router.Methods("POST").Path("/pid/{pid:\\d+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pid, err := strconv.Atoi(mux.Vars(r)["pid"])
if err != nil {
respondWith(w, http.StatusBadRequest, err.Error())
return
}
t.ptrace.TraceProcess(pid)
w.WriteHeader(204)
})
router.Methods("DELETE").Path("/pid/{pid:\\d+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pid, err := strconv.Atoi(mux.Vars(r)["pid"])
if err != nil {
respondWith(w, http.StatusBadRequest, err.Error())
return
}
t.ptrace.StopTracing(pid)
w.WriteHeader(204)
})
router.Methods("GET").Path("/trace").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, t.store.Traces())
})
router.Methods("GET").PathPrefix("/").Handler(http.FileServer(FS(false))) // everything else is static
log.Printf("Launching HTTP API on port %d", port)
srv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: router,
}
if err := srv.ListenAndServe(); err != nil {
log.Printf("Unable to create http listener: %v", err)
}
}

View File

@@ -0,0 +1,54 @@
package main
import (
"log"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/weaveworks/scope/experimental/tracer/ptrace"
"github.com/weaveworks/scope/probe/docker"
)
const (
procRoot = "/proc"
pollInterval = 10 * time.Second
)
type tracer struct {
ptrace ptrace.PTracer
store *store
docker docker.Registry
}
func main() {
dockerRegistry, err := docker.NewRegistry(pollInterval)
if err != nil {
log.Fatalf("Could start docker watcher: %v", err)
}
tracer := tracer{
ptrace: ptrace.NewPTracer(),
store: newStore(),
docker: dockerRegistry,
}
go tracer.http(6060)
handleSignals()
}
func handleSignals() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT)
buf := make([]byte, 1<<20)
for {
sig := <-sigs
switch sig {
case syscall.SIGQUIT:
stacklen := runtime.Stack(buf, true)
log.Printf("=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
}
}
}

View File

@@ -0,0 +1,108 @@
package main
import (
"math/rand"
"sync"
"github.com/msackman/skiplist"
"github.com/weaveworks/scope/experimental/tracer/ptrace"
)
const epsilon = int64(5)
// Traces are indexed by from addr, from port, and start time.
type key struct {
fromAddr uint32
fromPort uint16
startTime int64
}
type trace struct {
pid int
root *ptrace.Fd
children []*trace
}
type store struct {
sync.RWMutex
traces *skiplist.SkipList
}
func newKey(fd *ptrace.Fd) key {
var fromAddr uint32
for _, b := range fd.FromAddr.To4() {
fromAddr <<= 8
fromAddr |= uint32(b)
}
return key{fromAddr, fd.FromPort, fd.Start}
}
func (l key) LessThan(other skiplist.Comparable) bool {
r := other.(key)
return l.fromAddr < r.fromAddr && l.fromPort < r.fromPort && l.startTime < r.startTime
}
func (l key) Equal(other skiplist.Comparable) bool {
r := other.(key)
if l.fromAddr != r.fromAddr || l.fromPort != r.fromPort {
return false
}
diff := l.startTime - r.startTime
return -epsilon < diff && diff < epsilon
}
func newStore() *store {
return &store{traces: skiplist.New(rand.New(rand.NewSource(0)))}
}
func (s *store) RecordConnection(pid int, connection *ptrace.Fd) {
s.Lock()
defer s.Unlock()
newTrace := &trace{pid: pid, root: connection}
newTraceKey := newKey(connection)
// First, see if this new conneciton is a child of an existing connection.
// This indicates we have a parent connection to attach to.
// If not, insert this connection.
if parentNode := s.traces.Get(newTraceKey); parentNode != nil {
parentNode.Remove()
parentTrace := parentNode.Value.(*trace)
parentTrace.children = append(parentTrace.children, newTrace)
} else {
s.traces.Insert(newTraceKey, newTrace)
}
// Next, see if we already know about the child connections
// If not, insert each of our children.
for _, childConnection := range connection.Children {
childTraceKey := newKey(childConnection)
if childNode := s.traces.Get(childTraceKey); childNode != nil {
childNode.Remove()
childTrace := childNode.Value.(*trace)
newTrace.children = append(newTrace.children, childTrace)
} else {
s.traces.Insert(childTraceKey, newTrace)
}
}
}
func (s *store) Traces() []*trace {
s.RLock()
defer s.RUnlock()
var traces []*trace
var cur = s.traces.First()
for {
traces = append(traces, cur.Value.(*trace))
cur = cur.Next()
if cur == nil {
break
}
}
return traces
}

View File

@@ -0,0 +1,165 @@
package ptrace
import (
"bufio"
"encoding/hex"
"fmt"
"net"
"os"
"regexp"
"strconv"
"time"
)
const (
listening = iota
incoming
outgoing
)
const (
socketPattern = `^socket:\[(\d+)\]$`
tcpPattern = `^\s*(?P<fd>\d+): (?P<localaddr>[A-F0-9]{8}):(?P<localport>[A-F0-9]{4}) ` +
`(?P<remoteaddr>[A-F0-9]{8}):(?P<remoteport>[A-F0-9]{4}) (?:[A-F0-9]{2}) (?:[A-F0-9]{8}):(?:[A-F0-9]{8}) ` +
`(?:[A-F0-9]{2}):(?:[A-F0-9]{8}) (?:[A-F0-9]{8}) \s+(?:\d+) \s+(?:\d+) (?P<inode>\d+)`
)
var (
socketRegex = regexp.MustCompile(socketPattern)
tcpRegexp = regexp.MustCompile(tcpPattern)
)
// Fd represents a connect and subsequent connections caused by it.
type Fd struct {
direction int
fd int
Start int64
stop int64
sent int64
received int64
FromAddr net.IP
FromPort uint16
toAddr net.IP
toPort uint16
// Fds are connections, and can have a causal-link to other Fds
Children []*Fd
}
func getLocalAddr(pid, fd int) (addr net.IP, port uint16, err error) {
var (
socket string
match []string
inode int
tcpFile *os.File
scanner *bufio.Scanner
candidate int
port64 int64
)
socket, err = os.Readlink(fmt.Sprintf("/proc/%d/fd/%d", pid, fd))
if err != nil {
return
}
match = socketRegex.FindStringSubmatch(socket)
if match == nil {
err = fmt.Errorf("Fd %d not a socket", fd)
return
}
inode, err = strconv.Atoi(match[1])
if err != nil {
return
}
tcpFile, err = os.Open(fmt.Sprintf("/proc/%d/net/tcp", pid))
if err != nil {
return
}
defer tcpFile.Close()
scanner = bufio.NewScanner(tcpFile)
for scanner.Scan() {
match = tcpRegexp.FindStringSubmatch(scanner.Text())
if match == nil {
continue
}
candidate, err = strconv.Atoi(match[6])
if err != nil {
return
}
if candidate != inode {
continue
}
addr = make([]byte, 4)
if _, err = hex.Decode(addr, []byte(match[2])); err != nil {
return
}
addr[0], addr[1], addr[2], addr[3] = addr[3], addr[2], addr[1], addr[0]
// use a 32 bit int for target, at the result is a uint16
port64, err = strconv.ParseInt(match[3], 16, 32)
if err != nil {
return
}
port = uint16(port64)
return
}
if err = scanner.Err(); err != nil {
return
}
err = fmt.Errorf("Fd %d not found for proc %d", fd, pid)
return
}
// We want to get the listening address from /proc
func newListeningFd(pid, fd int) (*Fd, error) {
localAddr, localPort, err := getLocalAddr(pid, fd)
if err != nil {
return nil, err
}
return &Fd{
direction: listening, fd: fd, Start: time.Now().Unix(),
toAddr: localAddr, toPort: uint16(localPort),
}, nil
}
// We intercepted a connect syscall
func newConnectionFd(pid, fd int, remoteAddr net.IP, remotePort uint16) (*Fd, error) {
localAddr, localPort, err := getLocalAddr(pid, fd)
if err != nil {
return nil, err
}
return &Fd{
direction: outgoing, fd: fd, Start: time.Now().Unix(),
FromAddr: localAddr, FromPort: uint16(localPort),
toAddr: remoteAddr, toPort: remotePort,
}, nil
}
// We got a new connection on a listening socket
func (fd *Fd) newConnection(addr net.IP, port uint16, newFd int) (*Fd, error) {
if fd.direction != listening {
return nil, fmt.Errorf("New connection on non-listening fd!")
}
return &Fd{
direction: incoming, fd: newFd, Start: time.Now().Unix(),
toAddr: fd.toAddr, toPort: fd.toPort,
FromAddr: addr, FromPort: port,
}, nil
}
func (fd *Fd) close() {
fd.stop = time.Now().Unix()
}

View File

@@ -0,0 +1,99 @@
package ptrace
import (
"fmt"
"io/ioutil"
"log"
"strconv"
"sync"
)
type process struct {
sync.Mutex
pid int
detaching bool
detached chan struct{}
tracer *PTracer
threads map[int]*thread
fds map[int]*Fd
}
func newProcess(pid int, tracer *PTracer) *process {
return &process{
pid: pid,
tracer: tracer,
threads: make(map[int]*thread),
fds: make(map[int]*Fd),
detached: make(chan struct{}),
}
}
func (p *process) trace() {
go p.loop()
}
// This doesn't actually guarantees we follow all the threads. Oops.
func (p *process) loop() {
var (
attached int
)
log.Printf("Tracing process %d", p.pid)
for {
ps, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/task", p.pid))
if err != nil {
log.Printf("ReadDir failed, pid=%d, err=%v", p.pid, err)
return
}
attached = 0
for _, file := range ps {
pid, err := strconv.Atoi(file.Name())
if err != nil {
log.Printf("'%s' is not a pid: %v", file.Name(), err)
attached++
continue
}
p.Lock()
t, ok := p.threads[pid]
if !ok {
t = p.tracer.traceThread(pid, p)
p.threads[pid] = t
}
p.Unlock()
if !t.attached {
continue
}
attached++
}
// When we successfully attach to all threads
// we can be sure to catch new clones, so we
// can quit.
if attached == len(ps) {
break
}
}
log.Printf("Successfully attached to %d threads", attached)
}
func (p *process) newThread(thread *thread) {
p.Lock()
defer p.Unlock()
p.threads[thread.tid] = thread
}
func (p *process) newFd(fd *Fd) error {
_, ok := p.fds[fd.fd]
if ok {
return fmt.Errorf("New fd %d, alread exists!", fd.fd)
}
p.fds[fd.fd] = fd
return nil
}

View File

@@ -0,0 +1,314 @@
package ptrace
import (
"bufio"
"fmt"
"log"
"os"
"runtime"
"strconv"
"strings"
"syscall"
)
const (
ptraceOptions = syscall.PTRACE_O_TRACESYSGOOD | syscall.PTRACE_O_TRACECLONE
ptraceTracesysgoodBit = 0x80
)
// PTracer ptrace processed and threads
type PTracer struct {
// All ptrace calls must come from the
// same thread. So we wait on a separate
// thread.
ops chan func()
stopped chan stopped
quit chan struct{}
childAttached chan struct{} //used to signal the wait loop
threads map[int]*thread
processes map[int]*process
}
type stopped struct {
pid int
status syscall.WaitStatus
}
// NewPTracer creates a new ptracer.
func NewPTracer() PTracer {
t := PTracer{
ops: make(chan func()),
stopped: make(chan stopped),
quit: make(chan struct{}),
childAttached: make(chan struct{}),
threads: make(map[int]*thread),
processes: make(map[int]*process),
}
go t.waitLoop()
go t.loop()
return t
}
// TraceProcess starts tracing the given pid
func (t *PTracer) TraceProcess(pid int) *process {
result := make(chan *process)
t.ops <- func() {
process := newProcess(pid, t)
t.processes[pid] = process
process.trace()
result <- process
}
return <-result
}
// StopTracing stops tracing all threads for the given pid
func (t *PTracer) StopTracing(pid int) error {
log.Printf("Detaching from %d", pid)
errors := make(chan error)
processes := make(chan *process)
t.ops <- func() {
// send sigstop to all threads
process, ok := t.processes[pid]
if !ok {
errors <- fmt.Errorf("PID %d not found", pid)
return
}
// This flag tells the thread to detach when it next stops
process.detaching = true
// Now send sigstop to all threads.
for _, thread := range process.threads {
log.Printf("sending SIGSTOP to %d", thread.tid)
if err := syscall.Tgkill(pid, thread.tid, syscall.SIGSTOP); err != nil {
errors <- err
return
}
}
processes <- process
}
select {
case err := <-errors:
return err
case process := <-processes:
<-process.detached
return nil
}
}
// AttachedPIDs list the currently attached processes.
func (t *PTracer) AttachedPIDs() []int {
result := make(chan []int)
t.ops <- func() {
var pids []int
for pid := range t.processes {
pids = append(pids, pid)
}
result <- pids
}
return <-result
}
func (t *PTracer) traceThread(pid int, process *process) *thread {
result := make(chan *thread)
t.ops <- func() {
thread := newThread(pid, process, t)
err := syscall.PtraceAttach(pid)
if err != nil {
log.Printf("Attach %d failed: %v", pid, err)
return
}
var status syscall.WaitStatus
if _, err = syscall.Wait4(pid, &status, 0, nil); err != nil {
log.Printf("Wait %d failed: %v", pid, err)
return
}
thread.attached = true
err = syscall.PtraceSetOptions(pid, ptraceOptions)
if err != nil {
log.Printf("SetOptions failed, pid=%d, err=%v", pid, err)
return
}
err = syscall.PtraceSyscall(pid, 0)
if err != nil {
log.Printf("PtraceSyscall failed, pid=%d, err=%v", pid, err)
return
}
t.threads[pid] = thread
result <- thread
t.childAttached <- struct{}{}
}
return <-result
}
func (t *PTracer) waitLoop() {
var (
status syscall.WaitStatus
pid int
err error
)
for {
pid, err = syscall.Wait4(-1, &status, syscall.WALL, nil)
if err != nil && err.(syscall.Errno) == syscall.ECHILD {
log.Printf("No children to wait4")
<-t.childAttached
continue
}
if err != nil {
log.Printf("Wait failed: %v %d", err, err.(syscall.Errno))
return
}
log.Printf("PID %d stopped", pid)
t.stopped <- stopped{pid, status}
}
}
func (t *PTracer) loop() {
runtime.LockOSThread()
for {
select {
case op := <-t.ops:
op()
case stopped := <-t.stopped:
t.handleStopped(stopped.pid, stopped.status)
case <-t.quit:
return
}
}
}
func (t *PTracer) handleStopped(pid int, status syscall.WaitStatus) {
signal := syscall.Signal(0)
target, err := t.thread(pid)
if err != nil {
log.Printf("thread failed: %v", err)
return
}
if status.Stopped() && status.StopSignal() == syscall.SIGTRAP|ptraceTracesysgoodBit {
// pid entered Syscall-enter-stop or syscall-exit-stop
target.syscallStopped()
} else if status.Stopped() && status.StopSignal() == syscall.SIGTRAP {
// pid entered PTRACE_EVENT stop
switch status.TrapCause() {
case syscall.PTRACE_EVENT_CLONE:
err := target.handleClone(pid)
if err != nil {
log.Printf("clone failed: %v", err)
return
}
default:
log.Printf("Unknown PTRACE_EVENT %d for pid %d", status.TrapCause(), pid)
}
} else if status.Exited() || status.Signaled() {
// "tracer can safely assume pid will exit"
t.threadExited(target)
return
} else if status.Stopped() {
// tracee recieved a non-trace related signal
signal = status.StopSignal()
if signal == syscall.SIGSTOP && target.process.detaching {
t.detachThread(target)
return
}
} else {
// unknown stop - shouldn't happen!
log.Printf("Pid %d random stop with status %x", pid, status)
}
// Restart stopped caller in syscall trap mode.
err = syscall.PtraceSyscall(pid, int(signal))
if err != nil {
log.Printf("PtraceSyscall failed, pid=%d, err=%v", pid, err)
}
}
func (t *PTracer) detachThread(thread *thread) {
syscall.PtraceDetach(thread.tid)
process := thread.process
delete(process.threads, thread.tid)
delete(t.threads, thread.tid)
if len(process.threads) == 0 {
delete(t.processes, process.pid)
close(process.detached)
log.Printf("Process %d detached", process.pid)
}
}
func pidForTid(tid int) (pid int, err error) {
var (
status *os.File
scanner *bufio.Scanner
splits []string
)
status, err = os.Open(fmt.Sprintf("/proc/%d/status", tid))
if err != nil {
return
}
defer status.Close()
scanner = bufio.NewScanner(status)
for scanner.Scan() {
splits = strings.Split(scanner.Text(), ":")
if splits[0] != "Tgid" {
continue
}
pid, err = strconv.Atoi(strings.TrimSpace(splits[1]))
return
}
if err = scanner.Err(); err != nil {
return
}
err = fmt.Errorf("Pid not found for proc %d", tid)
return
}
func (t *PTracer) thread(tid int) (*thread, error) {
thread, ok := t.threads[tid]
if !ok {
pid, err := pidForTid(tid)
if err != nil {
return nil, err
}
proc, ok := t.processes[pid]
if !ok {
return nil, fmt.Errorf("Got new thread %d for unknown process", tid)
}
thread = newThread(tid, proc, t)
t.threads[tid] = thread
log.Printf("New thread reported, tid=%d, pid=%d", tid, pid)
}
return thread, nil
}
func (t *PTracer) threadExited(thread *thread) {
thread.handleExit()
delete(t.threads, thread.tid)
if thread.process != nil {
delete(thread.process.threads, thread.tid)
}
}

View File

@@ -0,0 +1,253 @@
package ptrace
import (
"fmt"
"log"
"net"
"syscall"
"unsafe"
)
// Syscall numbers
const (
READ = 0
WRITE = 1
CLOSE = 3
MMAP = 9
MPROTECT = 10
MADVISE = 28
SOCKET = 41
CONNECT = 42
ACCEPT = 43
SENDTO = 44
RECVFROM = 45
CLONE = 56
GETID = 186
SETROBUSTLIST = 273
ACCEPT4 = 288
)
// States for a given thread
const (
NORMAL = iota
INSYSCALL
)
type thread struct {
tid int
attached bool
process *process // might be nil!
tracer *PTracer
state int
callRegs syscall.PtraceRegs
resultRegs syscall.PtraceRegs
currentConnection *Fd
}
func newThread(pid int, process *process, tracer *PTracer) *thread {
t := &thread{
tid: pid,
process: process,
tracer: tracer,
}
return t
}
// trace thread calls this
func (t *thread) syscallStopped() {
var err error
if t.state == NORMAL {
if err = syscall.PtraceGetRegs(t.tid, &t.callRegs); err != nil {
t.logf("GetRegs failed, pid=%d, err=%v", t.tid, err)
}
t.state = INSYSCALL
return
}
t.state = NORMAL
if err = syscall.PtraceGetRegs(t.tid, &t.resultRegs); err != nil {
t.logf("GetRegs failed, pid=%d, err=%v", t.tid, err)
return
}
if t.process == nil {
t.logf("Got syscall, but don't know parent process!")
return
}
switch t.callRegs.Orig_rax {
case ACCEPT, ACCEPT4:
t.handleAccept(&t.callRegs, &t.resultRegs)
case CLOSE:
t.handleClose(&t.callRegs, &t.resultRegs)
case CONNECT:
t.handleConnect(&t.callRegs, &t.resultRegs)
case READ, WRITE, RECVFROM, SENDTO:
t.handleIO(&t.callRegs, &t.resultRegs)
// we can ignore these syscalls
case SETROBUSTLIST, GETID, MMAP, MPROTECT, MADVISE, SOCKET, CLONE:
return
default:
t.logf("syscall(%d)", t.callRegs.Orig_rax)
}
}
func (t *thread) getSocketAddress(ptr uintptr) (addr net.IP, port uint16, err error) {
var (
buf = make([]byte, syscall.SizeofSockaddrAny)
read int
)
if ptr == 0 {
err = fmt.Errorf("Null ptr")
return
}
read, err = syscall.PtracePeekData(t.tid, ptr, buf)
if read != syscall.SizeofSockaddrAny || err != nil {
return
}
var sockaddr4 = (*syscall.RawSockaddrInet4)(unsafe.Pointer(&buf[0]))
if sockaddr4.Family != syscall.AF_INET {
return
}
addr = net.IP(sockaddr4.Addr[0:])
port = sockaddr4.Port
return
}
func (t *thread) handleAccept(call, result *syscall.PtraceRegs) {
var (
err error
ok bool
listeningFdNum int
connectionFdNum int
addrPtr uintptr
addr net.IP
port uint16
listeningFd *Fd
connection *Fd
)
listeningFdNum = int(result.Rdi)
connectionFdNum = int(result.Rax)
addrPtr = uintptr(result.Rsi)
addr, port, err = t.getSocketAddress(addrPtr)
if err != nil {
t.logf("failed to read sockaddr: %v", err)
return
}
listeningFd, ok = t.process.fds[listeningFdNum]
if !ok {
listeningFd, err = newListeningFd(t.process.pid, listeningFdNum)
if err != nil {
t.logf("Failed to read listening port: %v", err)
return
}
t.process.fds[listeningFdNum] = listeningFd
}
connection, err = listeningFd.newConnection(addr, port, connectionFdNum)
if err != nil {
t.logf("Failed to create connection fd: %v", err)
return
}
t.process.newFd(connection)
t.logf("Accepted connection from %s:%d -> %s:%d on fd %d, new fd %d",
addr, port, connection.toAddr, connection.toPort, listeningFdNum, connectionFdNum)
}
func (t *thread) handleConnect(call, result *syscall.PtraceRegs) {
fd := int(result.Rdi)
ptr := result.Rsi
addr, port, err := t.getSocketAddress(uintptr(ptr))
if err != nil {
t.logf("failed to read sockaddr: %v", err)
return
}
connection, err := newConnectionFd(t.process.pid, fd, addr, port)
if err != nil {
t.logf("Failed to create connection fd: %v", err)
return
}
t.process.newFd(connection)
if t.currentConnection != nil {
t.currentConnection.Children = append(t.currentConnection.Children, connection)
}
t.logf("Made connection from %s:%d -> %s:%d on fd %d",
connection.toAddr, connection.toPort, connection.FromAddr,
connection.FromPort, fd)
}
func (t *thread) handleClose(call, result *syscall.PtraceRegs) {
fdNum := int(call.Rdi)
fd, ok := t.process.fds[fdNum]
if !ok {
t.logf("Got close unknown fd %d", fdNum)
return
}
fd.close()
delete(t.process.fds, fdNum)
// clear current connection if we just closed it!
if t.currentConnection != nil && t.currentConnection.fd == fdNum {
t.currentConnection = nil
}
// if this connection was incoming, add it to 'the registry'
//if fd.direction == incoming {
// t.tracer.store.RecordConnection(t.process.pid, fd)
//}
}
func (t *thread) handleIO(call, result *syscall.PtraceRegs) {
fdNum := int(call.Rdi)
fd, ok := t.process.fds[fdNum]
if !ok {
t.logf("IO on unknown fd %d", fdNum)
return
}
if fd.direction == incoming {
t.logf("IO on incoming connection %d; setting affinity", fdNum)
t.currentConnection = fd
}
}
func (t *thread) handleClone(pid int) error {
// We can't use the pid in the process, as it may be in a namespace
newPid, err := syscall.PtraceGetEventMsg(pid)
if err != nil {
log.Printf("PtraceGetEventMsg failed: %v", err)
return err
}
t.logf("New thread clone'd, pid=%d", newPid)
return nil
}
func (t *thread) handleExit() {
t.logf("Exiting")
}
func (t *thread) logf(fmt string, args ...interface{}) {
log.Printf("[thread %d] "+fmt, append([]interface{}{t.tid}, args...)...)
}

50
experimental/tracer/tracer Executable file
View File

@@ -0,0 +1,50 @@
#!/bin/bash
set -eu
usage() {
echo "$0"
}
PORT=6060
CONTAINER_NAME=weavetracer
[ $# -gt 0 ] || usage
COMMAND=$1
shift 1
case "$COMMAND" in
launch)
docker run --privileged --net=host --pid=host -d -v /var/run/docker.sock:/var/run/docker.sock \
--name $CONTAINER_NAME weaveworks/tracer
for ip in $(hostname -I); do
echo http://$ip:6060
done
;;
stop)
docker stop $CONTAINER_NAME || true
docker rm $CONTAINER_NAME >/dev/null || true
;;
attach)
PID=$1
if [ -z "${PID##*[!0-9]*}" ]; then
PID=$(pgrep $PID)
fi
curl -X POST http://localhost:$PORT/pid/$PID
;;
detach)
PID=$1
if [ -z "${PID##*[!0-9]*}" ]; then
PID=$(pgrep $PID)
fi
curl -X DELETE http://localhost:$PORT/pid/$PID
;;
traces)
curl http://localhost:$PORT/trace
;;
esac

View File

@@ -0,0 +1,98 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>Weave Tracer</title>
<!-- Latest compiled and minified CSS -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<!-- Optional theme -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap-theme.min.css">
<!-- Latest compiled and minified JavaScript -->
<script src="http://code.jquery.com/jquery-2.1.4.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/handlebars.js/3.0.3/handlebars.min.js"></script>
<script src="sprintf.min.js"></script>
<meta name="viewport" content="width=device-width, initial-scale=1">
<script>
$(function () {
var containers = {};
var traces = {}
$.get("/containers").done(function (data) {
$.each(data, function(i, container) {
containers[container.Id] = container
$("ol.containers").append(sprintf("<li id='%s'>%s</li>", container.Id, container.Name))
});
});
$("ol.containers").on("click", "li", function() {
var container = containers[$(this).attr("id")]
var containerTraces = traces[$(this).attr("id")] || []
var template = $('script#process-template').text();
template = Handlebars.compile(template);
var rendered = template({container: container, traces: containerTraces});
$('div.mainview').html(rendered);
})
$("div.mainview").on("click", "button.start", function() {
var id = $(this).parent().data("containerId")
var container = containers[id]
$.post(sprintf("/pid/%d", container.State.Pid))
})
$("div.mainview").on("click", "button.stop", function() {
var id = $(this).parent().data("containerId")
var container = containers[id]
$.ajax({
url: sprintf("/pid/%d", container.State.Pid),
type: 'DELETE',
});
})
})
</script>
<style>
ol.containers li {
cursor: pointer;
}
</style>
</head>
<body>
<script type="text/x-handlebars-template" id="process-template">
<h2>{{container.Name}}</h2>
<div class="btn-group" role="group" data-container-id="{{container.Id}}">
<button type="button" class="btn btn-default start">
<span class="glyphicon glyphicon-play" aria-hidden="true"></span> Start</button>
<button type="button" class="btn btn-default stop">
<span class="glyphicon glyphicon-stop" aria-hidden="true"></span> Stop</button>
<button type="button" class="btn btn-default fetch">
<span class="glyphicon glyphicon-cloud-download" aria-hidden="true"></span> Get Traces</button>
</div>
<table>
<thead><tr><th>Start time</th><th>Duration</th><th>Sub-traces</th></tr></thead>
<tbody>
{{#traces}}
<tr><td>{{.}}</td></tr>
{{/traces}}
</tbody>
</table>
</script>
<div class="container-fluid">
<div class="col-md-4">
<h1>Weave Tracer</h1>
<ol class="containers">
</ol>
</div>
<div class="col-md-8 mainview">
</div>
</div>
</body>
</html>

4
experimental/tracer/ui/sprintf.min.js vendored Normal file
View File

@@ -0,0 +1,4 @@
/*! sprintf-js | Alexandru Marasteanu <hello@alexei.ro> (http://alexei.ro/) | BSD-3-Clause */
!function(a){function b(){var a=arguments[0],c=b.cache;return c[a]&&c.hasOwnProperty(a)||(c[a]=b.parse(a)),b.format.call(null,c[a],arguments)}function c(a){return Object.prototype.toString.call(a).slice(8,-1).toLowerCase()}function d(a,b){return Array(b+1).join(a)}var e={not_string:/[^s]/,number:/[dief]/,json:/[j]/,not_json:/[^j]/,text:/^[^\x25]+/,modulo:/^\x25{2}/,placeholder:/^\x25(?:([1-9]\d*)\$|\(([^\)]+)\))?(\+)?(0|'[^$])?(-)?(\d+)?(?:\.(\d+))?([b-fijosuxX])/,key:/^([a-z_][a-z_\d]*)/i,key_access:/^\.([a-z_][a-z_\d]*)/i,index_access:/^\[(\d+)\]/,sign:/^[\+\-]/};b.format=function(a,f){var g,h,i,j,k,l,m,n=1,o=a.length,p="",q=[],r=!0,s="";for(h=0;o>h;h++)if(p=c(a[h]),"string"===p)q[q.length]=a[h];else if("array"===p){if(j=a[h],j[2])for(g=f[n],i=0;i<j[2].length;i++){if(!g.hasOwnProperty(j[2][i]))throw new Error(b("[sprintf] property '%s' does not exist",j[2][i]));g=g[j[2][i]]}else g=j[1]?f[j[1]]:f[n++];if("function"==c(g)&&(g=g()),e.not_string.test(j[8])&&e.not_json.test(j[8])&&"number"!=c(g)&&isNaN(g))throw new TypeError(b("[sprintf] expecting number but found %s",c(g)));switch(e.number.test(j[8])&&(r=g>=0),j[8]){case"b":g=g.toString(2);break;case"c":g=String.fromCharCode(g);break;case"d":case"i":g=parseInt(g,10);break;case"j":g=JSON.stringify(g,null,j[6]?parseInt(j[6]):0);break;case"e":g=j[7]?g.toExponential(j[7]):g.toExponential();break;case"f":g=j[7]?parseFloat(g).toFixed(j[7]):parseFloat(g);break;case"o":g=g.toString(8);break;case"s":g=(g=String(g))&&j[7]?g.substring(0,j[7]):g;break;case"u":g>>>=0;break;case"x":g=g.toString(16);break;case"X":g=g.toString(16).toUpperCase()}e.json.test(j[8])?q[q.length]=g:(!e.number.test(j[8])||r&&!j[3]?s="":(s=r?"+":"-",g=g.toString().replace(e.sign,"")),l=j[4]?"0"===j[4]?"0":j[4].charAt(1):" ",m=j[6]-(s+g).length,k=j[6]&&m>0?d(l,m):"",q[q.length]=j[5]?s+g+k:"0"===l?s+k+g:k+s+g)}return q.join("")},b.cache={},b.parse=function(a){for(var b=a,c=[],d=[],f=0;b;){if(null!==(c=e.text.exec(b)))d[d.length]=c[0];else if(null!==(c=e.modulo.exec(b)))d[d.length]="%";else{if(null===(c=e.placeholder.exec(b)))throw new SyntaxError("[sprintf] unexpected placeholder");if(c[2]){f|=1;var g=[],h=c[2],i=[];if(null===(i=e.key.exec(h)))throw new SyntaxError("[sprintf] failed to parse named argument key");for(g[g.length]=i[1];""!==(h=h.substring(i[0].length));)if(null!==(i=e.key_access.exec(h)))g[g.length]=i[1];else{if(null===(i=e.index_access.exec(h)))throw new SyntaxError("[sprintf] failed to parse named argument key");g[g.length]=i[1]}c[2]=g}else f|=2;if(3===f)throw new Error("[sprintf] mixing positional and named placeholders is not (yet) supported");d[d.length]=c}b=b.substring(c[0].length)}return d};var f=function(a,c,d){return d=(c||[]).slice(0),d.splice(0,0,a),b.apply(null,d)};"undefined"!=typeof exports?(exports.sprintf=b,exports.vsprintf=f):(a.sprintf=b,a.vsprintf=f,"function"==typeof define&&define.amd&&define(function(){return{sprintf:b,vsprintf:f}}))}("undefined"==typeof window?this:window);
//# sourceMappingURL=sprintf.min.js.map

View File

@@ -87,6 +87,8 @@ type Container interface {
State() string
HasTTY() bool
GetNode() report.Node
Container() *docker.Container
StartGatheringStats() error
StopGatheringStats()
}
@@ -146,6 +148,10 @@ func (c *container) State() string {
return StateStopped
}
func (c *container) Container() *docker.Container {
return c.container
}
func (c *container) StartGatheringStats() error {
c.Lock()
defer c.Unlock()