Merge pull request #967 from weaveworks/539-connection-directions

Use connection directions from conntrack where possible
This commit is contained in:
Tom Wilkie
2016-02-23 14:52:21 +00:00
9 changed files with 106 additions and 143 deletions

View File

@@ -12,7 +12,6 @@
"EdgeMetadatas": {
";192.168.1.1;10746|theinternet": {
"WithConnCountTCP": true,
"MaxConnCountTCP": 19
}
},
"NodeMetadatas": {
@@ -43,7 +42,6 @@
"EdgeMetadatas": {
";192.168.1.1|theinternet": {
"WithConnCountTCP": true,
"MaxConnCountTCP": 12
}
},
"NodeMetadatas": {

View File

@@ -94,16 +94,12 @@ func demoReport(nodeCount int) report.Report {
process.PID: "4000",
"name": c.srcProc,
"domain": "node-" + src,
}).WithEdge(dstPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(dstPortID, report.EdgeMetadata{}))
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithLatests(map[string]string{
process.PID: "4000",
"name": c.dstProc,
"domain": "node-" + dst,
}).WithEdge(srcPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(srcPortID, report.EdgeMetadata{}))
// Address topology
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithLatests(map[string]string{

View File

@@ -68,16 +68,12 @@ func DemoReport(nodeCount int) report.Report {
"pid": "4000",
"name": c.srcProc,
"domain": "node-" + src,
}).WithEdge(dstPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(dstPortID, report.EdgeMetadata{}))
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithLatests(map[string]string{
"pid": "4000",
"name": c.dstProc,
"domain": "node-" + dst,
}).WithEdge(srcPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(srcPortID, report.EdgeMetadata{}))
// Address topology
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithLatests(map[string]string{

View File

@@ -1,7 +1,10 @@
package endpoint
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -71,6 +74,27 @@ func (r *Reporter) Stop() {
r.scanner.Stop()
}
type fourTuple struct {
fromAddr, toAddr string
fromPort, toPort uint16
}
// key is a sortable direction-independent key for tuples, used to look up a
// fourTuple, when you are unsure of it's direction.
func (t fourTuple) key() string {
key := []string{
fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
}
sort.Strings(key)
return strings.Join(key, " ")
}
// reverse flips the direction of the tuple
func (t *fourTuple) reverse() {
t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
}
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
@@ -79,32 +103,7 @@ func (r *Reporter) Report() (report.Report, error) {
hostNodeID := report.MakeHostNodeID(r.hostID)
rpt := report.MakeReport()
{
conns, err := r.scanner.Connections(r.includeProcesses)
if err != nil {
return rpt, err
}
commonNodeInfo := report.MakeNode().WithLatests(map[string]string{
Procspied: "true",
})
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
localPort = conn.LocalPort
remotePort = conn.RemotePort
localAddr = conn.LocalAddress.String()
remoteAddr = conn.RemoteAddress.String()
)
extraNodeInfo := commonNodeInfo.Copy()
if conn.Proc.PID > 0 {
extraNodeInfo = extraNodeInfo.WithLatests(map[string]string{
process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10),
report.HostNodeID: hostNodeID,
})
}
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &commonNodeInfo)
}
}
seenTuples := map[string]fourTuple{}
// Consult the flowWalker for short-live connections
{
@@ -112,107 +111,115 @@ func (r *Reporter) Report() (report.Report, error) {
Conntracked: "true",
})
r.flowWalker.walkFlows(func(f flow) {
var (
localPort = uint16(f.Original.Layer4.SrcPort)
remotePort = uint16(f.Original.Layer4.DstPort)
localAddr = f.Original.Layer3.SrcIP
remoteAddr = f.Original.Layer3.DstIP
)
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &extraNodeInfo)
tuple := fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
}
seenTuples[tuple.key()] = tuple
r.addConnection(&rpt, tuple, &extraNodeInfo, &extraNodeInfo)
})
}
{
conns, err := r.scanner.Connections(r.includeProcesses)
if err != nil {
return rpt, err
}
extraNodeInfo := report.MakeNode().WithLatests(map[string]string{
Procspied: "true",
})
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
toNodeInfo, fromNodeInfo = extraNodeInfo.Copy(), extraNodeInfo.Copy()
)
if conn.Proc.PID > 0 {
fromNodeInfo = fromNodeInfo.WithLatests(map[string]string{
process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10),
report.HostNodeID: hostNodeID,
})
}
// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
canonical, ok := seenTuples[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
tuple.reverse()
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
r.addConnection(&rpt, tuple, &fromNodeInfo, &toNodeInfo)
}
}
r.natMapper.applyNAT(rpt, r.hostID)
return rpt, nil
}
func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, extraLocalNode, extraRemoteNode *report.Node) {
localIsClient := int(localPort) > int(remotePort)
func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, extraFromNode, extraToNode *report.Node) {
// Update address topology
{
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
localNode = report.MakeNodeWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
})
fromAddressNodeID = report.MakeAddressNodeID(r.hostID, t.fromAddr)
toAddressNodeID = report.MakeAddressNodeID(r.hostID, t.toAddr)
fromNode = report.MakeNodeWith(map[string]string{Addr: t.fromAddr}).WithEdge(toAddressNodeID, report.EdgeMetadata{})
toNode = report.MakeNodeWith(map[string]string{Addr: t.toAddr})
)
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
if toNames, err := r.reverseResolver.get(t.toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}
if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = localNode.WithEdge(localAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
}
rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode)
rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode)
rpt.Address = rpt.Address.AddNode(fromAddressNodeID, fromNode)
rpt.Address = rpt.Address.AddNode(toAddressNodeID, toNode)
}
// Update endpoint topology
if r.includeProcesses {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))
fromEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.fromAddr, strconv.Itoa(int(t.fromPort)))
toEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.toAddr, strconv.Itoa(int(t.toPort)))
localNode = report.MakeNodeWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
Port: strconv.Itoa(int(remotePort)),
fromNode = report.MakeNodeWith(map[string]string{
Addr: t.fromAddr,
Port: strconv.Itoa(int(t.fromPort)),
}).WithEdge(toEndpointNodeID, report.EdgeMetadata{})
toNode = report.MakeNodeWith(map[string]string{
Addr: t.toAddr,
Port: strconv.Itoa(int(t.toPort)),
})
)
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
if toNames, err := r.reverseResolver.get(t.toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}
if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = remoteNode.WithEdge(localEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
}
rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode)
rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode)
rpt.Endpoint = rpt.Endpoint.AddNode(fromEndpointNodeID, fromNode)
rpt.Endpoint = rpt.Endpoint.AddNode(toEndpointNodeID, toNode)
}
}

