Test the nat mapper.

This commit is contained in:
Tom Wilkie
2015-09-23 11:01:29 +00:00
parent cdf051ffe9
commit 7da599194d
5 changed files with 142 additions and 60 deletions

View File

@@ -65,8 +65,14 @@ type conntrack struct {
Flows []Flow `xml:"flow"`
}
// Conntracker is somethin that tracks connections.
type Conntracker interface {
WalkFlows(f func(Flow))
Stop()
}
// Conntracker uses the conntrack command to track network connections
type Conntracker struct {
type conntracker struct {
sync.Mutex
cmd exec.Cmd
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
@@ -75,11 +81,11 @@ type Conntracker struct {
}
// NewConntracker creates and starts a new Conntracter
func NewConntracker(existingConns bool, args ...string) (*Conntracker, error) {
var NewConntracker = func(existingConns bool, args ...string) (Conntracker, error) {
if !ConntrackModulePresent() {
return nil, fmt.Errorf("No conntrack module")
}
result := &Conntracker{
result := &conntracker{
activeFlows: map[int64]Flow{},
existingConns: existingConns,
}
@@ -112,7 +118,7 @@ var ConntrackModulePresent = func() bool {
}
// NB this is not re-entrant!
func (c *Conntracker) run(args ...string) {
func (c *conntracker) run(args ...string) {
if c.existingConns {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
@@ -178,7 +184,7 @@ func (c *Conntracker) run(args ...string) {
}
}
func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) {
func (c *conntracker) existingConnections(args ...string) ([]Flow, error) {
args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
@@ -203,7 +209,7 @@ func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) {
}
// Stop stop stop
func (c *Conntracker) Stop() {
func (c *conntracker) Stop() {
c.Lock()
defer c.Unlock()
if c.cmd == nil {
@@ -215,7 +221,7 @@ func (c *Conntracker) Stop() {
}
}
func (c *Conntracker) handleFlow(f Flow, forceAdd bool) {
func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
@@ -260,7 +266,7 @@ func (c *Conntracker) handleFlow(f Flow, forceAdd bool) {
// WalkFlows calls f with all active flows and flows that have come and gone
// since the last call to WalkFlows
func (c *Conntracker) WalkFlows(f func(Flow)) {
func (c *conntracker) WalkFlows(f func(Flow)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {

View File

@@ -13,54 +13,62 @@ import (
testExec "github.com/weaveworks/scope/test/exec"
)
func makeFlow(id int64, srcIP, dstIP string, srcPort, dstPort int, ty, state string) Flow {
func makeFlow(ty string) Flow {
return Flow{
XMLName: xml.Name{
Local: "flow",
},
Type: ty,
Metas: []Meta{
{
XMLName: xml.Name{
Local: "meta",
},
Direction: "original",
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
},
}
}
func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta {
meta := Meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: dir,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func addIndependant(f *Flow, id int64, state string) *Meta {
meta := Meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func TestConntracker(t *testing.T) {
@@ -121,7 +129,9 @@ func TestConntracker(t *testing.T) {
}
}
flow1 := makeFlow(1, "1.2.3.4", "2.3.4.5", 2, 3, New, "")
flow1 := makeFlow(New)
addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3)
addIndependant(&flow1, 1, "")
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)

View File

@@ -16,16 +16,18 @@ type endpointMapping struct {
rewrittenPort int
}
type natmapper struct {
*Conntracker
// NATMapper rewrites a report to deal with NAT's connections
type NATMapper struct {
Conntracker
}
func newNATMapper() (*natmapper, error) {
// NewNATMapper is exposed for testing
func NewNATMapper() (*NATMapper, error) {
ct, err := NewConntracker(true, "--any-nat")
if err != nil {
return nil, err
}
return &natmapper{ct}, nil
return &NATMapper{ct}, nil
}
func toMapping(f Flow) *endpointMapping {
@@ -49,9 +51,9 @@ func toMapping(f Flow) *endpointMapping {
return &mapping
}
// applyNAT duplicates Nodes in the endpoint topology of a
// ApplyNAT duplicates Nodes in the endpoint topology of a
// report, based on the NAT table as returns by natTable.
func (n *natmapper) applyNAT(rpt report.Report, scope string) {
func (n *NATMapper) ApplyNAT(rpt report.Report, scope string) {
n.WalkFlows(func(f Flow) {
var (
mapping = toMapping(f)

View File

@@ -0,0 +1,64 @@
package endpoint_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
type mockConntracker struct {
flows []endpoint.Flow
}
func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) {
for _, flow := range m.flows {
f(flow)
}
}
func (m *mockConntracker) Stop() {}
func TestNat(t *testing.T) {
oldNewConntracker := endpoint.NewConntracker
defer func() { endpoint.NewConntracker = oldNewConntracker }()
endpoint.NewConntracker = func(existingConns bool, args ...string) (endpoint.Conntracker, error) {
flow := makeFlow("")
addIndependant(&flow, 1, "")
flow.Original = addMeta(&flow, "original", "10.0.47.1", "2.3.4.5", 80, 22222)
flow.Reply = addMeta(&flow, "reply", "2.3.4.5", "1.2.3.4", 22222, 80)
return &mockConntracker{
flows: []endpoint.Flow{flow},
}, nil
}
have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host1", "10.0.47.1", "80")
have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{
endpoint.Addr: "10.0.47.1",
endpoint.Port: "80",
"foo": "bar",
}))
want := have.Copy()
want.Endpoint.AddNode(report.MakeEndpointNodeID("host1", "1.2.3.4", "80"), report.MakeNodeWith(report.Metadata{
endpoint.Addr: "1.2.3.4",
endpoint.Port: "80",
"copy_of": originalID,
"foo": "bar",
}))
natmapper, err := endpoint.NewNATMapper()
if err != nil {
t.Fatal(err)
}
natmapper.ApplyNAT(have, "host1")
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}
}

View File

@@ -26,8 +26,8 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
conntracker *Conntracker
natmapper *natmapper
conntracker Conntracker
natmapper *NATMapper
revResolver *ReverseResolver
}
@@ -51,8 +51,8 @@ var SpyDuration = prometheus.NewSummaryVec(
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
var (
conntrackModulePresent = ConntrackModulePresent()
conntracker *Conntracker
natmapper *natmapper
conntracker Conntracker
natmapper *NATMapper
err error
)
if conntrackModulePresent && useConntrack {
@@ -62,7 +62,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
}
}
if conntrackModulePresent {
natmapper, err = newNATMapper()
natmapper, err = NewNATMapper()
if err != nil {
log.Printf("Failed to start natMapper: %v", err)
}
@@ -139,7 +139,7 @@ func (r *Reporter) Report() (report.Report, error) {
}
if r.natmapper != nil {
r.natmapper.applyNAT(rpt, r.hostID)
r.natmapper.ApplyNAT(rpt, r.hostID)
}
return rpt, nil