Merge pull request #526 from weaveworks/fix-natmapper-conntracker

Refactor probe/endpoint for export and dependency cleanliness
This commit is contained in:
Tom Wilkie
2015-10-27 14:11:55 +00:00
16 changed files with 429 additions and 439 deletions

View File

@@ -11,7 +11,7 @@ import (
"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
func TestAPITopology(t *testing.T) {
@@ -62,7 +62,7 @@ func TestAPITopologyAddsKubernetes(t *testing.T) {
// Enable the kubernetes topologies
rpt := report.MakeReport()
rpt.Pod = report.MakeTopology()
rpt.Pod.Nodes[test.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{
rpt.Pod.Nodes[fixture.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pong-a",
Namespace: "ping",

View File

@@ -14,6 +14,7 @@ import (
"github.com/weaveworks/scope/render/expected"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
func TestAll(t *testing.T) {
@@ -78,7 +79,7 @@ func TestAPITopologyApplications(t *testing.T) {
}
equals(t, expected.ServerProcessID, node.Node.ID)
equals(t, "apache", node.Node.LabelMajor)
equals(t, fmt.Sprintf("%s (server:%s)", test.ServerHostID, test.ServerPID), node.Node.LabelMinor)
equals(t, fmt.Sprintf("%s (server:%s)", fixture.ServerHostID, fixture.ServerPID), node.Node.LabelMinor)
equals(t, false, node.Node.Pseudo)
// Let's not unit-test the specific content of the detail tables
}

View File

@@ -2,12 +2,12 @@ package main
import (
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
// StaticReport is used as a fixture in tests. It emulates an xfer.Collector.
type StaticReport struct{}
func (s StaticReport) Report() report.Report { return test.Report }
func (s StaticReport) Report() report.Report { return fixture.Report }
func (s StaticReport) Add(report.Report) {}

View File

@@ -6,7 +6,7 @@ import (
"net/http/httptest"
"testing"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
func TestAPIOriginHost(t *testing.T) {
@@ -18,7 +18,7 @@ func TestAPIOriginHost(t *testing.T) {
{
// Origin
body := getRawJSON(t, ts, fmt.Sprintf("/api/origin/host/%s", test.ServerHostNodeID))
body := getRawJSON(t, ts, fmt.Sprintf("/api/origin/host/%s", fixture.ServerHostNodeID))
var o OriginHost
if err := json.Unmarshal(body, &o); err != nil {
t.Fatalf("JSON parse error: %s", err)

View File

@@ -3,7 +3,6 @@ package endpoint
import (
"bufio"
"encoding/xml"
"fmt"
"io"
"log"
"os"
@@ -14,87 +13,90 @@ import (
"github.com/weaveworks/scope/common/exec"
)
// Constants used by the conntrack output parser.
const (
modules = "/proc/modules"
conntrackModule = "nf_conntrack"
XMLHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
ConntrackOpenTag = "<conntrack>\n"
TimeWait = "TIME_WAIT"
TCP = "tcp"
New = "new"
Update = "update"
Destroy = "destroy"
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
conntrackOpenTag = "<conntrack>\n"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "new"
updateType = "update"
destroyType = "destroy"
)
// Layer3 - these structs are for the parsed conntrack output
type Layer3 struct {
type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
}
// Layer4 - these structs are for the parsed conntrack output
type Layer4 struct {
type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
}
// Meta - these structs are for the parsed conntrack output
type Meta struct {
type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 Layer3 `xml:"layer3"`
Layer4 Layer4 `xml:"layer4"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
ID int64 `xml:"id"`
State string `xml:"state"`
}
// Flow - these structs are for the parsed conntrack output
type Flow struct {
type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []Meta `xml:"meta"`
Metas []meta `xml:"meta"`
Type string `xml:"type,attr"`
Original, Reply, Independent *Meta `xml:"-"`
Original, Reply, Independent *meta `xml:"-"`
}
type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []Flow `xml:"flow"`
Flows []flow `xml:"flow"`
}
// Conntracker is something that tracks connections.
type Conntracker interface {
WalkFlows(f func(Flow))
Stop()
// flowWalker is something that maintains flows, and provides an accessor
// method to walk them.
type flowWalker interface {
walkFlows(f func(flow))
stop()
}
// Conntracker uses the conntrack command to track network connections
type conntracker struct {
type nilFlowWalker struct{}
func (n nilFlowWalker) stop() {}
func (n nilFlowWalker) walkFlows(f func(flow)) {}
// conntrackWalker uses the conntrack command to track network connections and
// implement flowWalker.
type conntrackWalker struct {
sync.Mutex
cmd exec.Cmd
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here
existingConns bool
activeFlows map[int64]flow // active flows in state != TIME_WAIT
bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here
args []string
quit chan struct{}
}
// NewConntracker creates and starts a new Conntracter
func NewConntracker(existingConns bool, args ...string) (Conntracker, error) {
// newConntrackFlowWalker creates and starts a new flowWalker backed by conntrack.
func newConntrackFlowWalker(useConntrack bool, args ...string) flowWalker {
if !ConntrackModulePresent() {
return nil, fmt.Errorf("No conntrack module")
log.Printf("Not using conntrack: module not present")
return nilFlowWalker{}
} else if !useConntrack {
return nilFlowWalker{}
}
result := &conntracker{
activeFlows: map[int64]Flow{},
existingConns: existingConns,
args: args,
result := &conntrackWalker{
activeFlows: map[int64]flow{},
args: args,
}
go result.loop()
return result, nil
return result
}
// ConntrackModulePresent returns true if the kernel has the conntrack module
@@ -121,7 +123,7 @@ var ConntrackModulePresent = func() bool {
return false
}
func (c *conntracker) loop() {
func (c *conntrackWalker) loop() {
// conntrack can sometimes fail with ENOBUFS, when there is a particularly
// high connection rate. In these cases just retry in a loop, so we can
// survive the spike. For sustained loads this degrades nicely, as we
@@ -139,7 +141,7 @@ func (c *conntracker) loop() {
}
}
func (c *conntracker) clearFlows() {
func (c *conntrackWalker) clearFlows() {
c.Lock()
defer c.Unlock()
@@ -147,7 +149,7 @@ func (c *conntracker) clearFlows() {
c.bufferedFlows = append(c.bufferedFlows, f)
}
c.activeFlows = map[int64]Flow{}
c.activeFlows = map[int64]flow{}
}
func logPipe(prefix string, reader io.Reader) {
@@ -160,18 +162,16 @@ func logPipe(prefix string, reader io.Reader) {
}
}
func (c *conntracker) run() {
if c.existingConns {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections()
if err != nil {
log.Printf("conntrack existingConnections error: %v", err)
return
}
for _, flow := range existingFlows {
c.handleFlow(flow, true)
}
func (c *conntrackWalker) run() {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections()
if err != nil {
log.Printf("conntrack existingConnections error: %v", err)
return
}
for _, flow := range existingFlows {
c.handleFlow(flow, true)
}
args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...)
@@ -217,14 +217,14 @@ func (c *conntracker) run() {
if line, err := reader.ReadString('\n'); err != nil {
log.Printf("conntrack error: %v", err)
return
} else if line != XMLHeader {
} else if line != xmlHeader {
log.Printf("conntrack invalid output: '%s'", line)
return
}
if line, err := reader.ReadString('\n'); err != nil {
log.Printf("conntrack error: %v", err)
return
} else if line != ConntrackOpenTag {
} else if line != conntrackOpenTag {
log.Printf("conntrack invalid output: '%s'", line)
return
}
@@ -234,7 +234,7 @@ func (c *conntracker) run() {
// Now loop on the output stream
decoder := xml.NewDecoder(reader)
for {
var f Flow
var f flow
if err := decoder.Decode(&f); err != nil {
log.Printf("conntrack error: %v", err)
return
@@ -243,15 +243,15 @@ func (c *conntracker) run() {
}
}
func (c *conntracker) existingConnections() ([]Flow, error) {
func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return []Flow{}, err
return []flow{}, err
}
if err := cmd.Start(); err != nil {
return []Flow{}, err
return []flow{}, err
}
defer func() {
if err := cmd.Wait(); err != nil {
@@ -260,15 +260,14 @@ func (c *conntracker) existingConnections() ([]Flow, error) {
}()
var result conntrack
if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF {
return []Flow{}, nil
return []flow{}, nil
} else if err != nil {
return []Flow{}, err
return []flow{}, err
}
return result.Flows, nil
}
// Stop stop stop
func (c *conntracker) Stop() {
func (c *conntrackWalker) stop() {
c.Lock()
defer c.Unlock()
close(c.quit)
@@ -277,7 +276,7 @@ func (c *conntracker) Stop() {
}
}
func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
@@ -297,7 +296,7 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
// For now, I'm only interested in tcp connections - there is too much udp
// traffic going on (every container talking to weave dns, for example) to
// render nicely. TODO: revisit this.
if f.Original.Layer4.Proto != TCP {
if f.Original.Layer4.Proto != tcpProto {
return
}
@@ -305,14 +304,14 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
defer c.Unlock()
switch {
case forceAdd || f.Type == New || f.Type == Update:
if f.Independent.State != TimeWait {
case forceAdd || f.Type == newType || f.Type == updateType:
if f.Independent.State != timeWait {
c.activeFlows[f.Independent.ID] = f
} else if _, ok := c.activeFlows[f.Independent.ID]; ok {
delete(c.activeFlows, f.Independent.ID)
c.bufferedFlows = append(c.bufferedFlows, f)
}
case f.Type == Destroy:
case f.Type == destroyType:
if _, ok := c.activeFlows[f.Independent.ID]; ok {
delete(c.activeFlows, f.Independent.ID)
c.bufferedFlows = append(c.bufferedFlows, f)
@@ -320,9 +319,9 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
}
}
// WalkFlows calls f with all active flows and flows that have come and gone
// since the last call to WalkFlows
func (c *conntracker) WalkFlows(f func(Flow)) {
// walkFlows calls f with all active flows and flows that have come and gone
// since the last call to walkFlows
func (c *conntrackWalker) walkFlows(f func(flow)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {

View File

@@ -1,4 +1,4 @@
package endpoint_test
package endpoint
import (
"bufio"
@@ -8,13 +8,14 @@ import (
"time"
"github.com/weaveworks/scope/common/exec"
. "github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/test"
testExec "github.com/weaveworks/scope/test/exec"
testexec "github.com/weaveworks/scope/test/exec"
)
func makeFlow(ty string) Flow {
return Flow{
const conntrackCloseTag = "</conntrack>\n"
func makeFlow(ty string) flow {
return flow{
XMLName: xml.Name{
Local: "flow",
},
@@ -22,46 +23,46 @@ func makeFlow(ty string) Flow {
}
}
func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta {
meta := Meta{
func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta {
meta := meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: dir,
Layer3: Layer3{
Layer3: layer3{
XMLName: xml.Name{
Local: "layer3",
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: Layer4{
Layer4: layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: TCP,
Proto: tcpProto,
},
}
f.Metas = append(f.Metas, meta)
return &meta
}
func addIndependant(f *Flow, id int64, state string) *Meta {
meta := Meta{
func addIndependant(f *flow, id int64, state string) *meta {
meta := meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: Layer3{
Layer3: layer3{
XMLName: xml.Name{
Local: "layer3",
},
},
Layer4: Layer4{
Layer4: layer4{
XMLName: xml.Name{
Local: "layer4",
},
@@ -79,21 +80,40 @@ func TestConntracker(t *testing.T) {
return true
}
first := true
existingConnectionsReader, existingConnectionsWriter := io.Pipe()
reader, writer := io.Pipe()
exec.Command = func(name string, args ...string) exec.Cmd {
return testExec.NewMockCmd(reader)
if first {
first = false
return testexec.NewMockCmd(existingConnectionsReader)
}
return testexec.NewMockCmd(reader)
}
conntracker, err := NewConntracker(false)
if err != nil {
flowWalker := newConntrackFlowWalker(true)
// First write out some empty xml for the existing connections
ecbw := bufio.NewWriter(existingConnectionsWriter)
if _, err := ecbw.WriteString(xmlHeader); err != nil {
t.Fatal(err)
}
if _, err := ecbw.WriteString(conntrackOpenTag); err != nil {
t.Fatal(err)
}
if _, err := ecbw.WriteString(conntrackCloseTag); err != nil {
t.Fatal(err)
}
if err := ecbw.Flush(); err != nil {
t.Fatal(err)
}
// Then write out events
bw := bufio.NewWriter(writer)
if _, err := bw.WriteString(XMLHeader); err != nil {
if _, err := bw.WriteString(xmlHeader); err != nil {
t.Fatal(err)
}
if _, err := bw.WriteString(ConntrackOpenTag); err != nil {
if _, err := bw.WriteString(conntrackOpenTag); err != nil {
t.Fatal(err)
}
if err := bw.Flush(); err != nil {
@@ -101,8 +121,8 @@ func TestConntracker(t *testing.T) {
}
have := func() interface{} {
result := []Flow{}
conntracker.WalkFlows(func(f Flow) {
result := []flow{}
flowWalker.walkFlows(func(f flow) {
f.Original = nil
f.Reply = nil
f.Independent = nil
@@ -113,11 +133,11 @@ func TestConntracker(t *testing.T) {
ts := 100 * time.Millisecond
// First, assert we have no flows
test.Poll(t, ts, []Flow{}, have)
test.Poll(t, ts, []flow{}, have)
// Now add some flows
xmlEncoder := xml.NewEncoder(bw)
writeFlow := func(f Flow) {
writeFlow := func(f flow) {
if err := xmlEncoder.Encode(f); err != nil {
t.Fatal(err)
}
@@ -129,25 +149,25 @@ func TestConntracker(t *testing.T) {
}
}
flow1 := makeFlow(New)
flow1 := makeFlow(newType)
addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3)
addIndependant(&flow1, 1, "")
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []flow{flow1}, have)
// Now check when we remove the flow, we still get it in the next Walk
flow1.Type = Destroy
flow1.Type = destroyType
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []Flow{}, have)
test.Poll(t, ts, []flow{flow1}, have)
test.Poll(t, ts, []flow{}, have)
// This time we're not going to remove it, but put it in state TIME_WAIT
flow1.Type = New
flow1.Type = newType
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []flow{flow1}, have)
flow1.Metas[1].State = TimeWait
flow1.Metas[1].State = timeWait
writeFlow(flow1)
test.Poll(t, ts, []Flow{flow1}, have)
test.Poll(t, ts, []Flow{}, have)
test.Poll(t, ts, []flow{flow1}, have)
test.Poll(t, ts, []flow{}, have)
}

View File

@@ -16,17 +16,16 @@ type endpointMapping struct {
rewrittenPort int
}
// NATMapper rewrites a report to deal with NAT's connections
type NATMapper struct {
Conntracker
// natMapper rewrites a report to deal with NAT'd connections.
type natMapper struct {
flowWalker
}
// NewNATMapper is exposed for testing
func NewNATMapper(ct Conntracker) NATMapper {
return NATMapper{ct}
func makeNATMapper(fw flowWalker) natMapper {
return natMapper{fw}
}
func toMapping(f Flow) *endpointMapping {
func toMapping(f flow) *endpointMapping {
var mapping endpointMapping
if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP {
mapping = endpointMapping{
@@ -47,10 +46,10 @@ func toMapping(f Flow) *endpointMapping {
return &mapping
}
// ApplyNAT duplicates Nodes in the endpoint topology of a
// report, based on the NAT table as returns by natTable.
func (n NATMapper) ApplyNAT(rpt report.Report, scope string) {
n.WalkFlows(func(f Flow) {
// applyNAT duplicates Nodes in the endpoint topology of a report, based on
// the NAT table.
func (n natMapper) applyNAT(rpt report.Report, scope string) {
n.flowWalker.walkFlows(func(f flow) {
var (
mapping = toMapping(f)
realEndpointID = report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort))

View File

@@ -1,25 +1,24 @@
package endpoint_test
package endpoint
import (
"reflect"
"testing"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
type mockConntracker struct {
flows []endpoint.Flow
type mockFlowWalker struct {
flows []flow
}
func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) {
func (m *mockFlowWalker) walkFlows(f func(flow)) {
for _, flow := range m.flows {
f(flow)
}
}
func (m *mockConntracker) Stop() {}
func (m *mockFlowWalker) stop() {}
func TestNat(t *testing.T) {
// test that two containers, on the docker network, get their connections mapped
@@ -31,32 +30,31 @@ func TestNat(t *testing.T) {
// from the PoV of host1
{
flow := makeFlow("")
addIndependant(&flow, 1, "")
flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
ct := &mockConntracker{
flows: []endpoint.Flow{flow},
f := makeFlow("")
addIndependant(&f, 1, "")
f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
ct := &mockFlowWalker{
flows: []flow{f},
}
have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host1", "10.0.47.1", "80")
have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{
endpoint.Addr: "10.0.47.1",
endpoint.Port: "80",
"foo": "bar",
Addr: "10.0.47.1",
Port: "80",
"foo": "bar",
}))
want := have.Copy()
want.Endpoint.AddNode(report.MakeEndpointNodeID("host1", "1.2.3.4", "80"), report.MakeNodeWith(report.Metadata{
endpoint.Addr: "1.2.3.4",
endpoint.Port: "80",
"copy_of": originalID,
"foo": "bar",
Addr: "1.2.3.4",
Port: "80",
"copy_of": originalID,
"foo": "bar",
}))
natmapper := endpoint.NewNATMapper(ct)
natmapper.ApplyNAT(have, "host1")
makeNATMapper(ct).applyNAT(have, "host1")
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}
@@ -64,32 +62,31 @@ func TestNat(t *testing.T) {
// from the PoV of host2
{
flow := makeFlow("")
addIndependant(&flow, 2, "")
flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
ct := &mockConntracker{
flows: []endpoint.Flow{flow},
f := makeFlow("")
addIndependant(&f, 2, "")
f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
ct := &mockFlowWalker{
flows: []flow{f},
}
have := report.MakeReport()
originalID := report.MakeEndpointNodeID("host2", "10.0.47.2", "22222")
have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{
endpoint.Addr: "10.0.47.2",
endpoint.Port: "22222",
"foo": "baz",
Addr: "10.0.47.2",
Port: "22222",
"foo": "baz",
}))
want := have.Copy()
want.Endpoint.AddNode(report.MakeEndpointNodeID("host2", "2.3.4.5", "22223"), report.MakeNodeWith(report.Metadata{
endpoint.Addr: "2.3.4.5",
endpoint.Port: "22223",
"copy_of": originalID,
"foo": "baz",
Addr: "2.3.4.5",
Port: "22223",
"copy_of": originalID,
"foo": "baz",
}))
natmapper := endpoint.NewNATMapper(ct)
natmapper.ApplyNAT(have, "host1")
makeNATMapper(ct).applyNAT(have, "host1")
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}

View File

@@ -1,7 +1,6 @@
package endpoint
import (
"log"
"strconv"
"time"
@@ -26,9 +25,9 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
conntracker Conntracker
natmapper *NATMapper
revResolver *ReverseResolver
flowWalker flowWalker // interface
natMapper natMapper
reverseResolver *reverseResolver
}
// SpyDuration is an exported prometheus metric
@@ -49,44 +48,21 @@ var SpyDuration = prometheus.NewSummaryVec(
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
var (
conntrackModulePresent = ConntrackModulePresent()
conntracker Conntracker
natmapper NATMapper
err error
)
if conntrackModulePresent && useConntrack {
conntracker, err = NewConntracker(true)
if err != nil {
log.Printf("Failed to start conntracker: %v", err)
}
}
if conntrackModulePresent {
ct, err := NewConntracker(true, "--any-nat")
if err != nil {
log.Printf("Failed to start conntracker for natmapper: %v", err)
}
natmapper = NewNATMapper(ct)
}
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
conntracker: conntracker,
natmapper: &natmapper,
revResolver: NewReverseResolver(),
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
}
}
// Stop stop stop
func (r *Reporter) Stop() {
if r.conntracker != nil {
r.conntracker.Stop()
}
if r.natmapper != nil {
r.natmapper.Stop()
}
r.revResolver.Stop()
r.flowWalker.stop()
r.natMapper.stop()
r.reverseResolver.stop()
}
// Report implements Reporter.
@@ -124,11 +100,12 @@ func (r *Reporter) Report() (report.Report, error) {
}
}
if r.conntracker != nil {
// Consult the flowWalker for short-live connections
{
extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{
Conntracked: "true",
})
r.conntracker.WalkFlows(func(f Flow) {
r.flowWalker.walkFlows(func(f flow) {
var (
localPort = uint16(f.Original.Layer4.SrcPort)
remotePort = uint16(f.Original.Layer4.DstPort)
@@ -139,10 +116,7 @@ func (r *Reporter) Report() (report.Report, error) {
})
}
if r.natmapper != nil {
r.natmapper.ApplyNAT(rpt, r.hostID)
}
r.natMapper.applyNAT(rpt, r.hostID)
return rpt, nil
}
@@ -165,9 +139,9 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil {
if remoteName, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithMetadata(map[string]string{
"name": revRemoteName,
"name": remoteName,
})
}
@@ -211,9 +185,9 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil {
if remoteName, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithMetadata(map[string]string{
"name": revRemoteName,
"name": remoteName,
})
}

View File

@@ -16,22 +16,22 @@ const (
rAddrCacheExpiration = 30 * time.Minute
)
var errNotFound = fmt.Errorf("Not found")
var errNotFound = fmt.Errorf("not found")
type revResFunc func(addr string) (names []string, err error)
// ReverseResolver is a caching, reverse resolver.
type ReverseResolver struct {
// A caching, reverse resolver.
type reverseResolver struct {
addresses chan string
cache gcache.Cache
Throttle <-chan time.Time // Made public for mocking
Resolver revResFunc
}
// NewReverseResolver starts a new reverse resolver that performs reverse
// newReverseResolver starts a new reverse resolver that performs reverse
// resolutions and caches the result.
func NewReverseResolver() *ReverseResolver {
r := ReverseResolver{
func newReverseResolver() *reverseResolver {
r := reverseResolver{
addresses: make(chan string, rAddrBacklog),
cache: gcache.New(rAddrCacheLen).LRU().Expiration(rAddrCacheExpiration).Build(),
Throttle: time.Tick(time.Second / 10),
@@ -41,10 +41,10 @@ func NewReverseResolver() *ReverseResolver {
return &r
}
// Get the reverse resolution for an IP address if already in the cache, a
// get the reverse resolution for an IP address if already in the cache, a
// gcache.NotFoundKeyError error otherwise. Note: it returns one of the
// possible names that can be obtained for that IP.
func (r *ReverseResolver) Get(address string) (string, error) {
func (r *reverseResolver) get(address string) (string, error) {
val, err := r.cache.Get(address)
if hostname, ok := val.(string); err == nil && ok {
return hostname, nil
@@ -53,7 +53,7 @@ func (r *ReverseResolver) Get(address string) (string, error) {
return "", errNotFound
}
if err == gcache.NotFoundKeyError {
// We trigger an asynchronous reverse resolution when not cached
// We trigger an asynchronous reverse resolution when not cached.
select {
case r.addresses <- address:
default:
@@ -62,7 +62,7 @@ func (r *ReverseResolver) Get(address string) (string, error) {
return "", errNotFound
}
func (r *ReverseResolver) loop() {
func (r *reverseResolver) loop() {
for request := range r.addresses {
// check if the answer is already in the cache
if _, err := r.cache.Get(request); err == nil {
@@ -80,7 +80,6 @@ func (r *ReverseResolver) loop() {
}
}
// Stop the async reverse resolver.
func (r *ReverseResolver) Stop() {
func (r *reverseResolver) stop() {
close(r.addresses)
}

View File

@@ -1,11 +1,10 @@
package endpoint_test
package endpoint
import (
"errors"
"testing"
"time"
. "github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/test"
)
@@ -15,8 +14,8 @@ func TestReverseResolver(t *testing.T) {
"4.3.2.1": {"im.a.little.tea.pot"},
}
revRes := NewReverseResolver()
defer revRes.Stop()
revRes := newReverseResolver()
defer revRes.stop()
// Use a mocked resolver function.
revRes.Resolver = func(addr string) (names []string, err error) {
@@ -31,7 +30,7 @@ func TestReverseResolver(t *testing.T) {
for ip, names := range tests {
test.Poll(t, 100*time.Millisecond, names[0], func() interface{} {
result, _ := revRes.Get(ip)
result, _ := revRes.get(ip)
return result
})
}

View File

@@ -7,20 +7,21 @@ import (
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
func TestOriginTable(t *testing.T) {
if _, ok := render.OriginTable(test.Report, "not-found", false, false); ok {
if _, ok := render.OriginTable(fixture.Report, "not-found", false, false); ok {
t.Errorf("unknown origin ID gave unexpected success")
}
for originID, want := range map[string]render.Table{test.ServerProcessNodeID: {
Title: fmt.Sprintf(`Process "apache" (%s)`, test.ServerPID),
for originID, want := range map[string]render.Table{fixture.ServerProcessNodeID: {
Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID),
Numeric: false,
Rank: 2,
Rows: []render.Row{},
},
test.ServerHostNodeID: {
Title: fmt.Sprintf("Host %q", test.ServerHostName),
fixture.ServerHostNodeID: {
Title: fmt.Sprintf("Host %q", fixture.ServerHostName),
Numeric: false,
Rank: 1,
Rows: []render.Row{
@@ -29,7 +30,7 @@ func TestOriginTable(t *testing.T) {
},
},
} {
have, ok := render.OriginTable(test.Report, originID, false, false)
have, ok := render.OriginTable(fixture.Report, originID, false, false)
if !ok {
t.Errorf("%q: not OK", originID)
continue
@@ -41,23 +42,23 @@ func TestOriginTable(t *testing.T) {
// Test host/container tags
for originID, want := range map[string]render.Table{
test.ServerProcessNodeID: {
Title: fmt.Sprintf(`Process "apache" (%s)`, test.ServerPID),
fixture.ServerProcessNodeID: {
Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID),
Numeric: false,
Rank: 2,
Rows: []render.Row{
{"Host", test.ServerHostID, "", false},
{"Container ID", test.ServerContainerID, "", false},
{"Host", fixture.ServerHostID, "", false},
{"Container ID", fixture.ServerContainerID, "", false},
},
},
test.ServerContainerNodeID: {
fixture.ServerContainerNodeID: {
Title: `Container "server"`,
Numeric: false,
Rank: 3,
Rows: []render.Row{
{"Host", test.ServerHostID, "", false},
{"ID", test.ServerContainerID, "", false},
{"Image ID", test.ServerContainerImageID, "", false},
{"Host", fixture.ServerHostID, "", false},
{"ID", fixture.ServerContainerID, "", false},
{"Image ID", fixture.ServerContainerImageID, "", false},
{fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false},
{`Label "foo1"`, `bar1`, "", false},
{`Label "foo2"`, `bar2`, "", false},
@@ -65,7 +66,7 @@ func TestOriginTable(t *testing.T) {
},
},
} {
have, ok := render.OriginTable(test.Report, originID, true, true)
have, ok := render.OriginTable(fixture.Report, originID, true, true)
if !ok {
t.Errorf("%q: not OK", originID)
continue
@@ -78,16 +79,16 @@ func TestOriginTable(t *testing.T) {
}
func TestMakeDetailedHostNode(t *testing.T) {
renderableNode := render.HostRenderer.Render(test.Report)[render.MakeHostID(test.ClientHostID)]
have := render.MakeDetailedNode(test.Report, renderableNode)
renderableNode := render.HostRenderer.Render(fixture.Report)[render.MakeHostID(fixture.ClientHostID)]
have := render.MakeDetailedNode(fixture.Report, renderableNode)
want := render.DetailedNode{
ID: render.MakeHostID(test.ClientHostID),
ID: render.MakeHostID(fixture.ClientHostID),
LabelMajor: "client",
LabelMinor: "hostname.com",
Pseudo: false,
Tables: []render.Table{
{
Title: fmt.Sprintf("Host %q", test.ClientHostName),
Title: fmt.Sprintf("Host %q", fixture.ClientHostName),
Numeric: false,
Rank: 1,
Rows: []render.Row{
@@ -135,12 +136,12 @@ func TestMakeDetailedHostNode(t *testing.T) {
}
func TestMakeDetailedContainerNode(t *testing.T) {
renderableNode := render.ContainerRenderer.Render(test.Report)[test.ServerContainerID]
have := render.MakeDetailedNode(test.Report, renderableNode)
renderableNode := render.ContainerRenderer.Render(fixture.Report)[fixture.ServerContainerID]
have := render.MakeDetailedNode(fixture.Report, renderableNode)
want := render.DetailedNode{
ID: test.ServerContainerID,
ID: fixture.ServerContainerID,
LabelMajor: "server",
LabelMinor: test.ServerHostName,
LabelMinor: fixture.ServerHostName,
Pseudo: false,
Tables: []render.Table{
{
@@ -148,7 +149,7 @@ func TestMakeDetailedContainerNode(t *testing.T) {
Numeric: false,
Rank: 4,
Rows: []render.Row{
{"Image ID", test.ServerContainerImageID, "", false},
{"Image ID", fixture.ServerContainerImageID, "", false},
{`Label "foo1"`, `bar1`, "", false},
{`Label "foo2"`, `bar2`, "", false},
},
@@ -158,8 +159,8 @@ func TestMakeDetailedContainerNode(t *testing.T) {
Numeric: false,
Rank: 3,
Rows: []render.Row{
{"ID", test.ServerContainerID, "", false},
{"Image ID", test.ServerContainerImageID, "", false},
{"ID", fixture.ServerContainerID, "", false},
{"Image ID", fixture.ServerContainerImageID, "", false},
{fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false},
{`Label "foo1"`, `bar1`, "", false},
{`Label "foo2"`, `bar2`, "", false},
@@ -167,13 +168,13 @@ func TestMakeDetailedContainerNode(t *testing.T) {
},
},
{
Title: fmt.Sprintf(`Process "apache" (%s)`, test.ServerPID),
Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID),
Numeric: false,
Rank: 2,
Rows: []render.Row{},
},
{
Title: fmt.Sprintf("Host %q", test.ServerHostName),
Title: fmt.Sprintf("Host %q", fixture.ServerHostName),
Numeric: false,
Rank: 1,
Rows: []render.Row{
@@ -190,38 +191,38 @@ func TestMakeDetailedContainerNode(t *testing.T) {
{"Ingress byte rate", "1.0", "KBps", false},
{"Client", "Server", "", true},
{
fmt.Sprintf("%s:%s", test.UnknownClient1IP, test.UnknownClient1Port),
fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort),
fmt.Sprintf("%s:%s", fixture.UnknownClient1IP, fixture.UnknownClient1Port),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
},
{
fmt.Sprintf("%s:%s", test.UnknownClient2IP, test.UnknownClient2Port),
fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort),
fmt.Sprintf("%s:%s", fixture.UnknownClient2IP, fixture.UnknownClient2Port),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
},
{
fmt.Sprintf("%s:%s", test.UnknownClient3IP, test.UnknownClient3Port),
fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort),
fmt.Sprintf("%s:%s", fixture.UnknownClient3IP, fixture.UnknownClient3Port),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
},
{
fmt.Sprintf("%s:%s", test.ClientIP, test.ClientPort54001),
fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort),
fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54001),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
},
{
fmt.Sprintf("%s:%s", test.ClientIP, test.ClientPort54002),
fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort),
fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54002),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
},
{
fmt.Sprintf("%s:%s", test.RandomClientIP, test.RandomClientPort),
fmt.Sprintf("%s:%s", test.ServerIP, test.ServerPort),
fmt.Sprintf("%s:%s", fixture.RandomClientIP, fixture.RandomClientPort),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
},

View File

@@ -5,14 +5,14 @@ import (
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
// Exported for testing.
var (
uncontainedServerID = render.MakePseudoNodeID(render.UncontainedID, test.ServerHostName)
unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80")
unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80")
uncontainedServerID = render.MakePseudoNodeID(render.UncontainedID, fixture.ServerHostName)
unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", fixture.ServerIP, "80")
unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", fixture.ServerIP, "80")
unknownPseudoNode1 = func(adjacent string) render.RenderableNode {
return render.RenderableNode{
ID: unknownPseudoNode1ID,
@@ -24,8 +24,8 @@ var (
EgressByteCount: newu64(700),
},
Origins: report.MakeIDList(
test.UnknownClient1NodeID,
test.UnknownClient2NodeID,
fixture.UnknownClient1NodeID,
fixture.UnknownClient2NodeID,
),
}
}
@@ -40,7 +40,7 @@ var (
EgressByteCount: newu64(500),
},
Origins: report.MakeIDList(
test.UnknownClient3NodeID,
fixture.UnknownClient3NodeID,
),
}
}
@@ -55,27 +55,27 @@ var (
EgressByteCount: newu64(600),
},
Origins: report.MakeIDList(
test.RandomClientNodeID,
test.GoogleEndpointNodeID,
fixture.RandomClientNodeID,
fixture.GoogleEndpointNodeID,
),
}
}
ClientProcess1ID = render.MakeProcessID(test.ClientHostID, test.Client1PID)
ClientProcess2ID = render.MakeProcessID(test.ClientHostID, test.Client2PID)
ServerProcessID = render.MakeProcessID(test.ServerHostID, test.ServerPID)
nonContainerProcessID = render.MakeProcessID(test.ServerHostID, test.NonContainerPID)
ClientProcess1ID = render.MakeProcessID(fixture.ClientHostID, fixture.Client1PID)
ClientProcess2ID = render.MakeProcessID(fixture.ClientHostID, fixture.Client2PID)
ServerProcessID = render.MakeProcessID(fixture.ServerHostID, fixture.ServerPID)
nonContainerProcessID = render.MakeProcessID(fixture.ServerHostID, fixture.NonContainerPID)
RenderedProcesses = (render.RenderableNodes{
ClientProcess1ID: {
ID: ClientProcess1ID,
LabelMajor: test.Client1Comm,
LabelMinor: fmt.Sprintf("%s (%s)", test.ClientHostID, test.Client1PID),
Rank: test.Client1Comm,
LabelMajor: fixture.Client1Comm,
LabelMinor: fmt.Sprintf("%s (%s)", fixture.ClientHostID, fixture.Client1PID),
Rank: fixture.Client1Comm,
Pseudo: false,
Origins: report.MakeIDList(
test.Client54001NodeID,
test.ClientProcess1NodeID,
test.ClientHostNodeID,
fixture.Client54001NodeID,
fixture.ClientProcess1NodeID,
fixture.ClientHostNodeID,
),
Node: report.MakeNode().WithAdjacent(ServerProcessID),
EdgeMetadata: report.EdgeMetadata{
@@ -85,14 +85,14 @@ var (
},
ClientProcess2ID: {
ID: ClientProcess2ID,
LabelMajor: test.Client2Comm,
LabelMinor: fmt.Sprintf("%s (%s)", test.ClientHostID, test.Client2PID),
Rank: test.Client2Comm,
LabelMajor: fixture.Client2Comm,
LabelMinor: fmt.Sprintf("%s (%s)", fixture.ClientHostID, fixture.Client2PID),
Rank: fixture.Client2Comm,
Pseudo: false,
Origins: report.MakeIDList(
test.Client54002NodeID,
test.ClientProcess2NodeID,
test.ClientHostNodeID,
fixture.Client54002NodeID,
fixture.ClientProcess2NodeID,
fixture.ClientHostNodeID,
),
Node: report.MakeNode().WithAdjacent(ServerProcessID),
EdgeMetadata: report.EdgeMetadata{
@@ -103,13 +103,13 @@ var (
ServerProcessID: {
ID: ServerProcessID,
LabelMajor: "apache",
LabelMinor: fmt.Sprintf("%s (%s)", test.ServerHostID, test.ServerPID),
Rank: test.ServerComm,
LabelMinor: fmt.Sprintf("%s (%s)", fixture.ServerHostID, fixture.ServerPID),
Rank: fixture.ServerComm,
Pseudo: false,
Origins: report.MakeIDList(
test.Server80NodeID,
test.ServerProcessNodeID,
test.ServerHostNodeID,
fixture.Server80NodeID,
fixture.ServerProcessNodeID,
fixture.ServerHostNodeID,
),
Node: report.MakeNode(),
EdgeMetadata: report.EdgeMetadata{
@@ -119,14 +119,14 @@ var (
},
nonContainerProcessID: {
ID: nonContainerProcessID,
LabelMajor: test.NonContainerComm,
LabelMinor: fmt.Sprintf("%s (%s)", test.ServerHostID, test.NonContainerPID),
Rank: test.NonContainerComm,
LabelMajor: fixture.NonContainerComm,
LabelMinor: fmt.Sprintf("%s (%s)", fixture.ServerHostID, fixture.NonContainerPID),
Rank: fixture.NonContainerComm,
Pseudo: false,
Origins: report.MakeIDList(
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
test.NonContainerNodeID,
fixture.NonContainerProcessNodeID,
fixture.ServerHostNodeID,
fixture.NonContainerNodeID,
),
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
@@ -144,11 +144,11 @@ var (
Rank: "curl",
Pseudo: false,
Origins: report.MakeIDList(
test.Client54001NodeID,
test.Client54002NodeID,
test.ClientProcess1NodeID,
test.ClientProcess2NodeID,
test.ClientHostNodeID,
fixture.Client54001NodeID,
fixture.Client54002NodeID,
fixture.ClientProcess1NodeID,
fixture.ClientProcess2NodeID,
fixture.ClientHostNodeID,
),
Node: report.MakeNode().WithAdjacent("apache"),
EdgeMetadata: report.EdgeMetadata{
@@ -163,9 +163,9 @@ var (
Rank: "apache",
Pseudo: false,
Origins: report.MakeIDList(
test.Server80NodeID,
test.ServerProcessNodeID,
test.ServerHostNodeID,
fixture.Server80NodeID,
fixture.ServerProcessNodeID,
fixture.ServerHostNodeID,
),
Node: report.MakeNode(),
EdgeMetadata: report.EdgeMetadata{
@@ -173,16 +173,16 @@ var (
IngressByteCount: newu64(2100),
},
},
test.NonContainerComm: {
ID: test.NonContainerComm,
LabelMajor: test.NonContainerComm,
fixture.NonContainerComm: {
ID: fixture.NonContainerComm,
LabelMajor: fixture.NonContainerComm,
LabelMinor: "1 process",
Rank: test.NonContainerComm,
Rank: fixture.NonContainerComm,
Pseudo: false,
Origins: report.MakeIDList(
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
test.NonContainerNodeID,
fixture.NonContainerProcessNodeID,
fixture.ServerHostNodeID,
fixture.NonContainerNodeID,
),
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
@@ -193,39 +193,39 @@ var (
}).Prune()
RenderedContainers = (render.RenderableNodes{
test.ClientContainerID: {
ID: test.ClientContainerID,
fixture.ClientContainerID: {
ID: fixture.ClientContainerID,
LabelMajor: "client",
LabelMinor: test.ClientHostName,
Rank: test.ClientContainerImageName,
LabelMinor: fixture.ClientHostName,
Rank: fixture.ClientContainerImageName,
Pseudo: false,
Origins: report.MakeIDList(
test.ClientContainerImageNodeID,
test.ClientContainerNodeID,
test.Client54001NodeID,
test.Client54002NodeID,
test.ClientProcess1NodeID,
test.ClientProcess2NodeID,
test.ClientHostNodeID,
fixture.ClientContainerImageNodeID,
fixture.ClientContainerNodeID,
fixture.Client54001NodeID,
fixture.Client54002NodeID,
fixture.ClientProcess1NodeID,
fixture.ClientProcess2NodeID,
fixture.ClientHostNodeID,
),
Node: report.MakeNode().WithAdjacent(test.ServerContainerID),
Node: report.MakeNode().WithAdjacent(fixture.ServerContainerID),
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
},
test.ServerContainerID: {
ID: test.ServerContainerID,
fixture.ServerContainerID: {
ID: fixture.ServerContainerID,
LabelMajor: "server",
LabelMinor: test.ServerHostName,
Rank: test.ServerContainerImageName,
LabelMinor: fixture.ServerHostName,
Rank: fixture.ServerContainerImageName,
Pseudo: false,
Origins: report.MakeIDList(
test.ServerContainerImageNodeID,
test.ServerContainerNodeID,
test.Server80NodeID,
test.ServerProcessNodeID,
test.ServerHostNodeID,
fixture.ServerContainerImageNodeID,
fixture.ServerContainerNodeID,
fixture.Server80NodeID,
fixture.ServerProcessNodeID,
fixture.ServerHostNodeID,
),
Node: report.MakeNode(),
EdgeMetadata: report.EdgeMetadata{
@@ -236,54 +236,54 @@ var (
uncontainedServerID: {
ID: uncontainedServerID,
LabelMajor: render.UncontainedMajor,
LabelMinor: test.ServerHostName,
LabelMinor: fixture.ServerHostName,
Rank: "",
Pseudo: true,
Origins: report.MakeIDList(
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
test.NonContainerNodeID,
fixture.NonContainerProcessNodeID,
fixture.ServerHostNodeID,
fixture.NonContainerNodeID,
),
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode(test.ServerContainerID),
render.TheInternetID: theInternetNode(fixture.ServerContainerID),
}).Prune()
RenderedContainerImages = (render.RenderableNodes{
test.ClientContainerImageName: {
ID: test.ClientContainerImageName,
LabelMajor: test.ClientContainerImageName,
fixture.ClientContainerImageName: {
ID: fixture.ClientContainerImageName,
LabelMajor: fixture.ClientContainerImageName,
LabelMinor: "1 container",
Rank: test.ClientContainerImageName,
Rank: fixture.ClientContainerImageName,
Pseudo: false,
Origins: report.MakeIDList(
test.ClientContainerImageNodeID,
test.ClientContainerNodeID,
test.Client54001NodeID,
test.Client54002NodeID,
test.ClientProcess1NodeID,
test.ClientProcess2NodeID,
test.ClientHostNodeID,
fixture.ClientContainerImageNodeID,
fixture.ClientContainerNodeID,
fixture.Client54001NodeID,
fixture.Client54002NodeID,
fixture.ClientProcess1NodeID,
fixture.ClientProcess2NodeID,
fixture.ClientHostNodeID,
),
Node: report.MakeNode().WithAdjacent(test.ServerContainerImageName),
Node: report.MakeNode().WithAdjacent(fixture.ServerContainerImageName),
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
},
test.ServerContainerImageName: {
ID: test.ServerContainerImageName,
LabelMajor: test.ServerContainerImageName,
fixture.ServerContainerImageName: {
ID: fixture.ServerContainerImageName,
LabelMajor: fixture.ServerContainerImageName,
LabelMinor: "1 container",
Rank: test.ServerContainerImageName,
Rank: fixture.ServerContainerImageName,
Pseudo: false,
Origins: report.MakeIDList(
test.ServerContainerImageNodeID,
test.ServerContainerNodeID,
test.Server80NodeID,
test.ServerProcessNodeID,
test.ServerHostNodeID),
fixture.ServerContainerImageNodeID,
fixture.ServerContainerNodeID,
fixture.Server80NodeID,
fixture.ServerProcessNodeID,
fixture.ServerHostNodeID),
Node: report.MakeNode(),
EdgeMetadata: report.EdgeMetadata{
IngressPacketCount: newu64(210),
@@ -293,24 +293,24 @@ var (
uncontainedServerID: {
ID: uncontainedServerID,
LabelMajor: render.UncontainedMajor,
LabelMinor: test.ServerHostName,
LabelMinor: fixture.ServerHostName,
Rank: "",
Pseudo: true,
Origins: report.MakeIDList(
test.NonContainerNodeID,
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
fixture.NonContainerNodeID,
fixture.NonContainerProcessNodeID,
fixture.ServerHostNodeID,
),
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode(test.ServerContainerImageName),
render.TheInternetID: theInternetNode(fixture.ServerContainerImageName),
}).Prune()
ServerHostRenderedID = render.MakeHostID(test.ServerHostID)
ClientHostRenderedID = render.MakeHostID(test.ClientHostID)
pseudoHostID1 = render.MakePseudoNodeID(test.UnknownClient1IP, test.ServerIP)
pseudoHostID2 = render.MakePseudoNodeID(test.UnknownClient3IP, test.ServerIP)
ServerHostRenderedID = render.MakeHostID(fixture.ServerHostID)
ClientHostRenderedID = render.MakeHostID(fixture.ClientHostID)
pseudoHostID1 = render.MakePseudoNodeID(fixture.UnknownClient1IP, fixture.ServerIP)
pseudoHostID2 = render.MakePseudoNodeID(fixture.UnknownClient3IP, fixture.ServerIP)
RenderedHosts = (render.RenderableNodes{
ServerHostRenderedID: {
@@ -320,8 +320,8 @@ var (
Rank: "hostname.com",
Pseudo: false,
Origins: report.MakeIDList(
test.ServerHostNodeID,
test.ServerAddressNodeID,
fixture.ServerHostNodeID,
fixture.ServerAddressNodeID,
),
Node: report.MakeNode(),
EdgeMetadata: report.EdgeMetadata{
@@ -335,8 +335,8 @@ var (
Rank: "hostname.com",
Pseudo: false,
Origins: report.MakeIDList(
test.ClientHostNodeID,
test.ClientAddressNodeID,
fixture.ClientHostNodeID,
fixture.ClientAddressNodeID,
),
Node: report.MakeNode().WithAdjacent(ServerHostRenderedID),
EdgeMetadata: report.EdgeMetadata{
@@ -345,19 +345,19 @@ var (
},
pseudoHostID1: {
ID: pseudoHostID1,
LabelMajor: test.UnknownClient1IP,
LabelMajor: fixture.UnknownClient1IP,
Pseudo: true,
Node: report.MakeNode().WithAdjacent(ServerHostRenderedID),
EdgeMetadata: report.EdgeMetadata{},
Origins: report.MakeIDList(test.UnknownAddress1NodeID, test.UnknownAddress2NodeID),
Origins: report.MakeIDList(fixture.UnknownAddress1NodeID, fixture.UnknownAddress2NodeID),
},
pseudoHostID2: {
ID: pseudoHostID2,
LabelMajor: test.UnknownClient3IP,
LabelMajor: fixture.UnknownClient3IP,
Pseudo: true,
Node: report.MakeNode().WithAdjacent(ServerHostRenderedID),
EdgeMetadata: report.EdgeMetadata{},
Origins: report.MakeIDList(test.UnknownAddress3NodeID),
Origins: report.MakeIDList(fixture.UnknownAddress3NodeID),
},
render.TheInternetID: {
ID: render.TheInternetID,
@@ -365,7 +365,7 @@ var (
Pseudo: true,
Node: report.MakeNode().WithAdjacent(ServerHostRenderedID),
EdgeMetadata: report.EdgeMetadata{},
Origins: report.MakeIDList(test.RandomAddressNodeID),
Origins: report.MakeIDList(fixture.RandomAddressNodeID),
},
}).Prune()
@@ -377,14 +377,14 @@ var (
Rank: "ping/pong-a",
Pseudo: false,
Origins: report.MakeIDList(
test.Client54001NodeID,
test.Client54002NodeID,
test.ClientProcess1NodeID,
test.ClientProcess2NodeID,
test.ClientHostNodeID,
test.ClientContainerNodeID,
test.ClientContainerImageNodeID,
test.ClientPodNodeID,
fixture.Client54001NodeID,
fixture.Client54002NodeID,
fixture.ClientProcess1NodeID,
fixture.ClientProcess2NodeID,
fixture.ClientHostNodeID,
fixture.ClientContainerNodeID,
fixture.ClientContainerImageNodeID,
fixture.ClientPodNodeID,
),
Node: report.MakeNode().WithAdjacent("ping/pong-b"),
EdgeMetadata: report.EdgeMetadata{
@@ -399,12 +399,12 @@ var (
Rank: "ping/pong-b",
Pseudo: false,
Origins: report.MakeIDList(
test.Server80NodeID,
test.ServerPodNodeID,
test.ServerProcessNodeID,
test.ServerContainerNodeID,
test.ServerHostNodeID,
test.ServerContainerImageNodeID,
fixture.Server80NodeID,
fixture.ServerPodNodeID,
fixture.ServerProcessNodeID,
fixture.ServerContainerNodeID,
fixture.ServerHostNodeID,
fixture.ServerContainerImageNodeID,
),
Node: report.MakeNode(),
EdgeMetadata: report.EdgeMetadata{
@@ -415,13 +415,13 @@ var (
uncontainedServerID: {
ID: uncontainedServerID,
LabelMajor: render.UncontainedMajor,
LabelMinor: test.ServerHostName,
LabelMinor: fixture.ServerHostName,
Rank: "",
Pseudo: true,
Origins: report.MakeIDList(
test.ServerHostNodeID,
test.NonContainerProcessNodeID,
test.NonContainerNodeID,
fixture.ServerHostNodeID,
fixture.NonContainerProcessNodeID,
fixture.NonContainerNodeID,
),
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
@@ -436,37 +436,37 @@ var (
EgressByteCount: newu64(600),
},
Origins: report.MakeIDList(
test.RandomClientNodeID,
test.GoogleEndpointNodeID,
fixture.RandomClientNodeID,
fixture.GoogleEndpointNodeID,
),
},
}).Prune()
RenderedPodServices = (render.RenderableNodes{
"ping/pongservice": {
ID: test.ServiceID,
ID: fixture.ServiceID,
LabelMajor: "pongservice",
LabelMinor: "2 pods",
Rank: test.ServiceID,
Rank: fixture.ServiceID,
Pseudo: false,
Origins: report.MakeIDList(
test.Client54001NodeID,
test.Client54002NodeID,
test.ClientProcess1NodeID,
test.ClientProcess2NodeID,
test.ClientHostNodeID,
test.ClientContainerNodeID,
test.ClientContainerImageNodeID,
test.ClientPodNodeID,
test.Server80NodeID,
test.ServerPodNodeID,
test.ServiceNodeID,
test.ServerProcessNodeID,
test.ServerContainerNodeID,
test.ServerHostNodeID,
test.ServerContainerImageNodeID,
fixture.Client54001NodeID,
fixture.Client54002NodeID,
fixture.ClientProcess1NodeID,
fixture.ClientProcess2NodeID,
fixture.ClientHostNodeID,
fixture.ClientContainerNodeID,
fixture.ClientContainerImageNodeID,
fixture.ClientPodNodeID,
fixture.Server80NodeID,
fixture.ServerPodNodeID,
fixture.ServiceNodeID,
fixture.ServerProcessNodeID,
fixture.ServerContainerNodeID,
fixture.ServerHostNodeID,
fixture.ServerContainerImageNodeID,
),
Node: report.MakeNode().WithAdjacent(test.ServiceID), // ?? Shouldn't be adjacent to itself?
Node: report.MakeNode().WithAdjacent(fixture.ServiceID), // ?? Shouldn't be adjacent to itself?
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
@@ -477,13 +477,13 @@ var (
uncontainedServerID: {
ID: uncontainedServerID,
LabelMajor: render.UncontainedMajor,
LabelMinor: test.ServerHostName,
LabelMinor: fixture.ServerHostName,
Rank: "",
Pseudo: true,
Origins: report.MakeIDList(
test.ServerHostNodeID,
test.NonContainerProcessNodeID,
test.NonContainerNodeID,
fixture.ServerHostNodeID,
fixture.NonContainerProcessNodeID,
fixture.NonContainerNodeID,
),
Node: report.MakeNode().WithAdjacent(render.TheInternetID),
EdgeMetadata: report.EdgeMetadata{},
@@ -492,14 +492,14 @@ var (
ID: render.TheInternetID,
LabelMajor: render.TheInternetMajor,
Pseudo: true,
Node: report.MakeNode().WithAdjacent(test.ServiceID),
Node: report.MakeNode().WithAdjacent(fixture.ServiceID),
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(60),
EgressByteCount: newu64(600),
},
Origins: report.MakeIDList(
test.RandomClientNodeID,
test.GoogleEndpointNodeID,
fixture.RandomClientNodeID,
fixture.GoogleEndpointNodeID,
),
},
}).Prune()

View File

@@ -9,10 +9,11 @@ import (
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/render/expected"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)
func TestProcessRenderer(t *testing.T) {
have := render.ProcessRenderer.Render(test.Report).Prune()
have := render.ProcessRenderer.Render(fixture.Report).Prune()
want := expected.RenderedProcesses
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -20,7 +21,7 @@ func TestProcessRenderer(t *testing.T) {
}
func TestProcessNameRenderer(t *testing.T) {
have := render.ProcessNameRenderer.Render(test.Report).Prune()
have := render.ProcessNameRenderer.Render(fixture.Report).Prune()
want := expected.RenderedProcessNames
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -28,7 +29,7 @@ func TestProcessNameRenderer(t *testing.T) {
}
func TestContainerRenderer(t *testing.T) {
have := (render.ContainerWithImageNameRenderer.Render(test.Report)).Prune()
have := (render.ContainerWithImageNameRenderer.Render(fixture.Report)).Prune()
want := expected.RenderedContainers
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -38,18 +39,18 @@ func TestContainerRenderer(t *testing.T) {
func TestContainerFilterRenderer(t *testing.T) {
// tag on of the containers in the topology and ensure
// it is filtered out correctly.
input := test.Report.Copy()
input.Container.Nodes[test.ClientContainerNodeID].Metadata[docker.LabelPrefix+"works.weave.role"] = "system"
input := fixture.Report.Copy()
input.Container.Nodes[fixture.ClientContainerNodeID].Metadata[docker.LabelPrefix+"works.weave.role"] = "system"
have := render.FilterSystem(render.ContainerWithImageNameRenderer).Render(input).Prune()
want := expected.RenderedContainers.Copy()
delete(want, test.ClientContainerID)
delete(want, fixture.ClientContainerID)
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
func TestContainerImageRenderer(t *testing.T) {
have := render.ContainerImageRenderer.Render(test.Report).Prune()
have := render.ContainerImageRenderer.Render(fixture.Report).Prune()
want := expected.RenderedContainerImages
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -57,7 +58,7 @@ func TestContainerImageRenderer(t *testing.T) {
}
func TestHostRenderer(t *testing.T) {
have := render.HostRenderer.Render(test.Report).Prune()
have := render.HostRenderer.Render(fixture.Report).Prune()
want := expected.RenderedHosts
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -65,7 +66,7 @@ func TestHostRenderer(t *testing.T) {
}
func TestPodRenderer(t *testing.T) {
have := render.PodRenderer.Render(test.Report).Prune()
have := render.PodRenderer.Render(fixture.Report).Prune()
want := expected.RenderedPods
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
@@ -75,22 +76,22 @@ func TestPodRenderer(t *testing.T) {
func TestPodFilterRenderer(t *testing.T) {
// tag on containers or pod namespace in the topology and ensure
// it is filtered out correctly.
input := test.Report.Copy()
input.Pod.Nodes[test.ClientPodNodeID].Metadata[kubernetes.PodID] = "kube-system/foo"
input.Pod.Nodes[test.ClientPodNodeID].Metadata[kubernetes.Namespace] = "kube-system"
input.Pod.Nodes[test.ClientPodNodeID].Metadata[kubernetes.PodName] = "foo"
input.Container.Nodes[test.ClientContainerNodeID].Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"] = "kube-system/foo"
input := fixture.Report.Copy()
input.Pod.Nodes[fixture.ClientPodNodeID].Metadata[kubernetes.PodID] = "kube-system/foo"
input.Pod.Nodes[fixture.ClientPodNodeID].Metadata[kubernetes.Namespace] = "kube-system"
input.Pod.Nodes[fixture.ClientPodNodeID].Metadata[kubernetes.PodName] = "foo"
input.Container.Nodes[fixture.ClientContainerNodeID].Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"] = "kube-system/foo"
have := render.FilterSystem(render.PodRenderer).Render(input).Prune()
want := expected.RenderedPods.Copy()
delete(want, test.ClientPodID)
delete(want, test.ClientContainerID)
delete(want, fixture.ClientPodID)
delete(want, fixture.ClientContainerID)
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
func TestPodServiceRenderer(t *testing.T) {
have := render.PodServiceRenderer.Render(test.Report).Prune()
have := render.PodServiceRenderer.Render(fixture.Report).Prune()
want := expected.RenderedPodServices
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))

View File

@@ -1,4 +1,4 @@
package test
package fixture
import (
"time"

View File

@@ -29,7 +29,7 @@ cached_image_rev() {
has_changes() {
local rev1=$1
local rev2=$2
local changes=$(git log --oneline $rev1..$rev2 -- $INPUTFILES | wc -l)
local changes=$(git diff --name-only $rev1..$rev2 -- $INPUTFILES | wc -l)
[ "$changes" -gt 0 ]
}