View File

@@ -5,7 +5,6 @@ import (
"strconv"
"testing"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/report"
@@ -85,11 +84,6 @@ func TestSpyNoProcesses(t *testing.T) {
scopedRemote = report.MakeAddressNodeID(nodeID, fixRemoteAddress.String())
)
have, _ := r.Address.Nodes[scopedLocal].Latest.Lookup(docker.Name)
if want, have := nodeName, have; want != have {
t.Fatalf("want %q, have %q", want, have)
}
if want, have := 1, len(r.Address.Nodes[scopedRemote].Adjacency); want != have {
t.Fatalf("want %d, have %d", want, have)
}

View File

@@ -312,7 +312,6 @@ var (
EdgeMetadata: report.EdgeMetadata{
IngressPacketCount: newu64(210),
IngressByteCount: newu64(2100),
MaxConnCountTCP: newu64(3),
},
},
ClientHostRenderedID: {
@@ -331,7 +330,6 @@ var (
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
MaxConnCountTCP: newu64(3),
},
},
pseudoHostID1: {

View File

@@ -217,7 +217,6 @@ type EdgeMetadata struct {
IngressPacketCount *uint64 `json:"ingress_packet_count,omitempty"`
EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` // Transport layer
IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` // Transport layer
MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"`
}
// Copy returns a value copy of the EdgeMetadata.
@@ -227,7 +226,6 @@ func (e EdgeMetadata) Copy() EdgeMetadata {
IngressPacketCount: cpu64ptr(e.IngressPacketCount),
EgressByteCount: cpu64ptr(e.EgressByteCount),
IngressByteCount: cpu64ptr(e.IngressByteCount),
MaxConnCountTCP: cpu64ptr(e.MaxConnCountTCP),
}
}
@@ -238,7 +236,6 @@ func (e EdgeMetadata) Reversed() EdgeMetadata {
IngressPacketCount: cpu64ptr(e.EgressPacketCount),
EgressByteCount: cpu64ptr(e.IngressByteCount),
IngressByteCount: cpu64ptr(e.EgressByteCount),
MaxConnCountTCP: cpu64ptr(e.MaxConnCountTCP),
}
}
@@ -259,7 +256,6 @@ func (e EdgeMetadata) Merge(other EdgeMetadata) EdgeMetadata {
cp.IngressPacketCount = merge(cp.IngressPacketCount, other.IngressPacketCount, sum)
cp.EgressByteCount = merge(cp.EgressByteCount, other.EgressByteCount, sum)
cp.IngressByteCount = merge(cp.IngressByteCount, other.IngressByteCount, sum)
cp.MaxConnCountTCP = merge(cp.MaxConnCountTCP, other.MaxConnCountTCP, max)
return cp
}
@@ -272,9 +268,6 @@ func (e EdgeMetadata) Flatten(other EdgeMetadata) EdgeMetadata {
cp.IngressPacketCount = merge(cp.IngressPacketCount, other.IngressPacketCount, sum)
cp.EgressByteCount = merge(cp.EgressByteCount, other.EgressByteCount, sum)
cp.IngressByteCount = merge(cp.IngressByteCount, other.IngressByteCount, sum)
// Note that summing of two maximums doesn't always give us the true
// maximum. But it's a best effort.
cp.MaxConnCountTCP = merge(cp.MaxConnCountTCP, other.MaxConnCountTCP, sum)
return cp
}

