adding direction to connections from conntrack

* Remove report.EdgeMetadata.MaxConnCountTCP, as we don't display it anywhere
* Remove hostname metadata from local end of connection. We should be using the hostnodeid
This commit is contained in:
Paul Bellamy
2016-02-16 10:24:16 +00:00
committed by Tom Wilkie
parent f24ebc96b4
commit 6cef1b10ca
9 changed files with 106 additions and 143 deletions

View File

@@ -1,7 +1,10 @@
package endpoint
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -71,6 +74,27 @@ func (r *Reporter) Stop() {
r.scanner.Stop()
}
type fourTuple struct {
fromAddr, toAddr string
fromPort, toPort uint16
}
// key is a sortable direction-independent key for tuples, used to look up a
// fourTuple, when you are unsure of it's direction.
func (t fourTuple) key() string {
key := []string{
fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
}
sort.Strings(key)
return strings.Join(key, " ")
}
// reverse flips the direction of the tuple
func (t *fourTuple) reverse() {
t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
}
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
@@ -79,32 +103,7 @@ func (r *Reporter) Report() (report.Report, error) {
hostNodeID := report.MakeHostNodeID(r.hostID)
rpt := report.MakeReport()
{
conns, err := r.scanner.Connections(r.includeProcesses)
if err != nil {
return rpt, err
}
commonNodeInfo := report.MakeNode().WithLatests(map[string]string{
Procspied: "true",
})
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
localPort = conn.LocalPort
remotePort = conn.RemotePort
localAddr = conn.LocalAddress.String()
remoteAddr = conn.RemoteAddress.String()
)
extraNodeInfo := commonNodeInfo.Copy()
if conn.Proc.PID > 0 {
extraNodeInfo = extraNodeInfo.WithLatests(map[string]string{
process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10),
report.HostNodeID: hostNodeID,
})
}
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &commonNodeInfo)
}
}
seenTuples := map[string]fourTuple{}
// Consult the flowWalker for short-live connections
{
@@ -112,107 +111,115 @@ func (r *Reporter) Report() (report.Report, error) {
Conntracked: "true",
})
r.flowWalker.walkFlows(func(f flow) {
var (
localPort = uint16(f.Original.Layer4.SrcPort)
remotePort = uint16(f.Original.Layer4.DstPort)
localAddr = f.Original.Layer3.SrcIP
remoteAddr = f.Original.Layer3.DstIP
)
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &extraNodeInfo)
tuple := fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
}
seenTuples[tuple.key()] = tuple
r.addConnection(&rpt, tuple, &extraNodeInfo, &extraNodeInfo)
})
}
{
conns, err := r.scanner.Connections(r.includeProcesses)
if err != nil {
return rpt, err
}
extraNodeInfo := report.MakeNode().WithLatests(map[string]string{
Procspied: "true",
})
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
toNodeInfo, fromNodeInfo = extraNodeInfo.Copy(), extraNodeInfo.Copy()
)
if conn.Proc.PID > 0 {
fromNodeInfo = fromNodeInfo.WithLatests(map[string]string{
process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10),
report.HostNodeID: hostNodeID,
})
}
// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
canonical, ok := seenTuples[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
tuple.reverse()
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
r.addConnection(&rpt, tuple, &fromNodeInfo, &toNodeInfo)
}
}
r.natMapper.applyNAT(rpt, r.hostID)
return rpt, nil
}
func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, extraLocalNode, extraRemoteNode *report.Node) {
localIsClient := int(localPort) > int(remotePort)
func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, extraFromNode, extraToNode *report.Node) {
// Update address topology
{
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
localNode = report.MakeNodeWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
})
fromAddressNodeID = report.MakeAddressNodeID(r.hostID, t.fromAddr)
toAddressNodeID = report.MakeAddressNodeID(r.hostID, t.toAddr)
fromNode = report.MakeNodeWith(map[string]string{Addr: t.fromAddr}).WithEdge(toAddressNodeID, report.EdgeMetadata{})
toNode = report.MakeNodeWith(map[string]string{Addr: t.toAddr})
)
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
if toNames, err := r.reverseResolver.get(t.toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}
if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = localNode.WithEdge(localAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
}
rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode)
rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode)
rpt.Address = rpt.Address.AddNode(fromAddressNodeID, fromNode)
rpt.Address = rpt.Address.AddNode(toAddressNodeID, toNode)
}
// Update endpoint topology
if r.includeProcesses {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))
fromEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.fromAddr, strconv.Itoa(int(t.fromPort)))
toEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.toAddr, strconv.Itoa(int(t.toPort)))
localNode = report.MakeNodeWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
Port: strconv.Itoa(int(remotePort)),
fromNode = report.MakeNodeWith(map[string]string{
Addr: t.fromAddr,
Port: strconv.Itoa(int(t.fromPort)),
}).WithEdge(toEndpointNodeID, report.EdgeMetadata{})
toNode = report.MakeNodeWith(map[string]string{
Addr: t.toAddr,
Port: strconv.Itoa(int(t.toPort)),
})
)
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
if toNames, err := r.reverseResolver.get(t.toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}
if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = remoteNode.WithEdge(localEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
}
rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode)
rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode)
rpt.Endpoint = rpt.Endpoint.AddNode(fromEndpointNodeID, fromNode)
rpt.Endpoint = rpt.Endpoint.AddNode(toEndpointNodeID, toNode)
}
}

View File

@@ -5,7 +5,6 @@ import (
"strconv"
"testing"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/report"
@@ -85,11 +84,6 @@ func TestSpyNoProcesses(t *testing.T) {
scopedRemote = report.MakeAddressNodeID(nodeID, fixRemoteAddress.String())
)
have, _ := r.Address.Nodes[scopedLocal].Latest.Lookup(docker.Name)
if want, have := nodeName, have; want != have {
t.Fatalf("want %q, have %q", want, have)
}
if want, have := 1, len(r.Address.Nodes[scopedRemote].Adjacency); want != have {
t.Fatalf("want %d, have %d", want, have)
}