Make temporal join work - now can have traces with more than one child!

This commit is contained in:
Tom Wilkie
2015-09-18 09:32:54 +00:00
committed by Tom Wilkie
parent baf7aa106c
commit e58f0f756f
3 changed files with 83 additions and 49 deletions

View File

@@ -1,11 +1,10 @@
package main
import (
"math/rand"
"sync"
"log"
"fmt"
"math/rand"
"sort"
"sync"
"github.com/msackman/skiplist"
@@ -26,12 +25,12 @@ func (k key) MarshalJSON() ([]byte, error) {
}
type trace struct {
PID int
Key key
PID int
Key key
ServerDetails *ptrace.ConnectionDetails
ClientDetails *ptrace.ConnectionDetails
Children []*trace
Level int
Children []*trace
Level int
}
type byKey []*trace
@@ -57,7 +56,20 @@ func newKey(fd *ptrace.Fd) key {
func (l key) LessThan(other skiplist.Comparable) bool {
r := other.(key)
return l.fromAddr < r.fromAddr && l.fromPort < r.fromPort && l.startTime < r.startTime
if l.fromAddr != r.fromAddr {
return l.fromAddr > r.fromAddr
}
if l.fromPort != r.fromPort {
return l.fromPort < r.fromPort
}
if l.Equal(other) {
return false
}
return l.startTime < r.startTime
}
func (l key) Equal(other skiplist.Comparable) bool {
@@ -74,50 +86,61 @@ func newStore() *store {
return &store{traces: skiplist.New(rand.New(rand.NewSource(0)))}
}
func (t *trace) addChild(child *trace) {
// find the child we're supposed to be replacing
for i, candidate := range t.Children {
if !candidate.Key.Equal(skiplist.Comparable(child.Key)) {
continue
}
// Fix up some fields
child.ClientDetails = candidate.ClientDetails
child.PID = candidate.PID
IncrementLevel(child, t.Level+1)
// Overwrite old record
t.Children[i] = child
return
}
}
func (s *store) RecordConnection(pid int, connection *ptrace.Fd) {
s.Lock()
defer s.Unlock()
newTrace := &trace{
PID: pid,
Key: newKey(connection),
PID: pid,
Key: newKey(connection),
ServerDetails: &connection.ConnectionDetails,
}
for _, child := range connection.Children {
newTrace.Children = append(newTrace.Children, &trace{
Level: 1,
Level: 1,
Key: newKey(child),
ClientDetails: &child.ConnectionDetails,
})
}
newTraceKey := newTrace.Key
// First, see if this new conneciton is a child of an existing connection.
// This indicates we have a parent connection to attach to.
// If not, insert this connection.
if parentNode := s.traces.Get(newTraceKey); parentNode != nil {
parentNode.Remove()
if parentNode := s.traces.Get(newTrace.Key); parentNode != nil {
parentTrace := parentNode.Value.(*trace)
log.Printf(" Found parent trace: %+v", parentTrace)
newTrace.Level = parentTrace.Level + 1
parentTrace.Children = append(parentTrace.Children, newTrace)
parentTrace.addChild(newTrace)
parentNode.Remove()
} else {
s.traces.Insert(newTraceKey, newTrace)
s.traces.Insert(newTrace.Key, newTrace)
}
// Next, see if we already know about the child connections
// If not, insert each of our children.
for _, childConnection := range connection.Children {
childTraceKey := newKey(childConnection)
if childNode := s.traces.Get(childTraceKey); childNode != nil {
childNode.Remove()
for _, child := range newTrace.Children {
if childNode := s.traces.Get(child.Key); childNode != nil {
childTrace := childNode.Value.(*trace)
log.Printf(" Found child trace: %+v", childTrace)
IncrementLevel(childTrace, newTrace.Level)
newTrace.Children = append(newTrace.Children, childTrace)
newTrace.addChild(childTrace)
childNode.Remove()
} else {
s.traces.Insert(childTraceKey, newTrace)
s.traces.Insert(child.Key, newTrace)
}
}
}
@@ -133,20 +156,17 @@ func (s *store) Traces() []*trace {
s.RLock()
defer s.RUnlock()
traces := map[key]*trace{}
traces := []*trace{}
var cur = s.traces.First()
for cur != nil {
key := cur.Key.(key)
trace := cur.Value.(*trace)
traces[trace.Key] = trace
if trace.Key == key {
traces = append(traces, trace)
}
cur = cur.Next()
}
result := []*trace{}
for _, trace := range traces {
result = append(result, trace)
}
sort.Sort(byKey(result))
return result
sort.Sort(byKey(traces))
return traces
}

View File

@@ -17,6 +17,7 @@ const (
STAT = 4
MMAP = 9
MPROTECT = 10
MUNMAP = 11
SELECT = 23
MADVISE = 28
SOCKET = 41
@@ -24,8 +25,11 @@ const (
ACCEPT = 43
SENDTO = 44
RECVFROM = 45
SHUTDOWN = 48
CLONE = 56
GETTIMEOFDAY = 96
GETID = 186
FUTEX = 202
SETROBUSTLIST = 273
ACCEPT4 = 288
)
@@ -102,7 +106,7 @@ func (t *thread) syscallStopped() {
case SETROBUSTLIST, GETID, MMAP, MPROTECT, MADVISE, SOCKET, CLONE, STAT, SELECT:
return
case OPEN:
case OPEN, FUTEX, SHUTDOWN, GETTIMEOFDAY, MUNMAP:
return
default:
@@ -214,7 +218,7 @@ func (t *thread) handleClose(call, result *syscall.PtraceRegs) {
return
}
t.logf("Closing fd %d", fdNum)
//t.logf("Closing fd %d", fdNum)
fd.close()
// if this connection was incoming, add it to 'the registry'
@@ -222,7 +226,7 @@ func (t *thread) handleClose(call, result *syscall.PtraceRegs) {
// collect all the outgoing connections this thread has made
// and treat them as caused by this incoming connections
for _, outgoing := range t.currentOutgoing {
t.logf("Fd %d caused %d", fdNum, outgoing.fd)
//t.logf("Fd %d caused %d", fdNum, outgoing.fd)
fd.Children = append(fd.Children, outgoing)
}
t.currentOutgoing = map[int]*Fd{}
@@ -241,15 +245,15 @@ func (t *thread) handleIO(call, result *syscall.PtraceRegs) {
fdNum := int(call.Rdi)
fd, ok := t.process.fds[fdNum]
if !ok {
t.logf("IO on unknown fd %d", fdNum)
//t.logf("IO on unknown fd %d", fdNum)
return
}
if fd.direction == incoming {
t.logf("IO on incoming connection %d; setting affinity", fdNum)
//t.logf("IO on incoming connection %d; setting affinity", fdNum)
t.currentIncoming[fdNum] = fd
} else {
t.logf("IO on outgoing connection %d; setting affinity", fdNum)
//t.logf("IO on outgoing connection %d; setting affinity", fdNum)
t.currentOutgoing[fdNum] = fd
}
}

View File

@@ -53,8 +53,18 @@
return new Handlebars.SafeString(ds);
});
function numChildren(input) {
if (input.Children === null) {
return 0
}
var count = input.Children.length
$.each(input.Children, function(i, child) {
count += numChildren(child)
})
return count
}
Handlebars.registerHelper('count', function(input) {
return new Handlebars.SafeString(input === null ? '0' : sprintf("%d", input.length));
return sprintf("%d", numChildren(input));
});
Handlebars.registerPartial('traces', $("#traces").html());
@@ -214,13 +224,13 @@
{{#with ServerDetails}}
<td>{{spaces ../Level}}{{ts Start}}</td><td>{{duration .}}</td>
<td>{{../PID}}</td><td>{{FromAddr}}:{{FromPort}}</td>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../Children}}</td>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../.}}</td>
{{/with}}
{{else}}
{{#with ClientDetails}}
<td>{{spaces ../Level}}{{ts Start}}</td><td>{{duration .}}</td>
<td>{{../PID}}</td><td>{{FromAddr}}:{{FromPort}}</td>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../Children}}</td>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../.}}</td>
{{/with}}
{{/if}}
</tr>
@@ -236,14 +246,14 @@
<tr>
<td>{{spaces ../Level}}{{ts Start}}</td><td>{{duration .}}</td>
<td>{{../PID}}</td><td>{{FromAddr}}:{{FromPort}}</td>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../Children}}</tr>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../.}}</tr>
{{/with}}
{{else}}
{{#with ClientDetails}}
<tr>
<td>{{spaces ../Level}}{{ts Start}}</td><td>{{duration .}}</td>
<td>{{../PID}}</td><td>{{FromAddr}}:{{FromPort}}</td>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../Children}}</tr>
<td>{{ToAddr}}:{{ToPort}}</td><td>{{count ../.}}</tr>
{{/with}}
{{/if}}
{{>children Children}}