View File

@@ -71,13 +71,11 @@ func TestEdgeMetadatasMerge(t *testing.T) {
Add("hostA|:192.168.1.1:12345|:192.168.1.2:80",
EdgeMetadata{
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
}),
want: EmptyEdgeMetadatas.
Add("hostA|:192.168.1.1:12345|:192.168.1.2:80",
EdgeMetadata{
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
}),
},
"Empty b": {
@@ -101,27 +99,23 @@ func TestEdgeMetadatasMerge(t *testing.T) {
EdgeMetadata{
EgressPacketCount: newu64(12),
EgressByteCount: newu64(500),
MaxConnCountTCP: newu64(4),
}),
b: EmptyEdgeMetadatas.
Add("hostQ|:192.168.1.1:12345|:192.168.1.2:80",
EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(2),
MaxConnCountTCP: newu64(6),
}),
want: EmptyEdgeMetadatas.
Add("hostA|:192.168.1.1:12345|:192.168.1.2:80",
EdgeMetadata{
EgressPacketCount: newu64(12),
EgressByteCount: newu64(500),
MaxConnCountTCP: newu64(4),
}).
Add("hostQ|:192.168.1.1:12345|:192.168.1.2:80",
EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(2),
MaxConnCountTCP: newu64(6),
}),
},
"Overlapping a & b": {
@@ -130,7 +124,6 @@ func TestEdgeMetadatasMerge(t *testing.T) {
EdgeMetadata{
EgressPacketCount: newu64(12),
EgressByteCount: newu64(1000),
MaxConnCountTCP: newu64(7),
}),
b: EmptyEdgeMetadatas.
Add("hostA|:192.168.1.1:12345|:192.168.1.2:80",
@@ -138,7 +131,6 @@ func TestEdgeMetadatasMerge(t *testing.T) {
EgressPacketCount: newu64(1),
IngressByteCount: newu64(123),
EgressByteCount: newu64(2),
MaxConnCountTCP: newu64(9),
}),
want: EmptyEdgeMetadatas.
Add("hostA|:192.168.1.1:12345|:192.168.1.2:80",
@@ -146,7 +138,6 @@ func TestEdgeMetadatasMerge(t *testing.T) {
EgressPacketCount: newu64(13),
IngressByteCount: newu64(123),
EgressByteCount: newu64(1002),
MaxConnCountTCP: newu64(9),
}),
},
} {
@@ -161,16 +152,13 @@ func TestEdgeMetadataFlatten(t *testing.T) {
{
have := (EdgeMetadata{
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
}).Flatten(EdgeMetadata{
EgressPacketCount: newu64(4),
EgressByteCount: newu64(8),
MaxConnCountTCP: newu64(16),
})
want := EdgeMetadata{
EgressPacketCount: newu64(1 + 4),
EgressByteCount: newu64(8),
MaxConnCountTCP: newu64(2 + 16), // flatten should sum MaxConnCountTCP
}
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -183,15 +171,12 @@ func TestEdgeMetadataFlatten(t *testing.T) {
have := EmptyEdgeMetadatas.
Add("foo", EdgeMetadata{
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
}).
Add("bar", EdgeMetadata{
EgressPacketCount: newu64(3),
MaxConnCountTCP: newu64(5),
}).Flatten()
want := EdgeMetadata{
EgressPacketCount: newu64(1 + 3),
MaxConnCountTCP: newu64(2 + 5),
}
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -224,11 +209,9 @@ func TestEdgeMetadatasEncoding(t *testing.T) {
want := EmptyEdgeMetadatas.
Add("foo", EdgeMetadata{
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
}).
Add("bar", EdgeMetadata{
EgressPacketCount: newu64(3),
MaxConnCountTCP: newu64(5),
})
{

View File

@@ -320,9 +320,7 @@ var (
ClientAddressNodeID: report.MakeNode().WithLatests(map[string]string{
endpoint.Addr: ClientIP,
report.HostNodeID: ClientHostNodeID,
}).WithEdge(ServerAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(3),
}),
}).WithEdge(ServerAddressNodeID, report.EdgeMetadata{}),
ServerAddressNodeID: report.MakeNode().WithLatests(map[string]string{
endpoint.Addr: ServerIP,