Continued un-exporting of symbols; renames

- Unexport consts, types, vars, etc.
- Rename Conntracker (interface) to FlowWalker, to match its definition.
- Rename conntracker (type) to conntrackWalker, to match the interface.
- Move conntrack_test.go to conntrack_internal_test.go and package endpoint
This commit is contained in:
Peter Bourgon
2015-10-02 18:38:04 +02:00
committed by Tom Wilkie
parent 6ae5077515
commit cb40ad3a90
5 changed files with 116 additions and 183 deletions

View File

@@ -14,82 +14,79 @@ import (
"github.com/weaveworks/scope/common/exec"
)
// Constants exported for testing
const (
modules = "/proc/modules"
conntrackModule = "nf_conntrack"
XMLHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
ConntrackOpenTag = "<conntrack>\n"
TimeWait = "TIME_WAIT"
TCP = "tcp"
New = "new"
Update = "update"
Destroy = "destroy"
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
conntrackOpenTag = "<conntrack>\n"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "new"
updateType = "update"
destroyType = "destroy"
)
// Layer3 - these structs are for the parsed conntrack output
type Layer3 struct {
type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
}
// Layer4 - these structs are for the parsed conntrack output
type Layer4 struct {
type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
}
// Meta - these structs are for the parsed conntrack output
type Meta struct {
type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 Layer3 `xml:"layer3"`
Layer4 Layer4 `xml:"layer4"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
ID int64 `xml:"id"`
State string `xml:"state"`
}
// Flow - these structs are for the parsed conntrack output
type Flow struct {
type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []Meta `xml:"meta"`
Metas []meta `xml:"meta"`
Type string `xml:"type,attr"`
Original, Reply, Independent *Meta `xml:"-"`
Original, Reply, Independent *meta `xml:"-"`
}
type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []Flow `xml:"flow"`
Flows []flow `xml:"flow"`
}
// Conntracker is something that tracks connections.
type Conntracker interface {
WalkFlows(f func(Flow))
Stop()
// flowWalker is something that maintains flows, and provides an accessor
// method to walk them.
type flowWalker interface {
walkFlows(f func(flow))
stop()
}
// Conntracker uses the conntrack command to track network connections
type conntracker struct {
// conntrackWalker uses the conntrack command to track network connections and
// implement flowWalker.
type conntrackWalker struct {
sync.Mutex
cmd exec.Cmd
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here
activeFlows map[int64]flow // active flows in state != TIME_WAIT
bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here
existingConns bool
args []string
quit chan struct{}
}
// NewConntracker creates and starts a new Conntracter
func NewConntracker(existingConns bool, args ...string) (Conntracker, error) {
// newConntracker creates and starts a new conntracker.
func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, error) {
if !ConntrackModulePresent() {
return nil, fmt.Errorf("No conntrack module")
}
result := &conntracker{
activeFlows: map[int64]Flow{},
result := &conntrackWalker{
activeFlows: map[int64]flow{},
existingConns: existingConns,
args: args,
}
@@ -121,7 +118,7 @@ var ConntrackModulePresent = func() bool {
return false
}
func (c *conntracker) loop() {
func (c *conntrackWalker) loop() {
// conntrack can sometimes fail with ENOBUFS, when there is a particularly
// high connection rate. In these cases just retry in a loop, so we can
// survive the spike. For sustained loads this degrades nicely, as we
@@ -139,7 +136,7 @@ func (c *conntracker) loop() {
}
}
func (c *conntracker) clearFlows() {
func (c *conntrackWalker) clearFlows() {
c.Lock()
defer c.Unlock()
@@ -147,7 +144,7 @@ func (c *conntracker) clearFlows() {
c.bufferedFlows = append(c.bufferedFlows, f)
}
c.activeFlows = map[int64]Flow{}
c.activeFlows = map[int64]flow{}
}
func logPipe(prefix string, reader io.Reader) {
@@ -160,7 +157,7 @@ func logPipe(prefix string, reader io.Reader) {
}
}
func (c *conntracker) run() {
func (c *conntrackWalker) run() {
if c.existingConns {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
@@ -217,14 +214,14 @@ func (c *conntracker) run() {
if line, err := reader.ReadString('\n'); err != nil {
log.Printf("conntrack error: %v", err)
return
} else if line != XMLHeader {
} else if line != xmlHeader {
log.Printf("conntrack invalid output: '%s'", line)
return
}
if line, err := reader.ReadString('\n'); err != nil {
log.Printf("conntrack error: %v", err)
return
} else if line != ConntrackOpenTag {
} else if line != conntrackOpenTag {
log.Printf("conntrack invalid output: '%s'", line)
return
}
@@ -234,7 +231,7 @@ func (c *conntracker) run() {
// Now loop on the output stream
decoder := xml.NewDecoder(reader)
for {
var f Flow
var f flow
if err := decoder.Decode(&f); err != nil {
log.Printf("conntrack error: %v", err)
return
@@ -243,15 +240,15 @@ func (c *conntracker) run() {
}
}
func (c *conntracker) existingConnections() ([]Flow, error) {
func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return []Flow{}, err
return []flow{}, err
}
if err := cmd.Start(); err != nil {
return []Flow{}, err
return []flow{}, err
}
defer func() {
if err := cmd.Wait(); err != nil {
@@ -260,15 +257,14 @@ func (c *conntracker) existingConnections() ([]Flow, error) {
}()
var result conntrack
if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF {
return []Flow{}, nil
return []flow{}, nil
} else if err != nil {
return []Flow{}, err
return []flow{}, err
}
return result.Flows, nil
}
// Stop stop stop
func (c *conntracker) Stop() {
func (c *conntrackWalker) stop() {
c.Lock()
defer c.Unlock()
close(c.quit)
@@ -277,7 +273,7 @@ func (c *conntracker) Stop() {
}
}
func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
@@ -297,7 +293,7 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
// For not, I'm only interested in tcp connections - there is too much udp
// traffic going on (every container talking to weave dns, for example) to
// render nicely. TODO: revisit this.
if f.Original.Layer4.Proto != TCP {
if f.Original.Layer4.Proto != tcpProto {
return
}
@@ -305,14 +301,14 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
defer c.Unlock()
switch {
case forceAdd || f.Type == New || f.Type == Update:
if f.Independent.State != TimeWait {
case forceAdd || f.Type == newType || f.Type == updateType:
if f.Independent.State != timeWait {
c.activeFlows[f.Independent.ID] = f
} else if _, ok := c.activeFlows[f.Independent.ID]; ok {
delete(c.activeFlows, f.Independent.ID)
c.bufferedFlows = append(c.bufferedFlows, f)
}
case f.Type == Destroy:
case f.Type == destroyType:
if _, ok := c.activeFlows[f.Independent.ID]; ok {
delete(c.activeFlows, f.Independent.ID)
c.bufferedFlows = append(c.bufferedFlows, f)
@@ -320,9 +316,9 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
}
}
// WalkFlows calls f with all active flows and flows that have come and gone
// since the last call to WalkFlows
func (c *conntracker) WalkFlows(f func(Flow)) {
// walkFlows calls f with all active flows and flows that have come and gone
// since the last call to walkFlows
func (c *conntrackWalker) walkFlows(f func(flow)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {

View File

@@ -1,4 +1,4 @@
package endpoint_test
package endpoint
import (
"bufio"
@@ -8,13 +8,12 @@ import (
"time"
"github.com/weaveworks/scope/common/exec"
. "github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/test"
testExec "github.com/weaveworks/scope/test/exec"
testexec "github.com/weaveworks/scope/test/exec"
)
func makeFlow(ty string) Flow {
return Flow{
func makeFlow(ty string) flow {
return flow{
XMLName: xml.Name{
Local: "flow",
},
@@ -22,46 +21,46 @@ func makeFlow(ty string) Flow {
}
}
func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta {
meta := Meta{
func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta {
meta := meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: dir,
Layer3: Layer3{
Layer3: layer3{
XMLName: xml.Name{
Local: "layer3",
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
Layer4: layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
Proto: tcpProto,
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func addIndependant(f *Flow, id int64, state string) *Meta {
meta := Meta{
func addIndependant(f *flow, id int64, state string) *meta {
meta := meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
Layer3: layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
Layer4: layer4{
XMLName: xml.Name{
Local: "layer4",
},
@@ -81,19 +80,19 @@ func TestConntracker(t *testing.T) {
reader, writer := io.Pipe()
exec.Command = func(name string, args ...string) exec.Cmd {
return testExec.NewMockCmd(reader)
return testexec.NewMockCmd(reader)
}
conntracker, err := NewConntracker(false)
flowWalker, err := newConntrackFlowWalker(false)
if err != nil {
t.Fatal(err)
}
bw := bufio.NewWriter(writer)
if _, err := bw.WriteString(XMLHeader); err != nil {
if _, err := bw.WriteString(xmlHeader); err != nil {
t.Fatal(err)
}
if _, err := bw.WriteString(ConntrackOpenTag); err != nil {
if _, err := bw.WriteString(conntrackOpenTag); err != nil {
t.Fatal(err)
}
if err := bw.Flush(); err != nil {
@@ -101,8 +100,8 @@ func TestConntracker(t *testing.T) {
}
have := func() interface{} {
result := []Flow{}
conntracker.WalkFlows(func(f Flow) {
result := []flow{}
flowWalker.walkFlows(func(f flow) {
f.Original = nil
f.Reply = nil
f.Independent = nil
@@ -113,11 +112,11 @@ func TestConntracker(t *testing.T) {
ts := 100 * time.Millisecond
// First, assert we have no flows
test.Poll(t, ts, []Flow{}, have)
test.Poll(t, ts, []flow{}, have)
// Now add some flows
xmlEncoder := xml.NewEncoder(bw)
writeFlow := func(f Flow) {
writeFlow := func(f flow) {
if err := xmlEncoder.Encode(f); err != nil {
t.Fatal(err)
}
@@ -129,25 +128,25 @@ func TestConntracker(t *testing.T) {
}
}
flow1 := makeFlow(New)
flow1 := makeFlow(newType)
addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3)
addIndependant(&flow1, 1, "")
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []flow{flow1}, have)
// Now check when we remove the flow, we still get it in the next Walk
flow1.Type = Destroy
flow1.Type = destroyType
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []Flow{}, have)
test.Poll(t, ts, []flow{flow1}, have)
test.Poll(t, ts, []flow{}, have)
// This time we're not going to remove it, but put it in state TIME_WAIT
flow1.Type = New
flow1.Type = newType
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []flow{flow1}, have)
flow1.Metas[1].State = TimeWait
flow1.Metas[1].State = timeWait
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []Flow{}, have)
test.Poll(t, ts, []flow{flow1}, have)
test.Poll(t, ts, []flow{}, have)
}

View File

@@ -18,14 +18,14 @@ type endpointMapping struct {
// natMapper rewrites a report to deal with NAT'd connections.
type natMapper struct {
Conntracker
flowWalker
}
func makeNATMapper(ct Conntracker) natMapper {
return natMapper{ct}
func makeNATMapper(fw flowWalker) natMapper {
return natMapper{fw}
}
func toMapping(f Flow) *endpointMapping {
func toMapping(f flow) *endpointMapping {
var mapping endpointMapping
if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP {
mapping = endpointMapping{
@@ -49,10 +49,10 @@ func toMapping(f Flow) *endpointMapping {
// applyNAT duplicates Nodes in the endpoint topology of a report, based on
// the NAT table.
func (n natMapper) applyNAT(rpt report.Report, scope string) {
if n.Conntracker == nil { // TODO(pb)
if n.flowWalker == nil { // TODO(pb)
return
}
n.Conntracker.WalkFlows(func(f Flow) {
n.flowWalker.walkFlows(func(f flow) {
var (
mapping = toMapping(f)
realEndpointID = report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort))

View File

@@ -1,7 +1,6 @@
package endpoint
import (
"encoding/xml"
"reflect"
"testing"
@@ -9,78 +8,17 @@ import (
"github.com/weaveworks/scope/test"
)
type mockConntracker struct {
flows []Flow
type mockFlowWalker struct {
flows []flow
}
func (m *mockConntracker) WalkFlows(f func(Flow)) {
func (m *mockFlowWalker) walkFlows(f func(flow)) {
for _, flow := range m.flows {
f(flow)
}
}
func (m *mockConntracker) Stop() {}
// TODO(pb): dedupe later
func makeFlow(ty string) Flow {
return Flow{
XMLName: xml.Name{
Local: "flow",
},
Type: ty,
}
}
// TODO(pb): dedupe later
func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta {
meta := Meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: dir,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
// TODO(pb): dedupe later
func addIndependant(f *Flow, id int64, state string) *Meta {
meta := Meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
XMLName: xml.Name{
Local: "layer4",
},
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func (m *mockFlowWalker) stop() {}
func TestNat(t *testing.T) {
// test that two containers, on the docker network, get their connections mapped
@@ -92,12 +30,12 @@ func TestNat(t *testing.T) {
// from the PoV of host1
{
flow := makeFlow("")
addIndependant(&flow, 1, "")
flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
ct := &mockConntracker{
flows: []Flow{flow},
f := makeFlow("")
addIndependant(&f, 1, "")
f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
ct := &mockFlowWalker{
flows: []flow{f},
}
have := report.MakeReport()
@@ -124,12 +62,12 @@ func TestNat(t *testing.T) {
// form the PoV of host2
{
flow := makeFlow("")
addIndependant(&flow, 2, "")
flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
ct := &mockConntracker{
flows: []Flow{flow},
f := makeFlow("")
addIndependant(&f, 2, "")
f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
ct := &mockFlowWalker{
flows: []flow{f},
}
have := report.MakeReport()

View File

@@ -26,7 +26,7 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
conntracker Conntracker
flowWalker flowWalker // interface
natMapper *natMapper
reverseResolver *reverseResolver
}
@@ -50,18 +50,18 @@ var SpyDuration = prometheus.NewSummaryVec(
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
var (
conntracker Conntracker
natMapper *natMapper
flowWalker flowWalker
natMapper *natMapper
)
if ConntrackModulePresent() { // TODO(pb)
if useConntrack {
var err error
if conntracker, err = NewConntracker(true); err != nil {
if flowWalker, err = newConntrackFlowWalker(true); err != nil {
log.Printf("Failed to start conntracker for endpoint reporter: %v", err)
}
}
if natmapperConntracker, err := NewConntracker(true, "--any-nat"); err == nil {
m := makeNATMapper(natmapperConntracker)
if natmapperFlowWalker, err := newConntrackFlowWalker(true, "--any-nat"); err == nil {
m := makeNATMapper(natmapperFlowWalker)
natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper
} else {
log.Printf("Failed to start conntracker for NAT mapper: %v", err)
@@ -71,7 +71,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
conntracker: conntracker,
flowWalker: flowWalker,
natMapper: natMapper,
reverseResolver: newReverseResolver(),
}
@@ -79,11 +79,11 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
// Stop stop stop
func (r *Reporter) Stop() {
if r.conntracker != nil { // TODO(pb): this should never be nil (implies interface)
r.conntracker.Stop()
if r.flowWalker != nil { // TODO(pb): this should never be nil (implies interface)
r.flowWalker.stop()
}
if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface)
r.natMapper.Stop()
r.natMapper.stop()
}
r.reverseResolver.stop()
}
@@ -123,11 +123,11 @@ func (r *Reporter) Report() (report.Report, error) {
}
}
if r.conntracker != nil {
if r.flowWalker != nil {
extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{
Conntracked: "true",
})
r.conntracker.WalkFlows(func(f Flow) {
r.flowWalker.walkFlows(func(f flow) {
var (
localPort = uint16(f.Original.Layer4.SrcPort)
remotePort = uint16(f.Original.Layer4.DstPort)