Merge pull request #515 from weaveworks/testing

Remove some duplicate functionality, add some basic tests.
This commit is contained in:
Tom Wilkie
2015-09-24 12:24:10 +08:00
11 changed files with 261 additions and 103 deletions

View File

@@ -65,8 +65,14 @@ type conntrack struct {
Flows []Flow `xml:"flow"`
}
// Conntracker is something that tracks connections.
type Conntracker interface {
WalkFlows(f func(Flow))
Stop()
}
// Conntracker uses the conntrack command to track network connections
type Conntracker struct {
type conntracker struct {
sync.Mutex
cmd exec.Cmd
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
@@ -75,11 +81,11 @@ type Conntracker struct {
}
// NewConntracker creates and starts a new Conntracter
func NewConntracker(existingConns bool, args ...string) (*Conntracker, error) {
func NewConntracker(existingConns bool, args ...string) (Conntracker, error) {
if !ConntrackModulePresent() {
return nil, fmt.Errorf("No conntrack module")
}
result := &Conntracker{
result := &conntracker{
activeFlows: map[int64]Flow{},
existingConns: existingConns,
}
@@ -112,7 +118,7 @@ var ConntrackModulePresent = func() bool {
}
// NB this is not re-entrant!
func (c *Conntracker) run(args ...string) {
func (c *conntracker) run(args ...string) {
if c.existingConns {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
@@ -178,7 +184,7 @@ func (c *Conntracker) run(args ...string) {
}
}
func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) {
func (c *conntracker) existingConnections(args ...string) ([]Flow, error) {
args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
@@ -203,7 +209,7 @@ func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) {
}
// Stop stop stop
func (c *Conntracker) Stop() {
func (c *conntracker) Stop() {
c.Lock()
defer c.Unlock()
if c.cmd == nil {
@@ -215,7 +221,7 @@ func (c *Conntracker) Stop() {
}
}
func (c *Conntracker) handleFlow(f Flow, forceAdd bool) {
func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
@@ -260,7 +266,7 @@ func (c *Conntracker) handleFlow(f Flow, forceAdd bool) {
// WalkFlows calls f with all active flows and flows that have come and gone
// since the last call to WalkFlows
func (c *Conntracker) WalkFlows(f func(Flow)) {
func (c *conntracker) WalkFlows(f func(Flow)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {

View File

@@ -13,54 +13,62 @@ import (
testExec "github.com/weaveworks/scope/test/exec"
)
func makeFlow(id int64, srcIP, dstIP string, srcPort, dstPort int, ty, state string) Flow {
func makeFlow(ty string) Flow {
return Flow{
XMLName: xml.Name{
Local: "flow",
},
Type: ty,
Metas: []Meta{
{
XMLName: xml.Name{
Local: "meta",
},
Direction: "original",
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
},
}
}
func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta {
meta := Meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: dir,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func addIndependant(f *Flow, id int64, state string) *Meta {
meta := Meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func TestConntracker(t *testing.T) {
@@ -121,7 +129,9 @@ func TestConntracker(t *testing.T) {
}
}
flow1 := makeFlow(1, "1.2.3.4", "2.3.4.5", 2, 3, New, "")
flow1 := makeFlow(New)
addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3)
addIndependant(&flow1, 1, "")
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)

View File

@@ -16,16 +16,14 @@ type endpointMapping struct {
rewrittenPort int
}
type natmapper struct {
*Conntracker
// NATMapper rewrites a report to deal with NAT's connections
type NATMapper struct {
Conntracker
}
func newNATMapper() (*natmapper, error) {
ct, err := NewConntracker(true, "--any-nat")
if err != nil {
return nil, err
}
return &natmapper{ct}, nil
// NewNATMapper is exposed for testing
func NewNATMapper(ct Conntracker) NATMapper {
return NATMapper{ct}
}
func toMapping(f Flow) *endpointMapping {
@@ -49,9 +47,9 @@ func toMapping(f Flow) *endpointMapping {
return &mapping
}
// applyNAT duplicates Nodes in the endpoint topology of a
// ApplyNAT duplicates Nodes in the endpoint topology of a
// report, based on the NAT table as returns by natTable.
func (n *natmapper) applyNAT(rpt report.Report, scope string) {
func (n NATMapper) ApplyNAT(rpt report.Report, scope string) {
n.WalkFlows(func(f Flow) {
var (
mapping = toMapping(f)

View File

@@ -0,0 +1,97 @@
package endpoint_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
type mockConntracker struct {
flows []endpoint.Flow
}
func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) {
for _, flow := range m.flows {
f(flow)
}
}
func (m *mockConntracker) Stop() {}
func TestNat(t *testing.T) {
// test that two containers, on the docker network, get their connections mapped
// correctly.
// the setup is this:
//
// container2 (10.0.47.2:222222), host2 (2.3.4.5:22223) ->
// host1 (1.2.3.4:80), container1 (10.0.47.2:80)
// from the PoV of host1
{
flow := makeFlow("")
addIndependant(&flow, 1, "")
flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
ct := &mockConntracker{
flows: []endpoint.Flow{flow},
}
have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host1", "10.0.47.1", "80")
have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{
endpoint.Addr: "10.0.47.1",
endpoint.Port: "80",
"foo": "bar",
}))
want := have.Copy()
want.Endpoint.AddNode(report.MakeEndpointNodeID("host1", "1.2.3.4", "80"), report.MakeNodeWith(report.Metadata{
endpoint.Addr: "1.2.3.4",
endpoint.Port: "80",
"copy_of": originalID,
"foo": "bar",
}))
natmapper := endpoint.NewNATMapper(ct)
natmapper.ApplyNAT(have, "host1")
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}
}
// form the PoV of host2
{
flow := makeFlow("")
addIndependant(&flow, 2, "")
flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
ct := &mockConntracker{
flows: []endpoint.Flow{flow},
}
have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host2", "10.0.47.2", "22222")
have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{
endpoint.Addr: "10.0.47.2",
endpoint.Port: "22222",
"foo": "baz",
}))
want := have.Copy()
want.Endpoint.AddNode(report.MakeEndpointNodeID("host2", "2.3.4.5", "22223"), report.MakeNodeWith(report.Metadata{
endpoint.Addr: "2.3.4.5",
endpoint.Port: "22223",
"copy_of": originalID,
"foo": "baz",
}))
natmapper := endpoint.NewNATMapper(ct)
natmapper.ApplyNAT(have, "host1")
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}
}
}

View File

@@ -26,8 +26,8 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
conntracker *Conntracker
natmapper *natmapper
conntracker Conntracker
natmapper *NATMapper
revResolver *ReverseResolver
}
@@ -51,8 +51,8 @@ var SpyDuration = prometheus.NewSummaryVec(
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
var (
conntrackModulePresent = ConntrackModulePresent()
conntracker *Conntracker
natmapper *natmapper
conntracker Conntracker
natmapper NATMapper
err error
)
if conntrackModulePresent && useConntrack {
@@ -62,17 +62,18 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
}
}
if conntrackModulePresent {
natmapper, err = newNATMapper()
ct, err := NewConntracker(true, "--any-nat")
if err != nil {
log.Printf("Failed to start natMapper: %v", err)
log.Printf("Failed to start conntracker for natmapper: %v", err)
}
natmapper = NewNATMapper(ct)
}
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
conntracker: conntracker,
natmapper: natmapper,
natmapper: &natmapper,
revResolver: NewReverseResolver(),
}
}
@@ -139,7 +140,7 @@ func (r *Reporter) Report() (report.Report, error) {
}
if r.natmapper != nil {
r.natmapper.applyNAT(rpt, r.hostID)
r.natmapper.ApplyNAT(rpt, r.hostID)
}
return rpt, nil
@@ -165,7 +166,7 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil {
remoteNode = remoteNode.AddMetadata(map[string]string{
remoteNode = remoteNode.WithMetadata(map[string]string{
"name": revRemoteName,
})
}
@@ -211,7 +212,7 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil {
remoteNode = remoteNode.AddMetadata(map[string]string{
remoteNode = remoteNode.WithMetadata(map[string]string{
"name": revRemoteName,
})
}

View File

@@ -13,12 +13,12 @@ var (
uncontainedServerID = render.MakePseudoNodeID(render.UncontainedID, test.ServerHostName)
unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80")
unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80")
unknownPseudoNode1 = func(adjacency report.IDList) render.RenderableNode {
unknownPseudoNode1 = func(adjacent string) render.RenderableNode {
return render.RenderableNode{
ID: unknownPseudoNode1ID,
LabelMajor: "10.10.10.10",
Pseudo: true,
Node: report.MakeNode().WithAdjacency(adjacency),
Node: report.MakeNode().WithAdjacent(adjacent),
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(70),
EgressByteCount: newu64(700),
@@ -29,12 +29,12 @@ var (
),
}
}
unknownPseudoNode2 = func(adjacency report.IDList) render.RenderableNode {
unknownPseudoNode2 = func(adjacent string) render.RenderableNode {
return render.RenderableNode{
ID: unknownPseudoNode2ID,
LabelMajor: "10.10.10.11",
Pseudo: true,
Node: report.MakeNode().WithAdjacency(adjacency),
Node: report.MakeNode().WithAdjacent(adjacent),
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(50),
EgressByteCount: newu64(500),
@@ -44,12 +44,12 @@ var (
),
}
}
theInternetNode = func(adjacency report.IDList) render.RenderableNode {
theInternetNode = func(adjacent string) render.RenderableNode {
return render.RenderableNode{
ID: render.TheInternetID,
LabelMajor: render.TheInternetMajor,
Pseudo: true,
Node: report.MakeNode().WithAdjacency(adjacency),
Node: report.MakeNode().WithAdjacent(adjacent),
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(60),
EgressByteCount: newu64(600),
@@ -131,9 +131,9 @@ var (
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
},
unknownPseudoNode1ID: unknownPseudoNode1(report.MakeIDList(ServerProcessID)),
unknownPseudoNode2ID: unknownPseudoNode2(report.MakeIDList(ServerProcessID)),
render.TheInternetID: theInternetNode(report.MakeIDList(ServerProcessID)),
unknownPseudoNode1ID: unknownPseudoNode1(ServerProcessID),
unknownPseudoNode2ID: unknownPseudoNode2(ServerProcessID),
render.TheInternetID: theInternetNode(ServerProcessID),
}).Prune()
RenderedProcessNames = (render.RenderableNodes{
@@ -187,9 +187,9 @@ var (
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
},
unknownPseudoNode1ID: unknownPseudoNode1(report.MakeIDList("apache")),
unknownPseudoNode2ID: unknownPseudoNode2(report.MakeIDList("apache")),
render.TheInternetID: theInternetNode(report.MakeIDList("apache")),
unknownPseudoNode1ID: unknownPseudoNode1("apache"),
unknownPseudoNode2ID: unknownPseudoNode2("apache"),
render.TheInternetID: theInternetNode("apache"),
}).Prune()
RenderedContainers = (render.RenderableNodes{
@@ -247,7 +247,7 @@ var (
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode(report.MakeIDList(test.ServerContainerID)),
render.TheInternetID: theInternetNode(test.ServerContainerID),
}).Prune()
RenderedContainerImages = (render.RenderableNodes{
@@ -304,7 +304,7 @@ var (
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode(report.MakeIDList(test.ServerContainerImageName)),
render.TheInternetID: theInternetNode(test.ServerContainerImageName),
}).Prune()
ServerHostRenderedID = render.MakeHostID(test.ServerHostID)

View File

@@ -54,7 +54,7 @@ func TestMergeRenderableNode(t *testing.T) {
LabelMinor: "minor",
Rank: "rank",
Pseudo: false,
Node: report.MakeNode().WithAdjacency(report.MakeIDList("a1", "a2")),
Node: report.MakeNode().WithAdjacent("a1").WithAdjacent("a2"),
Origins: report.MakeIDList("o1", "o2"),
EdgeMetadata: report.EdgeMetadata{},
}

View File

@@ -216,6 +216,30 @@ func TestMergeNodes(t *testing.T) {
}),
},
},
"Counters": {
a: report.Nodes{
"1": report.MakeNode().WithCounters(map[string]int{
"a": 13,
"b": 57,
"c": 89,
}),
},
b: report.Nodes{
"1": report.MakeNode().WithCounters(map[string]int{
"a": 78,
"b": 3,
"d": 47,
}),
},
want: report.Nodes{
"1": report.MakeNode().WithCounters(map[string]int{
"a": 91,
"b": 60,
"c": 89,
"d": 47,
}),
},
},
} {
if have := c.a.Merge(c.b); !reflect.DeepEqual(c.want, have) {
t.Errorf("%s: want\n\t%#v, have\n\t%#v", name, c.want, have)

View File

@@ -25,3 +25,39 @@ func TestReportTopologies(t *testing.T) {
t.Errorf("want %d, have %d", want, have)
}
}
func TestNode(t *testing.T) {
{
node := report.MakeNode().WithMetadata(report.Metadata{
"foo": "bar",
})
if node.Metadata["foo"] != "bar" {
t.Errorf("want foo, have %s", node.Metadata["foo"])
}
}
{
node := report.MakeNode().WithCounters(report.Counters{
"foo": 1,
})
if node.Counters["foo"] != 1 {
t.Errorf("want foo, have %d", node.Counters["foo"])
}
}
{
node := report.MakeNode().WithAdjacent("foo")
if node.Adjacency[0] != "foo" {
t.Errorf("want foo, have %v", node.Adjacency)
}
}
{
node := report.MakeNode().WithEdge("foo", report.EdgeMetadata{
EgressPacketCount: newu64(13),
})
if node.Adjacency[0] != "foo" {
t.Errorf("want foo, have %v", node.Adjacency)
}
if *node.Edges["foo"].EgressPacketCount != 13 {
t.Errorf("want 13, have %v", node.Edges)
}
}
}

View File

@@ -106,24 +106,10 @@ func (n Node) WithMetadata(m map[string]string) Node {
return result
}
// AddMetadata returns a fresh copy of n, with Metadata set to the merge of n
// and the metadata provided.
func (n Node) AddMetadata(m map[string]string) Node {
additional := MakeNodeWith(m)
return n.Merge(additional)
}
// WithCounters returns a fresh copy of n, with Counters set to c.
// WithCounters returns a fresh copy of n, with Counters c merged in.
func (n Node) WithCounters(c map[string]int) Node {
result := n.Copy()
result.Counters = c
return result
}
// WithAdjacency returns a fresh copy of n, with Adjacency set to a.
func (n Node) WithAdjacency(a IDList) Node {
result := n.Copy()
result.Adjacency = a
result.Counters = result.Counters.Merge(c)
return result
}

View File

@@ -253,19 +253,19 @@ var (
UnknownAddress1NodeID: report.MakeNode().WithMetadata(map[string]string{
endpoint.Addr: UnknownClient1IP,
}).WithAdjacency(report.MakeIDList(ServerAddressNodeID)),
}).WithAdjacent(ServerAddressNodeID),
UnknownAddress2NodeID: report.MakeNode().WithMetadata(map[string]string{
endpoint.Addr: UnknownClient2IP,
}).WithAdjacency(report.MakeIDList(ServerAddressNodeID)),
}).WithAdjacent(ServerAddressNodeID),
UnknownAddress3NodeID: report.MakeNode().WithMetadata(map[string]string{
endpoint.Addr: UnknownClient3IP,
}).WithAdjacency(report.MakeIDList(ServerAddressNodeID)),
}).WithAdjacent(ServerAddressNodeID),
RandomAddressNodeID: report.MakeNode().WithMetadata(map[string]string{
endpoint.Addr: RandomClientIP,
}).WithAdjacency(report.MakeIDList(ServerAddressNodeID)),
}).WithAdjacent(ServerAddressNodeID),
},
},
Host: report.Topology{