From 6cef1b10cae4bbbb458ed96128dd3c183bdebe88 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Tue, 16 Feb 2016 10:24:16 +0000 Subject: [PATCH] adding direction to connections from conntrack * Remove report.EdgeMetadata.MaxConnCountTCP, as we don't display it anywhere * Remove hostname metadata from local end of connection. We should be using the hostnodeid --- .../_integration/test_single_report.json | 2 - experimental/demoprobe/main.go | 8 +- experimental/genreport/generate.go | 8 +- probe/endpoint/reporter.go | 195 +++++++++--------- probe/endpoint/reporter_test.go | 6 - render/expected/expected.go | 2 - report/edge_metadatas.go | 7 - report/edge_metadatas_internal_test.go | 17 -- test/fixture/report_fixture.go | 4 +- 9 files changed, 106 insertions(+), 143 deletions(-) diff --git a/experimental/_integration/test_single_report.json b/experimental/_integration/test_single_report.json index 8a9c1161d..bb8b75c7e 100644 --- a/experimental/_integration/test_single_report.json +++ b/experimental/_integration/test_single_report.json @@ -12,7 +12,6 @@ "EdgeMetadatas": { ";192.168.1.1;10746|theinternet": { "WithConnCountTCP": true, - "MaxConnCountTCP": 19 } }, "NodeMetadatas": { @@ -43,7 +42,6 @@ "EdgeMetadatas": { ";192.168.1.1|theinternet": { "WithConnCountTCP": true, - "MaxConnCountTCP": 12 } }, "NodeMetadatas": { diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index 7e0d12c3c..fbee76232 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -94,16 +94,12 @@ func demoReport(nodeCount int) report.Report { process.PID: "4000", "name": c.srcProc, "domain": "node-" + src, - }).WithEdge(dstPortID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), - })) + }).WithEdge(dstPortID, report.EdgeMetadata{})) r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithLatests(map[string]string{ process.PID: "4000", "name": c.dstProc, "domain": "node-" + dst, - }).WithEdge(srcPortID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), - })) + }).WithEdge(srcPortID, report.EdgeMetadata{})) // Address topology r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithLatests(map[string]string{ diff --git a/experimental/genreport/generate.go b/experimental/genreport/generate.go index 7f7735882..b7e3016ac 100644 --- a/experimental/genreport/generate.go +++ b/experimental/genreport/generate.go @@ -68,16 +68,12 @@ func DemoReport(nodeCount int) report.Report { "pid": "4000", "name": c.srcProc, "domain": "node-" + src, - }).WithEdge(dstPortID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), - })) + }).WithEdge(dstPortID, report.EdgeMetadata{})) r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithLatests(map[string]string{ "pid": "4000", "name": c.dstProc, "domain": "node-" + dst, - }).WithEdge(srcPortID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), - })) + }).WithEdge(srcPortID, report.EdgeMetadata{})) // Address topology r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithLatests(map[string]string{ diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 5999312c6..6b7e9be6f 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,7 +1,10 @@ package endpoint import ( + "fmt" + "sort" "strconv" + "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -71,6 +74,27 @@ func (r *Reporter) Stop() { r.scanner.Stop() } +type fourTuple struct { + fromAddr, toAddr string + fromPort, toPort uint16 +} + +// key is a sortable direction-independent key for tuples, used to look up a +// fourTuple, when you are unsure of it's direction. +func (t fourTuple) key() string { + key := []string{ + fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), + fmt.Sprintf("%s:%d", t.toAddr, t.toPort), + } + sort.Strings(key) + return strings.Join(key, " ") +} + +// reverse flips the direction of the tuple +func (t *fourTuple) reverse() { + t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort +} + // Report implements Reporter. func (r *Reporter) Report() (report.Report, error) { defer func(begin time.Time) { @@ -79,32 +103,7 @@ func (r *Reporter) Report() (report.Report, error) { hostNodeID := report.MakeHostNodeID(r.hostID) rpt := report.MakeReport() - - { - conns, err := r.scanner.Connections(r.includeProcesses) - if err != nil { - return rpt, err - } - commonNodeInfo := report.MakeNode().WithLatests(map[string]string{ - Procspied: "true", - }) - for conn := conns.Next(); conn != nil; conn = conns.Next() { - var ( - localPort = conn.LocalPort - remotePort = conn.RemotePort - localAddr = conn.LocalAddress.String() - remoteAddr = conn.RemoteAddress.String() - ) - extraNodeInfo := commonNodeInfo.Copy() - if conn.Proc.PID > 0 { - extraNodeInfo = extraNodeInfo.WithLatests(map[string]string{ - process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10), - report.HostNodeID: hostNodeID, - }) - } - r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &commonNodeInfo) - } - } + seenTuples := map[string]fourTuple{} // Consult the flowWalker for short-live connections { @@ -112,107 +111,115 @@ func (r *Reporter) Report() (report.Report, error) { Conntracked: "true", }) r.flowWalker.walkFlows(func(f flow) { - var ( - localPort = uint16(f.Original.Layer4.SrcPort) - remotePort = uint16(f.Original.Layer4.DstPort) - localAddr = f.Original.Layer3.SrcIP - remoteAddr = f.Original.Layer3.DstIP - ) - r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &extraNodeInfo) + tuple := fourTuple{ + f.Original.Layer3.SrcIP, + f.Original.Layer3.DstIP, + uint16(f.Original.Layer4.SrcPort), + uint16(f.Original.Layer4.DstPort), + } + seenTuples[tuple.key()] = tuple + r.addConnection(&rpt, tuple, &extraNodeInfo, &extraNodeInfo) }) } + { + conns, err := r.scanner.Connections(r.includeProcesses) + if err != nil { + return rpt, err + } + extraNodeInfo := report.MakeNode().WithLatests(map[string]string{ + Procspied: "true", + }) + for conn := conns.Next(); conn != nil; conn = conns.Next() { + var ( + tuple = fourTuple{ + conn.LocalAddress.String(), + conn.RemoteAddress.String(), + conn.LocalPort, + conn.RemotePort, + } + toNodeInfo, fromNodeInfo = extraNodeInfo.Copy(), extraNodeInfo.Copy() + ) + if conn.Proc.PID > 0 { + fromNodeInfo = fromNodeInfo.WithLatests(map[string]string{ + process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10), + report.HostNodeID: hostNodeID, + }) + } + + // If we've already seen this connection, we should know the direction + // (or have already figured it out), so we normalize and use the + // canonical direction. Otherwise, we can use a port-heuristic to guess + // the direction. + canonical, ok := seenTuples[tuple.key()] + if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { + tuple.reverse() + toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo + } + r.addConnection(&rpt, tuple, &fromNodeInfo, &toNodeInfo) + } + } + r.natMapper.applyNAT(rpt, r.hostID) return rpt, nil } -func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, extraLocalNode, extraRemoteNode *report.Node) { - localIsClient := int(localPort) > int(remotePort) - +func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, extraFromNode, extraToNode *report.Node) { // Update address topology { var ( - localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr) - remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr) - localNode = report.MakeNodeWith(map[string]string{ - "name": r.hostName, - Addr: localAddr, - }) - remoteNode = report.MakeNodeWith(map[string]string{ - Addr: remoteAddr, - }) + fromAddressNodeID = report.MakeAddressNodeID(r.hostID, t.fromAddr) + toAddressNodeID = report.MakeAddressNodeID(r.hostID, t.toAddr) + fromNode = report.MakeNodeWith(map[string]string{Addr: t.fromAddr}).WithEdge(toAddressNodeID, report.EdgeMetadata{}) + toNode = report.MakeNodeWith(map[string]string{Addr: t.toAddr}) ) // In case we have a reverse resolution for the IP, we can use it for // the name... - if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil { - remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...)) + if toNames, err := r.reverseResolver.get(t.toAddr); err == nil { + toNode = toNode.WithSet("name", report.MakeStringSet(toNames...)) } - if localIsClient { - // New nodes are merged into the report so we don't need to do any - // counting here; the merge does it for us. - localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(1), - }) - } else { - remoteNode = localNode.WithEdge(localAddressNodeID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(1), - }) + if extraFromNode != nil { + fromNode = fromNode.Merge(*extraFromNode) } - - if extraLocalNode != nil { - localNode = localNode.Merge(*extraLocalNode) + if extraToNode != nil { + toNode = toNode.Merge(*extraToNode) } - if extraRemoteNode != nil { - remoteNode = remoteNode.Merge(*extraRemoteNode) - } - rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode) - rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode) + rpt.Address = rpt.Address.AddNode(fromAddressNodeID, fromNode) + rpt.Address = rpt.Address.AddNode(toAddressNodeID, toNode) } // Update endpoint topology if r.includeProcesses { var ( - localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort))) - remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort))) + fromEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.fromAddr, strconv.Itoa(int(t.fromPort))) + toEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.toAddr, strconv.Itoa(int(t.toPort))) - localNode = report.MakeNodeWith(map[string]string{ - Addr: localAddr, - Port: strconv.Itoa(int(localPort)), - }) - remoteNode = report.MakeNodeWith(map[string]string{ - Addr: remoteAddr, - Port: strconv.Itoa(int(remotePort)), + fromNode = report.MakeNodeWith(map[string]string{ + Addr: t.fromAddr, + Port: strconv.Itoa(int(t.fromPort)), + }).WithEdge(toEndpointNodeID, report.EdgeMetadata{}) + toNode = report.MakeNodeWith(map[string]string{ + Addr: t.toAddr, + Port: strconv.Itoa(int(t.toPort)), }) ) // In case we have a reverse resolution for the IP, we can use it for // the name... - if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil { - remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...)) + if toNames, err := r.reverseResolver.get(t.toAddr); err == nil { + toNode = toNode.WithSet("name", report.MakeStringSet(toNames...)) } - if localIsClient { - // New nodes are merged into the report so we don't need to do any - // counting here; the merge does it for us. - localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(1), - }) - } else { - remoteNode = remoteNode.WithEdge(localEndpointNodeID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(1), - }) + if extraFromNode != nil { + fromNode = fromNode.Merge(*extraFromNode) } - - if extraLocalNode != nil { - localNode = localNode.Merge(*extraLocalNode) + if extraToNode != nil { + toNode = toNode.Merge(*extraToNode) } - if extraRemoteNode != nil { - remoteNode = remoteNode.Merge(*extraRemoteNode) - } - rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode) - rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode) + rpt.Endpoint = rpt.Endpoint.AddNode(fromEndpointNodeID, fromNode) + rpt.Endpoint = rpt.Endpoint.AddNode(toEndpointNodeID, toNode) } } diff --git a/probe/endpoint/reporter_test.go b/probe/endpoint/reporter_test.go index ba3c2e738..5d8d6de3c 100644 --- a/probe/endpoint/reporter_test.go +++ b/probe/endpoint/reporter_test.go @@ -5,7 +5,6 @@ import ( "strconv" "testing" - "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/probe/endpoint/procspy" "github.com/weaveworks/scope/report" @@ -85,11 +84,6 @@ func TestSpyNoProcesses(t *testing.T) { scopedRemote = report.MakeAddressNodeID(nodeID, fixRemoteAddress.String()) ) - have, _ := r.Address.Nodes[scopedLocal].Latest.Lookup(docker.Name) - if want, have := nodeName, have; want != have { - t.Fatalf("want %q, have %q", want, have) - } - if want, have := 1, len(r.Address.Nodes[scopedRemote].Adjacency); want != have { t.Fatalf("want %d, have %d", want, have) } diff --git a/render/expected/expected.go b/render/expected/expected.go index b06079e7b..026da4157 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -312,7 +312,6 @@ var ( EdgeMetadata: report.EdgeMetadata{ IngressPacketCount: newu64(210), IngressByteCount: newu64(2100), - MaxConnCountTCP: newu64(3), }, }, ClientHostRenderedID: { @@ -331,7 +330,6 @@ var ( EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(30), EgressByteCount: newu64(300), - MaxConnCountTCP: newu64(3), }, }, pseudoHostID1: { diff --git a/report/edge_metadatas.go b/report/edge_metadatas.go index 00c5344a1..7d4ee3861 100644 --- a/report/edge_metadatas.go +++ b/report/edge_metadatas.go @@ -217,7 +217,6 @@ type EdgeMetadata struct { IngressPacketCount *uint64 `json:"ingress_packet_count,omitempty"` EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` // Transport layer IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` // Transport layer - MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` } // Copy returns a value copy of the EdgeMetadata. @@ -227,7 +226,6 @@ func (e EdgeMetadata) Copy() EdgeMetadata { IngressPacketCount: cpu64ptr(e.IngressPacketCount), EgressByteCount: cpu64ptr(e.EgressByteCount), IngressByteCount: cpu64ptr(e.IngressByteCount), - MaxConnCountTCP: cpu64ptr(e.MaxConnCountTCP), } } @@ -238,7 +236,6 @@ func (e EdgeMetadata) Reversed() EdgeMetadata { IngressPacketCount: cpu64ptr(e.EgressPacketCount), EgressByteCount: cpu64ptr(e.IngressByteCount), IngressByteCount: cpu64ptr(e.EgressByteCount), - MaxConnCountTCP: cpu64ptr(e.MaxConnCountTCP), } } @@ -259,7 +256,6 @@ func (e EdgeMetadata) Merge(other EdgeMetadata) EdgeMetadata { cp.IngressPacketCount = merge(cp.IngressPacketCount, other.IngressPacketCount, sum) cp.EgressByteCount = merge(cp.EgressByteCount, other.EgressByteCount, sum) cp.IngressByteCount = merge(cp.IngressByteCount, other.IngressByteCount, sum) - cp.MaxConnCountTCP = merge(cp.MaxConnCountTCP, other.MaxConnCountTCP, max) return cp } @@ -272,9 +268,6 @@ func (e EdgeMetadata) Flatten(other EdgeMetadata) EdgeMetadata { cp.IngressPacketCount = merge(cp.IngressPacketCount, other.IngressPacketCount, sum) cp.EgressByteCount = merge(cp.EgressByteCount, other.EgressByteCount, sum) cp.IngressByteCount = merge(cp.IngressByteCount, other.IngressByteCount, sum) - // Note that summing of two maximums doesn't always give us the true - // maximum. But it's a best effort. - cp.MaxConnCountTCP = merge(cp.MaxConnCountTCP, other.MaxConnCountTCP, sum) return cp } diff --git a/report/edge_metadatas_internal_test.go b/report/edge_metadatas_internal_test.go index c5f0411b4..c9ffefd88 100644 --- a/report/edge_metadatas_internal_test.go +++ b/report/edge_metadatas_internal_test.go @@ -71,13 +71,11 @@ func TestEdgeMetadatasMerge(t *testing.T) { Add("hostA|:192.168.1.1:12345|:192.168.1.2:80", EdgeMetadata{ EgressPacketCount: newu64(1), - MaxConnCountTCP: newu64(2), }), want: EmptyEdgeMetadatas. Add("hostA|:192.168.1.1:12345|:192.168.1.2:80", EdgeMetadata{ EgressPacketCount: newu64(1), - MaxConnCountTCP: newu64(2), }), }, "Empty b": { @@ -101,27 +99,23 @@ func TestEdgeMetadatasMerge(t *testing.T) { EdgeMetadata{ EgressPacketCount: newu64(12), EgressByteCount: newu64(500), - MaxConnCountTCP: newu64(4), }), b: EmptyEdgeMetadatas. Add("hostQ|:192.168.1.1:12345|:192.168.1.2:80", EdgeMetadata{ EgressPacketCount: newu64(1), EgressByteCount: newu64(2), - MaxConnCountTCP: newu64(6), }), want: EmptyEdgeMetadatas. Add("hostA|:192.168.1.1:12345|:192.168.1.2:80", EdgeMetadata{ EgressPacketCount: newu64(12), EgressByteCount: newu64(500), - MaxConnCountTCP: newu64(4), }). Add("hostQ|:192.168.1.1:12345|:192.168.1.2:80", EdgeMetadata{ EgressPacketCount: newu64(1), EgressByteCount: newu64(2), - MaxConnCountTCP: newu64(6), }), }, "Overlapping a & b": { @@ -130,7 +124,6 @@ func TestEdgeMetadatasMerge(t *testing.T) { EdgeMetadata{ EgressPacketCount: newu64(12), EgressByteCount: newu64(1000), - MaxConnCountTCP: newu64(7), }), b: EmptyEdgeMetadatas. Add("hostA|:192.168.1.1:12345|:192.168.1.2:80", @@ -138,7 +131,6 @@ func TestEdgeMetadatasMerge(t *testing.T) { EgressPacketCount: newu64(1), IngressByteCount: newu64(123), EgressByteCount: newu64(2), - MaxConnCountTCP: newu64(9), }), want: EmptyEdgeMetadatas. Add("hostA|:192.168.1.1:12345|:192.168.1.2:80", @@ -146,7 +138,6 @@ func TestEdgeMetadatasMerge(t *testing.T) { EgressPacketCount: newu64(13), IngressByteCount: newu64(123), EgressByteCount: newu64(1002), - MaxConnCountTCP: newu64(9), }), }, } { @@ -161,16 +152,13 @@ func TestEdgeMetadataFlatten(t *testing.T) { { have := (EdgeMetadata{ EgressPacketCount: newu64(1), - MaxConnCountTCP: newu64(2), }).Flatten(EdgeMetadata{ EgressPacketCount: newu64(4), EgressByteCount: newu64(8), - MaxConnCountTCP: newu64(16), }) want := EdgeMetadata{ EgressPacketCount: newu64(1 + 4), EgressByteCount: newu64(8), - MaxConnCountTCP: newu64(2 + 16), // flatten should sum MaxConnCountTCP } if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -183,15 +171,12 @@ func TestEdgeMetadataFlatten(t *testing.T) { have := EmptyEdgeMetadatas. Add("foo", EdgeMetadata{ EgressPacketCount: newu64(1), - MaxConnCountTCP: newu64(2), }). Add("bar", EdgeMetadata{ EgressPacketCount: newu64(3), - MaxConnCountTCP: newu64(5), }).Flatten() want := EdgeMetadata{ EgressPacketCount: newu64(1 + 3), - MaxConnCountTCP: newu64(2 + 5), } if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) @@ -224,11 +209,9 @@ func TestEdgeMetadatasEncoding(t *testing.T) { want := EmptyEdgeMetadatas. Add("foo", EdgeMetadata{ EgressPacketCount: newu64(1), - MaxConnCountTCP: newu64(2), }). Add("bar", EdgeMetadata{ EgressPacketCount: newu64(3), - MaxConnCountTCP: newu64(5), }) { diff --git a/test/fixture/report_fixture.go b/test/fixture/report_fixture.go index 9ec2243ca..fde47d497 100644 --- a/test/fixture/report_fixture.go +++ b/test/fixture/report_fixture.go @@ -320,9 +320,7 @@ var ( ClientAddressNodeID: report.MakeNode().WithLatests(map[string]string{ endpoint.Addr: ClientIP, report.HostNodeID: ClientHostNodeID, - }).WithEdge(ServerAddressNodeID, report.EdgeMetadata{ - MaxConnCountTCP: newu64(3), - }), + }).WithEdge(ServerAddressNodeID, report.EdgeMetadata{}), ServerAddressNodeID: report.MakeNode().WithLatests(map[string]string{ endpoint.Addr: ServerIP,