From e58f0f756fc6aa89ae865dcfb33718391658ab08 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 18 Sep 2015 09:32:54 +0000 Subject: [PATCH] Make temporal join work - now can have traces with more than one child! --- experimental/tracer/main/store.go | 96 +++++++++++++++++----------- experimental/tracer/ptrace/thread.go | 16 +++-- experimental/tracer/ui/index.html | 20 ++++-- 3 files changed, 83 insertions(+), 49 deletions(-) diff --git a/experimental/tracer/main/store.go b/experimental/tracer/main/store.go index dd1637037..ad8c137db 100644 --- a/experimental/tracer/main/store.go +++ b/experimental/tracer/main/store.go @@ -1,11 +1,10 @@ package main import ( - "math/rand" - "sync" - "log" "fmt" + "math/rand" "sort" + "sync" "github.com/msackman/skiplist" @@ -26,12 +25,12 @@ func (k key) MarshalJSON() ([]byte, error) { } type trace struct { - PID int - Key key + PID int + Key key ServerDetails *ptrace.ConnectionDetails ClientDetails *ptrace.ConnectionDetails - Children []*trace - Level int + Children []*trace + Level int } type byKey []*trace @@ -57,7 +56,20 @@ func newKey(fd *ptrace.Fd) key { func (l key) LessThan(other skiplist.Comparable) bool { r := other.(key) - return l.fromAddr < r.fromAddr && l.fromPort < r.fromPort && l.startTime < r.startTime + + if l.fromAddr != r.fromAddr { + return l.fromAddr > r.fromAddr + } + + if l.fromPort != r.fromPort { + return l.fromPort < r.fromPort + } + + if l.Equal(other) { + return false + } + + return l.startTime < r.startTime } func (l key) Equal(other skiplist.Comparable) bool { @@ -74,50 +86,61 @@ func newStore() *store { return &store{traces: skiplist.New(rand.New(rand.NewSource(0)))} } +func (t *trace) addChild(child *trace) { + // find the child we're supposed to be replacing + for i, candidate := range t.Children { + if !candidate.Key.Equal(skiplist.Comparable(child.Key)) { + continue + } + + // Fix up some fields + child.ClientDetails = candidate.ClientDetails + child.PID = candidate.PID + IncrementLevel(child, t.Level+1) + + // Overwrite old record + t.Children[i] = child + return + } +} + func (s *store) RecordConnection(pid int, connection *ptrace.Fd) { s.Lock() defer s.Unlock() newTrace := &trace{ - PID: pid, - Key: newKey(connection), + PID: pid, + Key: newKey(connection), ServerDetails: &connection.ConnectionDetails, } for _, child := range connection.Children { newTrace.Children = append(newTrace.Children, &trace{ - Level: 1, + Level: 1, + Key: newKey(child), ClientDetails: &child.ConnectionDetails, }) } - newTraceKey := newTrace.Key - // First, see if this new conneciton is a child of an existing connection. // This indicates we have a parent connection to attach to. // If not, insert this connection. - if parentNode := s.traces.Get(newTraceKey); parentNode != nil { - parentNode.Remove() + if parentNode := s.traces.Get(newTrace.Key); parentNode != nil { parentTrace := parentNode.Value.(*trace) - log.Printf(" Found parent trace: %+v", parentTrace) - newTrace.Level = parentTrace.Level + 1 - parentTrace.Children = append(parentTrace.Children, newTrace) + parentTrace.addChild(newTrace) + parentNode.Remove() } else { - s.traces.Insert(newTraceKey, newTrace) + s.traces.Insert(newTrace.Key, newTrace) } // Next, see if we already know about the child connections // If not, insert each of our children. - for _, childConnection := range connection.Children { - childTraceKey := newKey(childConnection) - - if childNode := s.traces.Get(childTraceKey); childNode != nil { - childNode.Remove() + for _, child := range newTrace.Children { + if childNode := s.traces.Get(child.Key); childNode != nil { childTrace := childNode.Value.(*trace) - log.Printf(" Found child trace: %+v", childTrace) - IncrementLevel(childTrace, newTrace.Level) - newTrace.Children = append(newTrace.Children, childTrace) + newTrace.addChild(childTrace) + childNode.Remove() } else { - s.traces.Insert(childTraceKey, newTrace) + s.traces.Insert(child.Key, newTrace) } } } @@ -133,20 +156,17 @@ func (s *store) Traces() []*trace { s.RLock() defer s.RUnlock() - traces := map[key]*trace{} + traces := []*trace{} var cur = s.traces.First() for cur != nil { + key := cur.Key.(key) trace := cur.Value.(*trace) - traces[trace.Key] = trace + if trace.Key == key { + traces = append(traces, trace) + } cur = cur.Next() } - result := []*trace{} - for _, trace := range traces { - result = append(result, trace) - } - - sort.Sort(byKey(result)) - - return result + sort.Sort(byKey(traces)) + return traces } diff --git a/experimental/tracer/ptrace/thread.go b/experimental/tracer/ptrace/thread.go index ca67de89c..530c94928 100644 --- a/experimental/tracer/ptrace/thread.go +++ b/experimental/tracer/ptrace/thread.go @@ -17,6 +17,7 @@ const ( STAT = 4 MMAP = 9 MPROTECT = 10 + MUNMAP = 11 SELECT = 23 MADVISE = 28 SOCKET = 41 @@ -24,8 +25,11 @@ const ( ACCEPT = 43 SENDTO = 44 RECVFROM = 45 + SHUTDOWN = 48 CLONE = 56 + GETTIMEOFDAY = 96 GETID = 186 + FUTEX = 202 SETROBUSTLIST = 273 ACCEPT4 = 288 ) @@ -102,7 +106,7 @@ func (t *thread) syscallStopped() { case SETROBUSTLIST, GETID, MMAP, MPROTECT, MADVISE, SOCKET, CLONE, STAT, SELECT: return - case OPEN: + case OPEN, FUTEX, SHUTDOWN, GETTIMEOFDAY, MUNMAP: return default: @@ -214,7 +218,7 @@ func (t *thread) handleClose(call, result *syscall.PtraceRegs) { return } - t.logf("Closing fd %d", fdNum) + //t.logf("Closing fd %d", fdNum) fd.close() // if this connection was incoming, add it to 'the registry' @@ -222,7 +226,7 @@ func (t *thread) handleClose(call, result *syscall.PtraceRegs) { // collect all the outgoing connections this thread has made // and treat them as caused by this incoming connections for _, outgoing := range t.currentOutgoing { - t.logf("Fd %d caused %d", fdNum, outgoing.fd) + //t.logf("Fd %d caused %d", fdNum, outgoing.fd) fd.Children = append(fd.Children, outgoing) } t.currentOutgoing = map[int]*Fd{} @@ -241,15 +245,15 @@ func (t *thread) handleIO(call, result *syscall.PtraceRegs) { fdNum := int(call.Rdi) fd, ok := t.process.fds[fdNum] if !ok { - t.logf("IO on unknown fd %d", fdNum) + //t.logf("IO on unknown fd %d", fdNum) return } if fd.direction == incoming { - t.logf("IO on incoming connection %d; setting affinity", fdNum) + //t.logf("IO on incoming connection %d; setting affinity", fdNum) t.currentIncoming[fdNum] = fd } else { - t.logf("IO on outgoing connection %d; setting affinity", fdNum) + //t.logf("IO on outgoing connection %d; setting affinity", fdNum) t.currentOutgoing[fdNum] = fd } } diff --git a/experimental/tracer/ui/index.html b/experimental/tracer/ui/index.html index b479f377c..35e0b32da 100644 --- a/experimental/tracer/ui/index.html +++ b/experimental/tracer/ui/index.html @@ -53,8 +53,18 @@ return new Handlebars.SafeString(ds); }); + function numChildren(input) { + if (input.Children === null) { + return 0 + } + var count = input.Children.length + $.each(input.Children, function(i, child) { + count += numChildren(child) + }) + return count + } Handlebars.registerHelper('count', function(input) { - return new Handlebars.SafeString(input === null ? '0' : sprintf("%d", input.length)); + return sprintf("%d", numChildren(input)); }); Handlebars.registerPartial('traces', $("#traces").html()); @@ -214,13 +224,13 @@ {{#with ServerDetails}} {{spaces ../Level}}{{ts Start}}{{duration .}} {{../PID}}{{FromAddr}}:{{FromPort}} - {{ToAddr}}:{{ToPort}}{{count ../Children}} + {{ToAddr}}:{{ToPort}}{{count ../.}} {{/with}} {{else}} {{#with ClientDetails}} {{spaces ../Level}}{{ts Start}}{{duration .}} {{../PID}}{{FromAddr}}:{{FromPort}} - {{ToAddr}}:{{ToPort}}{{count ../Children}} + {{ToAddr}}:{{ToPort}}{{count ../.}} {{/with}} {{/if}} @@ -236,14 +246,14 @@ {{spaces ../Level}}{{ts Start}}{{duration .}} {{../PID}}{{FromAddr}}:{{FromPort}} - {{ToAddr}}:{{ToPort}}{{count ../Children}} + {{ToAddr}}:{{ToPort}}{{count ../.}} {{/with}} {{else}} {{#with ClientDetails}} {{spaces ../Level}}{{ts Start}}{{duration .}} {{../PID}}{{FromAddr}}:{{FromPort}} - {{ToAddr}}:{{ToPort}}{{count ../Children}} + {{ToAddr}}:{{ToPort}}{{count ../.}} {{/with}} {{/if}} {{>children Children}}