From b9afa67ad60c58c990f4ca816b02cf1c64d3e76a Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Tue, 14 Jul 2015 18:19:58 +0200 Subject: [PATCH 1/8] gopacket-based traffic sniffing --- Makefile | 2 +- app/api_topology.go | 5 +- app/api_topology_test.go | 22 +- circle.yml | 5 +- experimental/demoprobe/generate.go | 8 +- experimental/genreport/generate.go | 8 +- ...tainer_test.go => container_linux_test.go} | 0 probe/endpoint/reporter.go | 22 +- probe/main.go | 36 +++ probe/process/walker_test.go | 1 - probe/sniff/sniffer.go | 251 ++++++++++++++++++ probe/sniff/sniffer_internal_test.go | 45 ++++ probe/sniff/sniffer_test.go | 137 ++++++++++ probe/sniff/source.go | 24 ++ probe/sniff/testclient/main.go | 87 ++++++ render/detailed_node.go | 12 +- render/detailed_node_test.go | 4 +- render/expected/expected.go | 130 ++++----- render/metadata.go | 46 ---- render/metadata_test.go | 87 ------ render/render.go | 26 +- render/render_test.go | 30 +-- render/renderable_node.go | 64 ++--- render/topologies_test.go | 10 +- report/merge.go | 59 ++-- report/merge_test.go | 84 +++--- report/report.go | 100 +++++-- report/report_test.go | 27 ++ report/topology.go | 14 +- test/report_fixture.go | 47 ++-- 30 files changed, 953 insertions(+), 440 deletions(-) rename probe/docker/{container_test.go => container_linux_test.go} (100%) create mode 100644 probe/sniff/sniffer.go create mode 100644 probe/sniff/sniffer_internal_test.go create mode 100644 probe/sniff/sniffer_test.go create mode 100644 probe/sniff/source.go create mode 100644 probe/sniff/testclient/main.go delete mode 100644 render/metadata.go delete mode 100644 render/metadata_test.go create mode 100644 report/report_test.go diff --git a/Makefile b/Makefile index e7eda9e2f..e3b07e4f1 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ $(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go $(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/overlay/*.go report/*.go xfer/*.go $(APP_EXE) $(PROBE_EXE): - go get -tags netgo ./$(@D) + go get -d -tags netgo ./$(@D) go build -ldflags "-extldflags \"-static\" -X main.version $(SCOPE_VERSION)" -tags netgo -o $@ ./$(@D) @strings $@ | grep cgo_stub\\\.go >/dev/null || { \ rm $@; \ diff --git a/app/api_topology.go b/app/api_topology.go index c68801438..9953414b2 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/websocket" "github.com/weaveworks/scope/render" + "github.com/weaveworks/scope/report" ) const ( @@ -27,7 +28,7 @@ type APINode struct { // APIEdge is returned by the /api/topology/*/*/* handlers. type APIEdge struct { - Metadata render.AggregateMetadata `json:"metadata"` + Metadata report.EdgeMetadata `json:"metadata"` } // Full topology. @@ -76,7 +77,7 @@ func handleEdge(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Req localID = vars["local"] remoteID = vars["remote"] rpt = rep.Report() - metadata = t.renderer.AggregateMetadata(rpt, localID, remoteID) + metadata = t.renderer.EdgeMetadata(rpt, localID, remoteID) ) respondWith(w, http.StatusOK, APIEdge{Metadata: metadata}) diff --git a/app/api_topology_test.go b/app/api_topology_test.go index fb219daa5..590b9fa02 100644 --- a/app/api_topology_test.go +++ b/app/api_topology_test.go @@ -12,6 +12,7 @@ import ( "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/render/expected" + "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" ) @@ -71,6 +72,7 @@ func TestAPITopologyApplications(t *testing.T) { if err := json.Unmarshal(body, &topo); err != nil { t.Fatal(err) } + if want, have := render.OnlyConnected(expected.RenderedProcesses), fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } @@ -93,9 +95,9 @@ func TestAPITopologyApplications(t *testing.T) { if err := json.Unmarshal(body, &edge); err != nil { t.Fatalf("JSON parse error: %s", err) } - if want, have := (render.AggregateMetadata{ - "egress_bytes": 10, - "ingress_bytes": 100, + if want, have := (report.EdgeMetadata{ + PacketCount: newu64(100), + ByteCount: newu64(10), }), edge.Metadata; !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } @@ -112,7 +114,8 @@ func TestAPITopologyHosts(t *testing.T) { if err := json.Unmarshal(body, &topo); err != nil { t.Fatal(err) } - if want, have := (expected.RenderedHosts), fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) { + + if want, have := expected.RenderedHosts, fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } @@ -134,11 +137,10 @@ func TestAPITopologyHosts(t *testing.T) { if err := json.Unmarshal(body, &edge); err != nil { t.Fatalf("JSON parse error: %s", err) } - want := render.AggregateMetadata{ - "max_conn_count_tcp": 3, - } - if !reflect.DeepEqual(want, edge.Metadata) { - t.Errorf("Edge metadata error. Want %v, have %v", want, edge) + if want, have := (report.EdgeMetadata{ + MaxConnCountTCP: newu64(3), + }), edge.Metadata; !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) } } } @@ -176,3 +178,5 @@ func TestAPITopologyWebsocket(t *testing.T) { equals(t, 0, len(d.Update)) equals(t, 0, len(d.Remove)) } + +func newu64(value uint64) *uint64 { return &value } diff --git a/circle.yml b/circle.yml index d8a3a8f57..d5241c65a 100644 --- a/circle.yml +++ b/circle.yml @@ -19,7 +19,7 @@ dependencies: cache_directories: - "~/docker" override: - - sudo apt-get --only-upgrade install tar + - sudo apt-get --only-upgrade install tar libpcap0.8-dev - bin/rebuild-ui-build-image - curl https://sdk.cloud.google.com | bash - test -z "$SECRET_PASSWORD" || bin/setup-circleci-secrets "$SECRET_PASSWORD" @@ -40,8 +40,7 @@ test: - cd $SRCDIR; ./bin/lint . - cd $SRCDIR; make client-test - cd $SRCDIR; make static - - cd $SRCDIR; rm -f app/app probe/probe; GOOS=darwin make && make clean - - cd $SRCDIR; rm -f app/app probe/probe; GOOS=linux make + - cd $SRCDIR; rm -f app/scope-app probe/scope-probe; make - cd $SRCDIR; ./bin/test -slow - cd $SRCDIR/experimental; make - test -z "$SECRET_PASSWORD" || (cd $SRCDIR/integration; ./gce.sh setup) diff --git a/experimental/demoprobe/generate.go b/experimental/demoprobe/generate.go index e87ccc8da..c2aa97f0e 100644 --- a/experimental/demoprobe/generate.go +++ b/experimental/demoprobe/generate.go @@ -91,12 +91,10 @@ func DemoReport(nodeCount int) report.Report { edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID) ) r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } // Address topology @@ -124,3 +122,5 @@ func DemoReport(nodeCount int) report.Report { return r } + +func newu64(value uint64) *uint64 { return &value } diff --git a/experimental/genreport/generate.go b/experimental/genreport/generate.go index 31791468a..520997b05 100644 --- a/experimental/genreport/generate.go +++ b/experimental/genreport/generate.go @@ -89,12 +89,10 @@ func DemoReport(nodeCount int) report.Report { edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID) ) r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } // Address topology @@ -122,3 +120,5 @@ func DemoReport(nodeCount int) report.Report { return r } + +func newu64(value uint64) *uint64 { return &value } diff --git a/probe/docker/container_test.go b/probe/docker/container_linux_test.go similarity index 100% rename from probe/docker/container_test.go rename to probe/docker/container_linux_test.go diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 7e99a6398..8912319f9 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -79,15 +79,15 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { var ( localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String()) remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String()) - adjacencyID = report.MakeAdjacencyID(localAddressNodeID) + adjecencyID = report.MakeAdjacencyID(localAddressNodeID) edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) ) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) + rpt.Address.Adjacency[adjecencyID] = rpt.Address.Adjacency[adjecencyID].Add(remoteAddressNodeID) if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, // TODO this is ambiguous, be more specific + "name": r.hostName, Addr: c.LocalAddress.String(), }) } @@ -98,11 +98,11 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { var ( localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort))) remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort))) - adjacencyID = report.MakeAdjacencyID(localEndpointNodeID) + adjecencyID = report.MakeAdjacencyID(localEndpointNodeID) edgeID = report.MakeEdgeID(localEndpointNodeID, remoteEndpointNodeID) ) - rpt.Endpoint.Adjacency[adjacencyID] = rpt.Endpoint.Adjacency[adjacencyID].Add(remoteEndpointNodeID) + rpt.Endpoint.Adjacency[adjecencyID] = rpt.Endpoint.Adjacency[adjecencyID].Add(remoteEndpointNodeID) if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok { // First hit establishes NodeMetadata for scoped local address + port @@ -119,9 +119,11 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { } } -func countTCPConnection(m report.EdgeMetadatas, edgeKey string) { - edgeMeta := m[edgeKey] - edgeMeta.WithConnCountTCP = true - edgeMeta.MaxConnCountTCP++ - m[edgeKey] = edgeMeta +func countTCPConnection(mds report.EdgeMetadatas, key string) { + md := mds[key] + if md.MaxConnCountTCP == nil { + md.MaxConnCountTCP = new(uint64) + } + *md.MaxConnCountTCP++ + mds[key] = md } diff --git a/probe/main.go b/probe/main.go index 19c67359a..db5e99ae5 100644 --- a/probe/main.go +++ b/probe/main.go @@ -3,11 +3,13 @@ package main import ( "flag" "log" + "net" "net/http" _ "net/http/pprof" "os" "os/signal" "strconv" + "strings" "syscall" "time" @@ -17,6 +19,7 @@ import ( "github.com/weaveworks/scope/probe/host" "github.com/weaveworks/scope/probe/overlay" "github.com/weaveworks/scope/probe/process" + "github.com/weaveworks/scope/probe/sniff" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/xfer" ) @@ -36,8 +39,13 @@ func main() { dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name") weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router") procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") + captureEnabled = flag.Bool("capture", false, "perform sampled packet capture") + captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces") + captureOn = flag.Duration("capture.on", 1*time.Second, "packet capture duty cycle 'on'") + captureOff = flag.Duration("capture.off", 5*time.Second, "packet capture duty cycle 'off'") ) flag.Parse() + log.SetFlags(log.Lmicroseconds) if len(flag.Args()) != 0 { flag.Usage() @@ -106,6 +114,21 @@ func main() { reporters = append(reporters, weave) } + if *captureEnabled { + if *captureOn > *captureOff { + log.Fatalf("-capture.on (%s) must be <= -capture.off (%s)", *captureOn, *captureOff) + } + for _, iface := range strings.Split(*captureInterfaces, ",") { + source, err := sniff.NewSource(iface) + if err != nil { + log.Printf("warning: %v", err) + continue + } + log.Printf("capturing packets on %s", iface) + reporters = append(reporters, sniff.New(hostID, source, *captureOn, *captureOff)) + } + } + log.Printf("listening on %s", *listen) quit := make(chan struct{}) @@ -153,3 +176,16 @@ func interrupt() chan os.Signal { signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) return c } + +func interfaces() string { + ifaces, err := net.Interfaces() + if err != nil { + log.Print(err) + return "" + } + a := make([]string, 0, len(ifaces)) + for _, iface := range ifaces { + a = append(a, iface.Name) + } + return strings.Join(a, ",") +} diff --git a/probe/process/walker_test.go b/probe/process/walker_test.go index 22af5ea62..37d445891 100644 --- a/probe/process/walker_test.go +++ b/probe/process/walker_test.go @@ -9,7 +9,6 @@ import ( ) func TestBasicWalk(t *testing.T) { - // Don't panic or error. var ( procRoot = "/proc" procFunc = func(process.Process) {} diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go new file mode 100644 index 000000000..2489ed38f --- /dev/null +++ b/probe/sniff/sniffer.go @@ -0,0 +1,251 @@ +package sniff + +import ( + "io" + "log" + "strconv" + "sync/atomic" + "time" + + "github.com/weaveworks/scope/report" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +// Sniffer is a packet-sniffing reporter. +type Sniffer struct { + hostID string + reports chan chan report.Report + parser *gopacket.DecodingLayerParser + decoded []gopacket.LayerType + eth layers.Ethernet + ip4 layers.IPv4 + ip6 layers.IPv6 + tcp layers.TCP + udp layers.UDP + icmp4 layers.ICMPv4 + icmp6 layers.ICMPv6 +} + +// New returns a new sniffing reporter that samples traffic by turning its +// packet capture facilities on and off. Note that the on and off durations +// represent a way to bound CPU burn. Effective sample rate needs to be +// calculated as (packets decoded / packets observed). +func New(hostID string, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer { + s := &Sniffer{ + hostID: hostID, + reports: make(chan chan report.Report), + } + s.parser = gopacket.NewDecodingLayerParser( + layers.LayerTypeEthernet, + &s.eth, &s.ip4, &s.ip6, &s.tcp, &s.udp, &s.icmp4, &s.icmp6, + ) + go s.loop(src, on, off) + return s +} + +// Report implements the Reporter interface. +func (s *Sniffer) Report() (report.Report, error) { + c := make(chan report.Report) + s.reports <- c + return <-c, nil +} + +func (s *Sniffer) loop(src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) { + var ( + process = uint64(1) // initially enabled + total = uint64(0) // total packets seen + count = uint64(0) // count of packets captured + packets = make(chan Packet, 1024) // decoded packets + rpt = report.MakeReport() // the report we build + turnOn = (<-chan time.Time)(nil) // signal to start capture (initially enabled) + turnOff = time.After(on) // signal to stop capture + done = make(chan struct{}) // when src is finished, we're done too + ) + + go func() { + s.read(src, packets, &process, &total, &count) + close(done) + }() + + for { + select { + case p := <-packets: + s.Merge(p, rpt) + + case <-turnOn: + atomic.StoreUint64(&process, 1) // enable packet capture + turnOn = nil // disable the on switch + turnOff = time.After(on) // enable the off switch + + case <-turnOff: + atomic.StoreUint64(&process, 0) // disable packet capture + turnOn = time.After(off) // enable the on switch + turnOff = nil // disable the off switch + + case c := <-s.reports: + rpt.Sampling.Count = atomic.LoadUint64(&count) + rpt.Sampling.Total = atomic.LoadUint64(&total) + interpolateCounts(rpt) + c <- rpt + atomic.StoreUint64(&count, 0) + atomic.StoreUint64(&total, 0) + rpt = report.MakeReport() + + case <-done: + return + } + } +} + +// interpolateCounts compensates for sampling by artifically inflating counts +// throughout the report. It should be run once for each report, within the +// probe, before it gets emitted into the rest of the system. +func interpolateCounts(r report.Report) { + rate := r.Sampling.Rate() + if rate >= 1.0 { + return + } + factor := 1.0 / rate + for _, topology := range r.Topologies() { + for _, emd := range topology.EdgeMetadatas { + if emd.PacketCount != nil { + *emd.PacketCount = uint64(float64(*emd.PacketCount) * factor) + } + if emd.ByteCount != nil { + *emd.ByteCount = uint64(float64(*emd.ByteCount) * factor) + } + } + } +} + +// Packet is an intermediate, decoded form of a packet, with the information +// that the Scope data model cares about. Designed to decouple the packet data +// source loop, which should be as fast as possible, and the process of +// merging the packet information to a report, which may take some time and +// allocations. +type Packet struct { + SrcIP, DstIP string + SrcPort, DstPort string + Network, Transport int // byte counts +} + +func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, process, total, count *uint64) { + var ( + p Packet + data []byte + err error + ) + for { + data, _, err = src.ZeroCopyReadPacketData() + if err == io.EOF { + return // done + } + if err != nil { + log.Printf("sniffer: read: %v", err) + continue + } + atomic.AddUint64(total, 1) + if atomic.LoadUint64(process) == 0 { + continue + } + + if err := s.parser.DecodeLayers(data, &s.decoded); err != nil { + // We'll always get an error when we encounter a layer type for + // which we haven't configured a decoder. + } + for _, t := range s.decoded { + switch t { + case layers.LayerTypeEthernet: + // + + case layers.LayerTypeICMPv4: + p.Network += len(s.icmp4.Payload) + + case layers.LayerTypeICMPv6: + p.Network += len(s.icmp6.Payload) + + case layers.LayerTypeIPv6: + p.SrcIP = s.ip6.SrcIP.String() + p.DstIP = s.ip6.DstIP.String() + p.Network += len(s.ip6.Payload) + + case layers.LayerTypeIPv4: + p.SrcIP = s.ip4.SrcIP.String() + p.DstIP = s.ip4.DstIP.String() + p.Network += len(s.ip4.Payload) + + case layers.LayerTypeTCP: + p.SrcPort = strconv.Itoa(int(s.tcp.SrcPort)) + p.DstPort = strconv.Itoa(int(s.tcp.DstPort)) + p.Transport += len(s.tcp.Payload) + + case layers.LayerTypeUDP: + p.SrcPort = strconv.Itoa(int(s.udp.SrcPort)) + p.DstPort = strconv.Itoa(int(s.udp.DstPort)) + p.Transport += len(s.udp.Payload) + } + } + + select { + case dst <- p: + atomic.AddUint64(count, 1) + default: + log.Printf("sniffer dropped packet") + } + } +} + +// Merge puts the packet into the report. +func (s *Sniffer) Merge(p Packet, rpt report.Report) { + // With a src and dst IP, we can add to the address topology. + if p.SrcIP != "" && p.DstIP != "" { + var ( + srcNodeID = report.MakeAddressNodeID(s.hostID, p.SrcIP) + dstNodeID = report.MakeAddressNodeID(s.hostID, p.DstIP) + edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) + srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) + ) + rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + rpt.Address.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata() + + emd := rpt.Address.EdgeMetadatas[edgeID] + if emd.PacketCount == nil { + emd.PacketCount = new(uint64) + } + *emd.PacketCount++ + if emd.ByteCount == nil { + emd.ByteCount = new(uint64) + } + *emd.ByteCount += uint64(p.Network) + rpt.Address.EdgeMetadatas[edgeID] = emd + + rpt.Address.Adjacency[srcAdjacencyID] = rpt.Address.Adjacency[srcAdjacencyID].Add(dstNodeID) + } + + // With a src and dst IP and port, we can add to the endpoints. + if p.SrcIP != "" && p.DstIP != "" && p.SrcPort != "" && p.DstPort != "" { + var ( + srcNodeID = report.MakeEndpointNodeID(s.hostID, p.SrcIP, p.SrcPort) + dstNodeID = report.MakeEndpointNodeID(s.hostID, p.DstIP, p.DstPort) + edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) + srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) + ) + rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + rpt.Endpoint.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata() + + emd := rpt.Endpoint.EdgeMetadatas[edgeID] + if emd.PacketCount == nil { + emd.PacketCount = new(uint64) + } + *emd.PacketCount++ + if emd.ByteCount == nil { + emd.ByteCount = new(uint64) + } + *emd.ByteCount += uint64(p.Transport) + rpt.Endpoint.EdgeMetadatas[edgeID] = emd + + rpt.Endpoint.Adjacency[srcAdjacencyID] = rpt.Endpoint.Adjacency[srcAdjacencyID].Add(dstNodeID) + } +} diff --git a/probe/sniff/sniffer_internal_test.go b/probe/sniff/sniffer_internal_test.go new file mode 100644 index 000000000..ed6d559a2 --- /dev/null +++ b/probe/sniff/sniffer_internal_test.go @@ -0,0 +1,45 @@ +package sniff + +import ( + "testing" + + "github.com/weaveworks/scope/report" +) + +func TestInterpolateCounts(t *testing.T) { + var ( + hostID = "macbook-air" + srcNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "5678") + dstNodeID = report.MakeEndpointNodeID(hostID, "5.6.7.8", "9012") + edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) + samplingCount = uint64(200) + samplingTotal = uint64(2345) + packetCount = uint64(123) + byteCount = uint64(4096) + ) + + r := report.MakeReport() + r.Sampling.Count = samplingCount + r.Sampling.Total = samplingTotal + r.Endpoint.EdgeMetadatas[edgeID] = report.EdgeMetadata{ + PacketCount: newu64(packetCount), + ByteCount: newu64(byteCount), + } + + interpolateCounts(r) + + var ( + rate = float64(samplingCount) / float64(samplingTotal) + factor = 1.0 / rate + apply = func(v uint64) uint64 { return uint64(factor * float64(v)) } + emd = r.Endpoint.EdgeMetadatas[edgeID] + ) + if want, have := apply(packetCount), (*emd.PacketCount); want != have { + t.Errorf("want %d packets, have %d", want, have) + } + if want, have := apply(byteCount), (*emd.ByteCount); want != have { + t.Errorf("want %d bytes, have %d", want, have) + } +} + +func newu64(value uint64) *uint64 { return &value } diff --git a/probe/sniff/sniffer_test.go b/probe/sniff/sniffer_test.go new file mode 100644 index 000000000..3a686e382 --- /dev/null +++ b/probe/sniff/sniffer_test.go @@ -0,0 +1,137 @@ +package sniff_test + +import ( + "io" + "reflect" + "sync" + "testing" + "time" + + "github.com/google/gopacket" + + "github.com/weaveworks/scope/probe/sniff" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +func TestSnifferShutdown(t *testing.T) { + var ( + hostID = "abcd" + src = newMockSource([]byte{}, nil) + on = time.Millisecond + off = time.Millisecond + s = sniff.New(hostID, src, on, off) + ) + + // Stopping the source should terminate the sniffer. + src.Close() + time.Sleep(10 * time.Millisecond) + + // Try to get a report from the sniffer. It should block forever, as the + // loop goroutine should have exited. + report := make(chan struct{}) + go func() { _, _ = s.Report(); close(report) }() + select { + case <-time.After(time.Millisecond): + case <-report: + t.Errorf("shouldn't get report after Close") + } +} + +func TestMerge(t *testing.T) { + var ( + hostID = "xyz" + src = newMockSource([]byte{}, nil) + on = time.Millisecond + off = time.Millisecond + rpt = report.MakeReport() + p = sniff.Packet{ + SrcIP: "1.0.0.0", + SrcPort: "1000", + DstIP: "2.0.0.0", + DstPort: "2000", + Network: 512, + Transport: 256, + } + ) + sniff.New(hostID, src, on, off).Merge(p, rpt) + + var ( + srcEndpointNodeID = report.MakeEndpointNodeID(hostID, p.SrcIP, p.SrcPort) + dstEndpointNodeID = report.MakeEndpointNodeID(hostID, p.DstIP, p.DstPort) + ) + if want, have := (report.Topology{ + Adjacency: report.Adjacency{ + report.MakeAdjacencyID(srcEndpointNodeID): report.MakeIDList( + dstEndpointNodeID, + ), + }, + EdgeMetadatas: report.EdgeMetadatas{ + report.MakeEdgeID(srcEndpointNodeID, dstEndpointNodeID): report.EdgeMetadata{ + PacketCount: newu64(1), + ByteCount: newu64(256), + }, + }, + NodeMetadatas: report.NodeMetadatas{ + srcEndpointNodeID: report.MakeNodeMetadata(), + dstEndpointNodeID: report.MakeNodeMetadata(), + }, + }), rpt.Endpoint; !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + + var ( + srcAddressNodeID = report.MakeAddressNodeID(hostID, p.SrcIP) + dstAddressNodeID = report.MakeAddressNodeID(hostID, p.DstIP) + ) + if want, have := (report.Topology{ + Adjacency: report.Adjacency{ + report.MakeAdjacencyID(srcAddressNodeID): report.MakeIDList( + dstAddressNodeID, + ), + }, + EdgeMetadatas: report.EdgeMetadatas{ + report.MakeEdgeID(srcAddressNodeID, dstAddressNodeID): report.EdgeMetadata{ + PacketCount: newu64(1), + ByteCount: newu64(512), + }, + }, + NodeMetadatas: report.NodeMetadatas{ + srcAddressNodeID: report.MakeNodeMetadata(), + dstAddressNodeID: report.MakeNodeMetadata(), + }, + }), rpt.Address; !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } +} + +type mockSource struct { + mtx sync.RWMutex + data []byte + err error +} + +func newMockSource(data []byte, err error) *mockSource { + return &mockSource{ + data: data, + err: err, + } +} + +func (s *mockSource) ZeroCopyReadPacketData() ([]byte, gopacket.CaptureInfo, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.data, gopacket.CaptureInfo{ + Timestamp: time.Now(), + CaptureLength: len(s.data), + Length: len(s.data), + }, s.err +} + +func (s *mockSource) Close() { + s.mtx.Lock() + defer s.mtx.Unlock() + s.err = io.EOF +} + +func newu64(value uint64) *uint64 { return &value } diff --git a/probe/sniff/source.go b/probe/sniff/source.go new file mode 100644 index 000000000..0ac4a4ff6 --- /dev/null +++ b/probe/sniff/source.go @@ -0,0 +1,24 @@ +package sniff + +import ( + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" +) + +// Source describes a packet data source that can be terminated. +type Source interface { + gopacket.ZeroCopyPacketDataSource + Close() +} + +const ( + snaplen = 65535 + promisc = true + timeout = pcap.BlockForever +) + +// NewSource returns a live packet data source via the passed device +// (interface). +func NewSource(device string) (Source, error) { + return pcap.OpenLive(device, snaplen, promisc, timeout) +} diff --git a/probe/sniff/testclient/main.go b/probe/sniff/testclient/main.go new file mode 100644 index 000000000..8b568ad4f --- /dev/null +++ b/probe/sniff/testclient/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "flag" + "io" + "log" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" +) + +func main() { + var ( + device = flag.String("device", "eth0", "device to sniff") + ) + flag.Parse() + + const ( + snaplen = 1024 * 1024 + promisc = true + timeout = pcap.BlockForever + ) + handle, err := pcap.OpenLive(*device, snaplen, promisc, timeout) + if err != nil { + log.Fatal(err) + } + + go func() { + time.Sleep(5 * time.Second) + handle.Close() + }() + + var ( + eth layers.Ethernet + ip4 layers.IPv4 + ip6 layers.IPv6 + tcp layers.TCP + udp layers.UDP + icmp4 layers.ICMPv4 + icmp6 layers.ICMPv6 + ) + parser := gopacket.NewDecodingLayerParser( + layers.LayerTypeEthernet, + ð, &ip4, &ip6, &tcp, &udp, &icmp4, &icmp6, + ) + decoded := []gopacket.LayerType{} + + done := make(chan struct{}) + go func() { + defer close(done) + for { + data, ci, err := handle.ZeroCopyReadPacketData() + if err == io.EOF { + log.Print("read: EOF") + return + } + if err != nil { + log.Printf("read: %v", err) + continue + } + log.Println(ci.Timestamp.String()) + err = parser.DecodeLayers(data, &decoded) + for _, t := range decoded { + switch t { + case layers.LayerTypeEthernet: + log.Println(" Ethernet", eth.EthernetType, eth.SrcMAC, eth.DstMAC, eth.Length) + case layers.LayerTypeIPv6: + log.Println(" IP6", ip6.Version, ip6.SrcIP, ip6.DstIP, ip6.Length, ip6.TrafficClass) + case layers.LayerTypeIPv4: + log.Println(" IP4", ip4.Version, ip4.SrcIP, ip4.DstIP, ip4.Length, ip4.TTL, ip4.TOS) + case layers.LayerTypeTCP: + log.Println(" TCP", tcp.SrcPort, tcp.DstPort, tcp.Seq, tcp.Ack, tcp.Window) + case layers.LayerTypeUDP: + log.Println(" UDP", udp.SrcPort, udp.DstPort, udp.Length) + case layers.LayerTypeICMPv4: + log.Println(" ICMP4", icmp4.Id, icmp4.Seq) + case layers.LayerTypeICMPv6: + log.Println(" ICMP6") + } + } + log.Println() + } + }() + <-done +} diff --git a/render/detailed_node.go b/render/detailed_node.go index fbbd79f7d..c1f0def81 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -61,14 +61,14 @@ func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { tables := tables{} { rows := []Row{} - if val, ok := n.AggregateMetadata[KeyMaxConnCountTCP]; ok { - rows = append(rows, Row{"TCP connections", strconv.FormatInt(int64(val), 10), ""}) + if n.EdgeMetadata.MaxConnCountTCP != nil { + rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), ""}) } - if val, ok := n.AggregateMetadata[KeyBytesIngress]; ok { - rows = append(rows, Row{"Bytes ingress", strconv.FormatInt(int64(val), 10), ""}) + if n.EdgeMetadata.PacketCount != nil { + rows = append(rows, Row{"Packets", strconv.FormatUint(*n.EdgeMetadata.PacketCount, 10), ""}) } - if val, ok := n.AggregateMetadata[KeyBytesEgress]; ok { - rows = append(rows, Row{"Bytes egress", strconv.FormatInt(int64(val), 10), ""}) + if n.EdgeMetadata.ByteCount != nil { + rows = append(rows, Row{"Bytes", strconv.FormatUint(*n.EdgeMetadata.ByteCount, 10), ""}) } if len(rows) > 0 { tables = append(tables, Table{"Connections", true, connectionsRank, rows}) diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index bc754b4e1..0b3fe9994 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -66,8 +66,8 @@ func TestMakeDetailedNode(t *testing.T) { Numeric: true, Rank: 100, Rows: []render.Row{ - {"Bytes ingress", "150", ""}, - {"Bytes egress", "1500", ""}, + {"Packets", "150", ""}, + {"Bytes", "1500", ""}, }, }, { diff --git a/render/expected/expected.go b/render/expected/expected.go index 577adb04f..7e3b5fd74 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -14,25 +14,25 @@ var ( unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80") unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80") unknownPseudoNode1 = render.RenderableNode{ - ID: unknownPseudoNode1ID, - LabelMajor: "10.10.10.10", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: unknownPseudoNode1ID, + LabelMajor: "10.10.10.10", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, } unknownPseudoNode2 = render.RenderableNode{ - ID: unknownPseudoNode2ID, - LabelMajor: "10.10.10.11", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: unknownPseudoNode2ID, + LabelMajor: "10.10.10.11", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, } theInternetNode = render.RenderableNode{ - ID: render.TheInternetID, - LabelMajor: render.TheInternetMajor, - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: render.TheInternetID, + LabelMajor: render.TheInternetMajor, + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, } ClientProcess1ID = render.MakeProcessID(test.ClientHostID, test.Client1PID) @@ -54,9 +54,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 100, - render.KeyBytesEgress: 10, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(100), + ByteCount: newu64(10), }, }, ClientProcess2ID: { @@ -72,9 +72,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 200, - render.KeyBytesEgress: 20, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(200), + ByteCount: newu64(20), }, }, ServerProcessID: { @@ -96,9 +96,9 @@ var ( test.ServerHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(150), + ByteCount: newu64(1500), }, }, nonContainerProcessID: { @@ -112,8 +112,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, unknownPseudoNode1ID: unknownPseudoNode1, unknownPseudoNode2ID: unknownPseudoNode2, @@ -136,9 +136,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 300, - render.KeyBytesEgress: 30, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(300), + ByteCount: newu64(30), }, }, "apache": { @@ -159,9 +159,9 @@ var ( test.ServerHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(150), + ByteCount: newu64(1500), }, }, "bash": { @@ -174,8 +174,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, unknownPseudoNode1ID: unknownPseudoNode1, unknownPseudoNode2ID: unknownPseudoNode2, @@ -199,9 +199,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 300, - render.KeyBytesEgress: 30, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(300), + ByteCount: newu64(30), }, }, test.ServerContainerID: { @@ -218,9 +218,9 @@ var ( test.ServerHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(150), + ByteCount: newu64(1500), }, }, uncontainedServerID: { @@ -233,8 +233,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, render.TheInternetID: theInternetNode, } @@ -257,9 +257,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 300, - render.KeyBytesEgress: 30, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(300), + ByteCount: newu64(30), }, }, test.ServerContainerImageName: { @@ -276,9 +276,9 @@ var ( test.ServerProcessNodeID, test.ServerHostNodeID), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + PacketCount: newu64(150), + ByteCount: newu64(1500), }, }, uncontainedServerID: { @@ -291,8 +291,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, render.TheInternetID: theInternetNode, } @@ -315,8 +315,8 @@ var ( test.ServerAddressNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyMaxConnCountTCP: 3, + EdgeMetadata: report.EdgeMetadata{ + MaxConnCountTCP: newu64(3), }, }, ClientHostRenderedID: { @@ -331,24 +331,26 @@ var ( test.ClientAddressNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyMaxConnCountTCP: 3, + EdgeMetadata: report.EdgeMetadata{ + MaxConnCountTCP: newu64(3), }, }, pseudoHostID1: { - ID: pseudoHostID1, - LabelMajor: "10.10.10.10", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: pseudoHostID1, + LabelMajor: "10.10.10.10", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, pseudoHostID2: { - ID: pseudoHostID2, - LabelMajor: "10.10.10.11", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: pseudoHostID2, + LabelMajor: "10.10.10.11", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, render.TheInternetID: theInternetNode, } ) + +func newu64(value uint64) *uint64 { return &value } diff --git a/render/metadata.go b/render/metadata.go deleted file mode 100644 index 577b48b19..000000000 --- a/render/metadata.go +++ /dev/null @@ -1,46 +0,0 @@ -package render - -import ( - "github.com/weaveworks/scope/report" -) - -// AggregateMetadata is a composable version of an EdgeMetadata. It's used -// when we want to merge nodes/edges for any reason. -// -// Even though we base it on EdgeMetadata, we can apply it to nodes, by -// summing up (merging) all of the {ingress, egress} metadatas of the -// {incoming, outgoing} edges to the node. -type AggregateMetadata map[string]int - -const ( - // KeyBytesIngress is the aggregate metadata key for the total count of - // ingress bytes. - KeyBytesIngress = "ingress_bytes" - // KeyBytesEgress is the aggregate metadata key for the total count of - // egress bytes. - KeyBytesEgress = "egress_bytes" - // KeyMaxConnCountTCP is the aggregate metadata key for the maximum number - // of simultaneous observed TCP connections in the window. - KeyMaxConnCountTCP = "max_conn_count_tcp" -) - -// AggregateMetadataOf calculates a AggregateMetadata from an EdgeMetadata. -func AggregateMetadataOf(md report.EdgeMetadata) AggregateMetadata { - m := AggregateMetadata{} - if md.WithBytes { - m[KeyBytesIngress] = int(md.BytesIngress) - m[KeyBytesEgress] = int(md.BytesEgress) - } - if md.WithConnCountTCP { - // The maximum is the maximum. No need to calculate anything. - m[KeyMaxConnCountTCP] = int(md.MaxConnCountTCP) - } - return m -} - -// Merge adds the fields from AggregateMetadata to r. r must be initialized. -func (r *AggregateMetadata) Merge(other AggregateMetadata) { - for k, v := range other { - (*r)[k] += v - } -} diff --git a/render/metadata_test.go b/render/metadata_test.go deleted file mode 100644 index 805506650..000000000 --- a/render/metadata_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package render_test - -import ( - "reflect" - "testing" - - "github.com/weaveworks/scope/render" - "github.com/weaveworks/scope/report" -) - -func TestAggregateMetadata(t *testing.T) { - for from, want := range map[report.EdgeMetadata]render.AggregateMetadata{ - - // Simple connection count - report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 400, - }: { - render.KeyMaxConnCountTCP: 400, - }, - - // Connection count rounding - report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 4, - }: { - render.KeyMaxConnCountTCP: 4, - }, - - // 0 connections. - report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 0, - }: { - render.KeyMaxConnCountTCP: 0, - }, - - // Egress - report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 24, - BytesIngress: 0, - }: { - render.KeyBytesEgress: 24, - render.KeyBytesIngress: 0, - }, - - // Ingress - report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 0, - BytesIngress: 1200, - }: { - render.KeyBytesEgress: 0, - render.KeyBytesIngress: 1200, - }, - - // Nothing there. - report.EdgeMetadata{}: {}, - } { - if have := render.AggregateMetadataOf(from); !reflect.DeepEqual(have, want) { - t.Errorf("have: %#v, want %#v", have, want) - } - - } -} - -func TestAggregateMetadataSum(t *testing.T) { - var ( - this = render.AggregateMetadata{ - "ingress_bytes": 3, - } - other = render.AggregateMetadata{ - "ingress_bytes": 333, - "egress_bytes": 3, - } - want = render.AggregateMetadata{ - "ingress_bytes": 336, - "egress_bytes": 3, - } - ) - - this.Merge(other) - if have := this; !reflect.DeepEqual(have, want) { - t.Errorf("have: %#v, want %#v", have, want) - } -} diff --git a/render/render.go b/render/render.go index b7adf6e4d..86daa7ed9 100644 --- a/render/render.go +++ b/render/render.go @@ -9,7 +9,7 @@ import ( // Renderer is something that can render a report to a set of RenderableNodes. type Renderer interface { Render(report.Report) RenderableNodes - AggregateMetadata(rpt report.Report, localID, remoteID string) AggregateMetadata + EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata } // Reduce renderer is a Renderer which merges together the output of several @@ -50,11 +50,11 @@ func (r Reduce) Render(rpt report.Report) RenderableNodes { return result } -// AggregateMetadata produces an AggregateMetadata for a given edge. -func (r Reduce) AggregateMetadata(rpt report.Report, localID, remoteID string) AggregateMetadata { - metadata := AggregateMetadata{} +// EdgeMetadata produces an EdgeMetadata for a given edge. +func (r Reduce) EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata { + metadata := report.EdgeMetadata{} for _, renderer := range r { - metadata.Merge(renderer.AggregateMetadata(rpt, localID, remoteID)) + metadata.Merge(renderer.EdgeMetadata(rpt, localID, remoteID)) } return metadata } @@ -107,11 +107,11 @@ func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) { return output, mapped } -// AggregateMetadata gives the metadata of an edge from the perspective of the +// EdgeMetadata gives the metadata of an edge from the perspective of the // srcRenderableID. Since an edgeID can have multiple edges on the address // level, it uses the supplied mapping function to translate address IDs to // renderable node (mapped) IDs. -func (m Map) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) AggregateMetadata { +func (m Map) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) report.EdgeMetadata { // First we need to map the ids in this layer into the ids in the underlying layer _, mapped := m.render(rpt) // this maps from old -> new inverted := map[string][]string{} // this maps from new -> old(s) @@ -130,9 +130,9 @@ func (m Map) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderable } // Now recurse for each old edge - output := AggregateMetadata{} + output := report.EdgeMetadata{} for _, edge := range oldEdges { - metadata := m.Renderer.AggregateMetadata(rpt, edge.src, edge.dst) + metadata := m.Renderer.EdgeMetadata(rpt, edge.src, edge.dst) output.Merge(metadata) } return output @@ -205,7 +205,7 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { srcRenderableNode.Origins = srcRenderableNode.Origins.Add(srcNodeID) edgeID := report.MakeEdgeID(srcNodeID, dstNodeID) if md, ok := t.EdgeMetadatas[edgeID]; ok { - srcRenderableNode.AggregateMetadata.Merge(AggregateMetadataOf(md)) + srcRenderableNode.EdgeMetadata.Merge(md) } } @@ -215,11 +215,11 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { return nodes } -// AggregateMetadata gives the metadata of an edge from the perspective of the +// EdgeMetadata gives the metadata of an edge from the perspective of the // srcRenderableID. Since an edgeID can have multiple edges on the address // level, it uses the supplied mapping function to translate address IDs to // renderable node (mapped) IDs. -func (m LeafMap) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) AggregateMetadata { +func (m LeafMap) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) report.EdgeMetadata { t := m.Selector(rpt) metadata := report.EdgeMetadata{} for edgeID, edgeMeta := range t.EdgeMetadatas { @@ -240,7 +240,7 @@ func (m LeafMap) AggregateMetadata(rpt report.Report, srcRenderableID, dstRender metadata.Flatten(edgeMeta) } } - return AggregateMetadataOf(metadata) + return metadata } // Render produces a set of RenderableNodes given a Report diff --git a/render/render_test.go b/render/render_test.go index bfb2e9ed1..954d9df34 100644 --- a/render/render_test.go +++ b/render/render_test.go @@ -10,14 +10,14 @@ import ( type mockRenderer struct { render.RenderableNodes - aggregateMetadata render.AggregateMetadata + edgeMetadata report.EdgeMetadata } func (m mockRenderer) Render(rpt report.Report) render.RenderableNodes { return m.RenderableNodes } -func (m mockRenderer) AggregateMetadata(rpt report.Report, localID, remoteID string) render.AggregateMetadata { - return m.aggregateMetadata +func (m mockRenderer) EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata { + return m.edgeMetadata } func TestReduceRender(t *testing.T) { @@ -36,12 +36,12 @@ func TestReduceRender(t *testing.T) { func TestReduceEdge(t *testing.T) { renderer := render.Reduce([]render.Renderer{ - mockRenderer{aggregateMetadata: render.AggregateMetadata{"foo": 1}}, - mockRenderer{aggregateMetadata: render.AggregateMetadata{"bar": 2}}, + mockRenderer{edgeMetadata: report.EdgeMetadata{PacketCount: newu64(1)}}, + mockRenderer{edgeMetadata: report.EdgeMetadata{PacketCount: newu64(2)}}, }) - want := render.AggregateMetadata{"foo": 1, "bar": 2} - have := renderer.AggregateMetadata(report.MakeReport(), "", "") + want := report.EdgeMetadata{PacketCount: newu64(3)} + have := renderer.EdgeMetadata(report.MakeReport(), "", "") if !reflect.DeepEqual(want, have) { t.Errorf("want %+v, have %+v", want, have) @@ -118,8 +118,8 @@ func TestMapEdge(t *testing.T) { ">bar": report.MakeIDList("foo"), }, EdgeMetadatas: report.EdgeMetadatas{ - "foo|bar": report.EdgeMetadata{WithBytes: true, BytesIngress: 1, BytesEgress: 2}, - "bar|foo": report.EdgeMetadata{WithBytes: true, BytesIngress: 3, BytesEgress: 4}, + "foo|bar": report.EdgeMetadata{PacketCount: newu64(1), ByteCount: newu64(2)}, + "bar|foo": report.EdgeMetadata{PacketCount: newu64(3), ByteCount: newu64(4)}, }, } } @@ -139,12 +139,10 @@ func TestMapEdge(t *testing.T) { }, } - want := render.AggregateMetadata{ - render.KeyBytesIngress: 1, - render.KeyBytesEgress: 2, - } - have := mapper.AggregateMetadata(report.MakeReport(), "_foo", "_bar") - if !reflect.DeepEqual(want, have) { + if want, have := (report.EdgeMetadata{ + PacketCount: newu64(1), + ByteCount: newu64(2), + }), mapper.EdgeMetadata(report.MakeReport(), "_foo", "_bar"); !reflect.DeepEqual(want, have) { t.Errorf("want %+v, have %+v", want, have) } } @@ -166,3 +164,5 @@ func TestFilterRender(t *testing.T) { t.Errorf("want %+v, have %+v", want, have) } } + +func newu64(value uint64) *uint64 { return &value } diff --git a/render/renderable_node.go b/render/renderable_node.go index bdd492fbe..074ca3774 100644 --- a/render/renderable_node.go +++ b/render/renderable_node.go @@ -16,7 +16,7 @@ type RenderableNode struct { Adjacency report.IDList `json:"adjacency,omitempty"` // Node IDs (in the same topology domain) Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information - AggregateMetadata `json:"metadata"` // Numeric sums + report.EdgeMetadata `json:"metadata"` // Numeric sums report.NodeMetadata `json:"-"` // merged NodeMetadata of the nodes used to build this } @@ -56,57 +56,57 @@ func (rn *RenderableNode) Merge(other RenderableNode) { rn.Adjacency = rn.Adjacency.Merge(other.Adjacency) rn.Origins = rn.Origins.Merge(other.Origins) - rn.AggregateMetadata.Merge(other.AggregateMetadata) + rn.EdgeMetadata.Merge(other.EdgeMetadata) rn.NodeMetadata.Merge(other.NodeMetadata) } // NewRenderableNode makes a new RenderableNode func NewRenderableNode(id, major, minor, rank string, nmd report.NodeMetadata) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: major, - LabelMinor: minor, - Rank: rank, - Pseudo: false, - AggregateMetadata: AggregateMetadata{}, - NodeMetadata: nmd.Copy(), + ID: id, + LabelMajor: major, + LabelMinor: minor, + Rank: rank, + Pseudo: false, + EdgeMetadata: report.EdgeMetadata{}, + NodeMetadata: nmd.Copy(), } } func newDerivedNode(id string, node RenderableNode) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: "", - LabelMinor: "", - Rank: "", - Pseudo: node.Pseudo, - AggregateMetadata: node.AggregateMetadata, - Origins: node.Origins, - NodeMetadata: report.MakeNodeMetadata(), + ID: id, + LabelMajor: "", + LabelMinor: "", + Rank: "", + Pseudo: node.Pseudo, + EdgeMetadata: node.EdgeMetadata, + Origins: node.Origins, + NodeMetadata: report.MakeNodeMetadata(), } } func newPseudoNode(id, major, minor string) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: major, - LabelMinor: minor, - Rank: "", - Pseudo: true, - AggregateMetadata: AggregateMetadata{}, - NodeMetadata: report.MakeNodeMetadata(), + ID: id, + LabelMajor: major, + LabelMinor: minor, + Rank: "", + Pseudo: true, + EdgeMetadata: report.EdgeMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), } } func newDerivedPseudoNode(id, major string, node RenderableNode) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: major, - LabelMinor: "", - Rank: "", - Pseudo: true, - AggregateMetadata: node.AggregateMetadata, - Origins: node.Origins, - NodeMetadata: report.MakeNodeMetadata(), + ID: id, + LabelMajor: major, + LabelMinor: "", + Rank: "", + Pseudo: true, + EdgeMetadata: node.EdgeMetadata, + Origins: node.Origins, + NodeMetadata: report.MakeNodeMetadata(), } } diff --git a/render/topologies_test.go b/render/topologies_test.go index f5496aec7..f8f0402f8 100644 --- a/render/topologies_test.go +++ b/render/topologies_test.go @@ -23,7 +23,7 @@ func TestProcessRenderer(t *testing.T) { have := render.ProcessRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedProcesses, have) { - t.Error("\n" + test.Diff(expected.RenderedProcesses, have)) + t.Error(test.Diff(expected.RenderedProcesses, have)) } } @@ -31,7 +31,7 @@ func TestProcessNameRenderer(t *testing.T) { have := render.ProcessNameRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedProcessNames, have) { - t.Error("\n" + test.Diff(expected.RenderedProcessNames, have)) + t.Error(test.Diff(expected.RenderedProcessNames, have)) } } @@ -39,7 +39,7 @@ func TestContainerRenderer(t *testing.T) { have := render.ContainerRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedContainers, have) { - t.Error("\n" + test.Diff(expected.RenderedContainers, have)) + t.Error(test.Diff(expected.RenderedContainers, have)) } } @@ -47,7 +47,7 @@ func TestContainerImageRenderer(t *testing.T) { have := render.ContainerImageRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedContainerImages, have) { - t.Error("\n" + test.Diff(expected.RenderedContainerImages, have)) + t.Error(test.Diff(expected.RenderedContainerImages, have)) } } @@ -55,6 +55,6 @@ func TestHostRenderer(t *testing.T) { have := render.HostRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedHosts, have) { - t.Error("\n" + test.Diff(expected.RenderedHosts, have)) + t.Error(test.Diff(expected.RenderedHosts, have)) } } diff --git a/report/merge.go b/report/merge.go index 025bd4755..8ebd1eab2 100644 --- a/report/merge.go +++ b/report/merge.go @@ -12,6 +12,7 @@ func (r *Report) Merge(other Report) { r.ContainerImage.Merge(other.ContainerImage) r.Host.Merge(other.Host) r.Overlay.Merge(other.Overlay) + r.Sampling.Merge(other.Sampling) } // Merge merges another Topology into the receiver. @@ -62,31 +63,45 @@ func (e *EdgeMetadatas) Merge(other EdgeMetadatas) { // Merge merges another EdgeMetadata into the receiver. The two edge metadatas // should represent the same edge on different times. func (m *EdgeMetadata) Merge(other EdgeMetadata) { - if other.WithBytes { - m.WithBytes = true - m.BytesIngress += other.BytesIngress - m.BytesEgress += other.BytesEgress - } - if other.WithConnCountTCP { - m.WithConnCountTCP = true - if other.MaxConnCountTCP > m.MaxConnCountTCP { - m.MaxConnCountTCP = other.MaxConnCountTCP - } - } + m.PacketCount = merge(m.PacketCount, other.PacketCount, sum) + m.ByteCount = merge(m.ByteCount, other.ByteCount, sum) + m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, max) } // Flatten sums two EdgeMetadatas. Their windows should be the same duration; // they should represent different edges at the same time. func (m *EdgeMetadata) Flatten(other EdgeMetadata) { - if other.WithBytes { - m.WithBytes = true - m.BytesIngress += other.BytesIngress - m.BytesEgress += other.BytesEgress - } - if other.WithConnCountTCP { - m.WithConnCountTCP = true - // Note: summing of two maximums doesn't always give the true maximum. - // But it's our Best Effort effort. - m.MaxConnCountTCP += other.MaxConnCountTCP - } + m.PacketCount = merge(m.PacketCount, other.PacketCount, sum) + m.ByteCount = merge(m.ByteCount, other.ByteCount, sum) + // Note that summing of two maximums doesn't always give us the true + // maximum. But it's a best effort. + m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, sum) +} + +// Merge combines two sampling structures via simple addition. +func (s *Sampling) Merge(other Sampling) { + s.Count += other.Count + s.Total += other.Total +} + +func merge(dst, src *uint64, op func(uint64, uint64) uint64) *uint64 { + if src == nil { + return dst + } + if dst == nil { + dst = new(uint64) + } + (*dst) = op(*dst, *src) + return dst +} + +func sum(dst, src uint64) uint64 { + return dst + src +} + +func max(dst, src uint64) uint64 { + if dst > src { + return dst + } + return src } diff --git a/report/merge_test.go b/report/merge_test.go index 99d04a48c..5f3a75beb 100644 --- a/report/merge_test.go +++ b/report/merge_test.go @@ -112,102 +112,82 @@ func TestMergeEdgeMetadatas(t *testing.T) { a: report.EdgeMetadatas{}, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 2, + PacketCount: newu64(12), + ByteCount: newu64(0), + MaxConnCountTCP: newu64(2), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 2, + PacketCount: newu64(12), + ByteCount: newu64(0), + MaxConnCountTCP: newu64(2), }, }, }, "Empty b": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, + PacketCount: newu64(12), + ByteCount: newu64(0), }, }, b: report.EdgeMetadatas{}, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, + PacketCount: newu64(12), + ByteCount: newu64(0), }, }, }, "Host merge": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 4, + PacketCount: newu64(12), + ByteCount: newu64(0), + MaxConnCountTCP: newu64(4), }, }, b: report.EdgeMetadatas{ "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 1, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 6, + PacketCount: newu64(1), + ByteCount: newu64(2), + MaxConnCountTCP: newu64(6), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 4, + PacketCount: newu64(12), + ByteCount: newu64(0), + MaxConnCountTCP: newu64(4), }, "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 1, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 6, + PacketCount: newu64(1), + ByteCount: newu64(2), + MaxConnCountTCP: newu64(6), }, }, }, "Edge merge": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 7, + PacketCount: newu64(12), + ByteCount: newu64(0), + MaxConnCountTCP: newu64(7), }, }, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 1, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 9, + PacketCount: newu64(1), + ByteCount: newu64(2), + MaxConnCountTCP: newu64(9), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 13, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 9, + PacketCount: newu64(13), + ByteCount: newu64(2), + MaxConnCountTCP: newu64(9), }, }, }, @@ -319,3 +299,5 @@ func TestMergeNodeMetadatas(t *testing.T) { } } } + +func newu64(value uint64) *uint64 { return &value } diff --git a/report/report.go b/report/report.go index dc8f72d72..ca1eaa470 100644 --- a/report/report.go +++ b/report/report.go @@ -1,5 +1,7 @@ package report +import "fmt" + // Report is the core data type. It's produced by probes, and consumed and // stored by apps. It's composed of multiple topologies, each representing // a different (related, but not equivalent) view of the network. @@ -36,6 +38,75 @@ type Report struct { // overlaid on the infrastructure. The information is scraped by polling // their status endpoints. Edges could be present, but aren't currently. Overlay Topology + + // Sampling data for this report. + Sampling +} + +// MakeReport makes a clean report, ready to Merge() other reports into. +func MakeReport() Report { + return Report{ + Endpoint: NewTopology(), + Address: NewTopology(), + Process: NewTopology(), + Container: NewTopology(), + ContainerImage: NewTopology(), + Host: NewTopology(), + Overlay: NewTopology(), + } +} + +// Topologies returns a slice of Topologies in this report +func (r Report) Topologies() []Topology { + return []Topology{ + r.Endpoint, + r.Address, + r.Process, + r.Container, + r.ContainerImage, + r.Host, + r.Overlay, + } +} + +// Validate checks the report for various inconsistencies. +func (r Report) Validate() error { + var packets uint64 + for _, topology := range r.Topologies() { + if err := topology.Validate(); err != nil { + return err + } + for _, emd := range topology.EdgeMetadatas { + if emd.PacketCount != nil { + packets += *emd.PacketCount + } + } + } + if r.Sampling.Count > r.Sampling.Total { + return fmt.Errorf("sampling count (%d) bigger than total (%d)", r.Sampling.Count, r.Sampling.Total) + } + if packets > 0 && (r.Sampling.Count == 0 || r.Sampling.Total == 0) { + return fmt.Errorf("packets exist in EdgeMetadata, but no sampling count or total in the base report") + } + return nil +} + +// Sampling describes how the packet data sources for this report were +// sampled. It can be used to calculate effective sample rates. We can't +// just put the rate here, because that can't be accurately merged. Counts +// in e.g. edge metadata structures have already been adjusted to +// compensate for the sample rate. +type Sampling struct { + Count uint64 // observed and processed + Total uint64 // observed overall +} + +// Rate returns the effective sampling rate. +func (s Sampling) Rate() float64 { + if s.Total <= 0 { + return 1.0 + } + return float64(s.Count) / float64(s.Total) } const ( @@ -77,32 +148,3 @@ func SelectAddress(r Report) Topology { func SelectHost(r Report) Topology { return r.Host } - -// MakeReport makes a clean report, ready to Merge() other reports into. -func MakeReport() Report { - return Report{ - Endpoint: NewTopology(), - Address: NewTopology(), - Process: NewTopology(), - Container: NewTopology(), - ContainerImage: NewTopology(), - Host: NewTopology(), - Overlay: NewTopology(), - } -} - -// Topologies returns a slice of Topologies in this report -func (r Report) Topologies() []Topology { - return []Topology{r.Endpoint, r.Address, r.Process, r.Container, - r.ContainerImage, r.Host, r.Overlay} -} - -// Validate checks the report for various inconsistencies. -func (r Report) Validate() error { - for _, topology := range r.Topologies() { - if err := topology.Validate(); err != nil { - return err - } - } - return nil -} diff --git a/report/report_test.go b/report/report_test.go new file mode 100644 index 000000000..aa330ce1f --- /dev/null +++ b/report/report_test.go @@ -0,0 +1,27 @@ +package report_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/report" +) + +// Make sure we don't add a topology and miss it in the Topologies method. +func TestReportTopologies(t *testing.T) { + var ( + reportType = reflect.TypeOf(report.MakeReport()) + topologyType = reflect.TypeOf(report.NewTopology()) + ) + + var want int + for i := 0; i < reportType.NumField(); i++ { + if reportType.Field(i).Type == topologyType { + want++ + } + } + + if have := len(report.MakeReport().Topologies()); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/report/topology.go b/report/topology.go index ce9055997..9b9a5fed2 100644 --- a/report/topology.go +++ b/report/topology.go @@ -28,16 +28,12 @@ type EdgeMetadatas map[string]EdgeMetadata // IDs. type NodeMetadatas map[string]NodeMetadata -// EdgeMetadata describes a superset of the metadata that probes can -// conceivably (and usefully) collect about an edge between two nodes in any -// topology. +// EdgeMetadata describes a superset of the metadata that probes can possibly +// collect about a directed edge between two nodes in any topology. type EdgeMetadata struct { - WithBytes bool `json:"with_bytes,omitempty"` - BytesIngress uint `json:"bytes_ingress,omitempty"` // dst -> src - BytesEgress uint `json:"bytes_egress,omitempty"` // src -> dst - - WithConnCountTCP bool `json:"with_conn_count_tcp,omitempty"` - MaxConnCountTCP uint `json:"max_conn_count_tcp,omitempty"` + PacketCount *uint64 `json:"packet_count,omitempty"` + ByteCount *uint64 `json:"byte_count,omitempty"` + MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` } // NodeMetadata describes a superset of the metadata that probes can collect diff --git a/test/report_fixture.go b/test/report_fixture.go index d811fc69f..b48fcd8dc 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -108,40 +108,33 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(Client54001NodeID, Server80NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 100, - BytesEgress: 10, + PacketCount: newu64(100), + ByteCount: newu64(10), }, report.MakeEdgeID(Client54002NodeID, Server80NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 200, - BytesEgress: 20, + PacketCount: newu64(200), + ByteCount: newu64(20), }, report.MakeEdgeID(Server80NodeID, Client54001NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 10, - BytesEgress: 100, + PacketCount: newu64(10), + ByteCount: newu64(100), }, report.MakeEdgeID(Server80NodeID, Client54002NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 20, - BytesEgress: 200, + PacketCount: newu64(20), + ByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, UnknownClient1NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 30, - BytesEgress: 300, + PacketCount: newu64(30), + ByteCount: newu64(300), }, report.MakeEdgeID(Server80NodeID, UnknownClient2NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 40, - BytesEgress: 400, + PacketCount: newu64(40), + ByteCount: newu64(400), }, report.MakeEdgeID(Server80NodeID, UnknownClient3NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 50, - BytesEgress: 500, + PacketCount: newu64(50), + ByteCount: newu64(500), }, }, }, @@ -222,12 +215,10 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(ClientAddressNodeID, ServerAddressNodeID): report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 3, + MaxConnCountTCP: newu64(3), }, report.MakeEdgeID(ServerAddressNodeID, ClientAddressNodeID): report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 3, + MaxConnCountTCP: newu64(3), }, }, }, @@ -251,6 +242,10 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{}, }, + Sampling: report.Sampling{ + Count: 1024, + Total: 4096, + }, } ) @@ -259,3 +254,5 @@ func init() { panic(err) } } + +func newu64(value uint64) *uint64 { return &value } From c7a06d2a435d83be7ba7b9749a0f937f185277bf Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 31 Jul 2015 15:21:41 +0200 Subject: [PATCH 2/8] Don't flat-embed sampling in the report In the JSON representation, we want the Sampling data to be distinct. --- report/report.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/report/report.go b/report/report.go index ca1eaa470..797bd11f1 100644 --- a/report/report.go +++ b/report/report.go @@ -40,7 +40,7 @@ type Report struct { Overlay Topology // Sampling data for this report. - Sampling + Sampling Sampling } // MakeReport makes a clean report, ready to Merge() other reports into. @@ -53,6 +53,7 @@ func MakeReport() Report { ContainerImage: NewTopology(), Host: NewTopology(), Overlay: NewTopology(), + Sampling: Sampling{}, } } From 64ebedccb177b966ceb1983596a1b859574529a3 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 31 Jul 2015 15:34:58 +0200 Subject: [PATCH 3/8] Allow packet capture with effective sample rate 100% --- probe/main.go | 3 --- probe/sniff/sniffer.go | 6 ++++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/probe/main.go b/probe/main.go index db5e99ae5..0b3a8c63a 100644 --- a/probe/main.go +++ b/probe/main.go @@ -115,9 +115,6 @@ func main() { } if *captureEnabled { - if *captureOn > *captureOff { - log.Fatalf("-capture.on (%s) must be <= -capture.off (%s)", *captureOn, *captureOff) - } for _, iface := range strings.Split(*captureInterfaces, ",") { source, err := sniff.NewSource(iface) if err != nil { diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go index 2489ed38f..e0a13feef 100644 --- a/probe/sniff/sniffer.go +++ b/probe/sniff/sniffer.go @@ -64,6 +64,12 @@ func (s *Sniffer) loop(src gopacket.ZeroCopyPacketDataSource, on, off time.Durat done = make(chan struct{}) // when src is finished, we're done too ) + // As a special case, if our off duty cycle is zero, i.e. 100% sample + // rate, we simply disable the turn-off signal channel. + if off == 0 { + turnOff = nil + } + go func() { s.read(src, packets, &process, &total, &count) close(done) From 0aadf6447b1cce85bacf85efb14902103706d726 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 31 Jul 2015 19:47:55 +0200 Subject: [PATCH 4/8] Revert to correct edge construction Another implicit invariant in the data model is that edges are always of the form (local -> remote). That is, the source of an edge must always be a node that originates from within Scope's domain of visibility. This was evident by the presence of ingress and egress fields in edge/aggregate metadata. When building the sniffer, I accidentally and incorrectly violated this invariant, by constructing distinct edges for (local -> remote) and (remote -> local), and collapsing ingress and egress byte counts to a single scalar. I experienced a variety of subtle undefined behavior as a result. See #339. This change reverts to the old, correct methodology. Consequently the sniffer needs to be able to find out which side of the sniffed packet is local v. remote, and to do that it needs access to local networks. I moved the discovery from the probe/host package into probe/main.go. As part of that work I discovered that package report also maintains its own, independent "cache" of local networks. Except it contains only the (optional) Docker bridge network, if it's been populated by the probe, and it's only used by the report.Make{Endpoint,Address}NodeID constructors to scope local addresses. Normally, scoping happens during rendering, and only for pseudo nodes -- see current LeafMap Render localNetworks. This is pretty convoluted and should be either be made consistent or heavily commented. --- app/api_topology_test.go | 4 +- probe/host/reporter.go | 33 +++---- probe/host/reporter_test.go | 32 ++++--- probe/main.go | 16 +++- probe/sniff/sniffer.go | 128 +++++++++++++++++++-------- probe/sniff/sniffer_internal_test.go | 10 ++- probe/sniff/sniffer_test.go | 18 ++-- render/detailed_node.go | 7 +- render/detailed_node_test.go | 2 +- render/expected/expected.go | 36 ++++---- render/render_test.go | 8 +- report/merge.go | 6 +- report/merge_test.go | 38 ++++---- report/networks.go | 7 +- report/topology.go | 7 +- test/report_fixture.go | 28 +++--- 16 files changed, 225 insertions(+), 155 deletions(-) diff --git a/app/api_topology_test.go b/app/api_topology_test.go index 590b9fa02..61fa8fa84 100644 --- a/app/api_topology_test.go +++ b/app/api_topology_test.go @@ -96,8 +96,8 @@ func TestAPITopologyApplications(t *testing.T) { t.Fatalf("JSON parse error: %s", err) } if want, have := (report.EdgeMetadata{ - PacketCount: newu64(100), - ByteCount: newu64(10), + PacketCount: newu64(10), + EgressByteCount: newu64(100), }), edge.Metadata; !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } diff --git a/probe/host/reporter.go b/probe/host/reporter.go index 6694e96f7..a242e6a6f 100644 --- a/probe/host/reporter.go +++ b/probe/host/reporter.go @@ -1,7 +1,6 @@ package host import ( - "net" "runtime" "strings" "time" @@ -9,7 +8,7 @@ import ( "github.com/weaveworks/scope/report" ) -// Keys for use in NodeMetadata +// Keys for use in NodeMetadata. const ( Timestamp = "ts" HostName = "host_name" @@ -20,30 +19,31 @@ const ( Uptime = "uptime" ) -// Exposed for testing +// Exposed for testing. const ( ProcUptime = "/proc/uptime" ProcLoad = "/proc/loadavg" ) -// Exposed for testing +// Exposed for testing. var ( - InterfaceAddrs = net.InterfaceAddrs - Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) } + Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) } ) // Reporter generates Reports containing the host topology. type Reporter struct { - hostID string - hostName string + hostID string + hostName string + localNets report.Networks } // NewReporter returns a Reporter which produces a report containing host // topology for this host. -func NewReporter(hostID, hostName string) *Reporter { +func NewReporter(hostID, hostName string, localNets report.Networks) *Reporter { return &Reporter{ - hostID: hostID, - hostName: hostName, + hostID: hostID, + hostName: hostName, + localNets: localNets, } } @@ -54,15 +54,8 @@ func (r *Reporter) Report() (report.Report, error) { localCIDRs []string ) - localNets, err := InterfaceAddrs() - if err != nil { - return rep, err - } - for _, localNet := range localNets { - // Not all networks are IP networks. - if ipNet, ok := localNet.(*net.IPNet); ok { - localCIDRs = append(localCIDRs, ipNet.String()) - } + for _, localNet := range r.localNets { + localCIDRs = append(localCIDRs, localNet.String()) } uptime, err := GetUptime() diff --git a/probe/host/reporter_test.go b/probe/host/reporter_test.go index f4127a577..a3fa0df09 100644 --- a/probe/host/reporter_test.go +++ b/probe/host/reporter_test.go @@ -12,38 +12,37 @@ import ( "github.com/weaveworks/scope/test" ) -const ( - release = "release" - version = "version" - network = "192.168.0.0/16" - hostID = "hostid" - now = "now" - hostname = "hostname" - load = "0.59 0.36 0.29" - uptime = "278h55m43s" - kernel = "release version" -) - func TestReporter(t *testing.T) { + var ( + release = "release" + version = "version" + network = "192.168.0.0/16" + hostID = "hostid" + now = "now" + hostname = "hostname" + load = "0.59 0.36 0.29" + uptime = "278h55m43s" + kernel = "release version" + _, ipnet, _ = net.ParseCIDR(network) + localNets = report.Networks([]*net.IPNet{ipnet}) + ) + var ( oldGetKernelVersion = host.GetKernelVersion oldGetLoad = host.GetLoad oldGetUptime = host.GetUptime - oldInterfaceAddrs = host.InterfaceAddrs oldNow = host.Now ) defer func() { host.GetKernelVersion = oldGetKernelVersion host.GetLoad = oldGetLoad host.GetUptime = oldGetUptime - host.InterfaceAddrs = oldInterfaceAddrs host.Now = oldNow }() host.GetKernelVersion = func() (string, error) { return release + " " + version, nil } host.GetLoad = func() string { return load } host.GetUptime = func() (time.Duration, error) { return time.ParseDuration(uptime) } host.Now = func() string { return now } - host.InterfaceAddrs = func() ([]net.Addr, error) { _, ipnet, _ := net.ParseCIDR(network); return []net.Addr{ipnet}, nil } want := report.MakeReport() want.Host.NodeMetadatas[report.MakeHostNodeID(hostID)] = report.MakeNodeMetadataWith(map[string]string{ @@ -55,8 +54,7 @@ func TestReporter(t *testing.T) { host.Uptime: uptime, host.KernelVersion: kernel, }) - r := host.NewReporter(hostID, hostname) - have, _ := r.Report() + have, _ := host.NewReporter(hostID, hostname, localNets).Report() if !reflect.DeepEqual(want, have) { t.Errorf("%s", test.Diff(want, have)) } diff --git a/probe/main.go b/probe/main.go index 0b3a8c63a..29f7626d1 100644 --- a/probe/main.go +++ b/probe/main.go @@ -79,11 +79,23 @@ func main() { } defer publisher.Close() + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Fatal(err) + } + localNets := report.Networks{} + for _, addr := range addrs { + // Not all addrs are IPNets. + if ipNet, ok := addr.(*net.IPNet); ok { + localNets = append(localNets, ipNet) + } + } + var ( hostName = hostname() hostID = hostName // TODO: we should sanitize the hostname taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} - reporters = []Reporter{host.NewReporter(hostID, hostName), endpoint.NewReporter(hostID, hostName, *spyProcs)} + reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} processCache *process.CachingWalker ) @@ -122,7 +134,7 @@ func main() { continue } log.Printf("capturing packets on %s", iface) - reporters = append(reporters, sniff.New(hostID, source, *captureOn, *captureOff)) + reporters = append(reporters, sniff.New(hostID, localNets, source, *captureOn, *captureOff)) } } diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go index e0a13feef..91dff56c5 100644 --- a/probe/sniff/sniffer.go +++ b/probe/sniff/sniffer.go @@ -3,6 +3,7 @@ package sniff import ( "io" "log" + "net" "strconv" "sync/atomic" "time" @@ -15,27 +16,29 @@ import ( // Sniffer is a packet-sniffing reporter. type Sniffer struct { - hostID string - reports chan chan report.Report - parser *gopacket.DecodingLayerParser - decoded []gopacket.LayerType - eth layers.Ethernet - ip4 layers.IPv4 - ip6 layers.IPv6 - tcp layers.TCP - udp layers.UDP - icmp4 layers.ICMPv4 - icmp6 layers.ICMPv6 + hostID string + localNets report.Networks + reports chan chan report.Report + parser *gopacket.DecodingLayerParser + decoded []gopacket.LayerType + eth layers.Ethernet + ip4 layers.IPv4 + ip6 layers.IPv6 + tcp layers.TCP + udp layers.UDP + icmp4 layers.ICMPv4 + icmp6 layers.ICMPv6 } // New returns a new sniffing reporter that samples traffic by turning its // packet capture facilities on and off. Note that the on and off durations // represent a way to bound CPU burn. Effective sample rate needs to be // calculated as (packets decoded / packets observed). -func New(hostID string, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer { +func New(hostID string, localNets report.Networks, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer { s := &Sniffer{ - hostID: hostID, - reports: make(chan chan report.Report), + hostID: hostID, + localNets: localNets, + reports: make(chan chan report.Report), } s.parser = gopacket.NewDecodingLayerParser( layers.LayerTypeEthernet, @@ -119,8 +122,11 @@ func interpolateCounts(r report.Report) { if emd.PacketCount != nil { *emd.PacketCount = uint64(float64(*emd.PacketCount) * factor) } - if emd.ByteCount != nil { - *emd.ByteCount = uint64(float64(*emd.ByteCount) * factor) + if emd.EgressByteCount != nil { + *emd.EgressByteCount = uint64(float64(*emd.EgressByteCount) * factor) + } + if emd.IngressByteCount != nil { + *emd.IngressByteCount = uint64(float64(*emd.IngressByteCount) * factor) } } } @@ -204,54 +210,104 @@ func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, p } // Merge puts the packet into the report. +// +// Note that, for the moment, we encode bidirectional traffic as ingress and +// egress traffic on a single edge whose src is local and dst is remote. That +// is, if we see a packet from the remote addr 9.8.7.6 to the local addr +// 1.2.3.4, we apply it as *ingress* on the edge (1.2.3.4 -> 9.8.7.6). func (s *Sniffer) Merge(p Packet, rpt report.Report) { - // With a src and dst IP, we can add to the address topology. - if p.SrcIP != "" && p.DstIP != "" { + if p.SrcIP == "" || p.DstIP == "" { + return + } + + // One end of the traffic has to be local. Otherwise, we don't know how to + // construct the edge. + // + // If we need to get around this limitation, we may be able to change the + // semantics of the report, and allow the src side of edges to be from + // anywhere. But that will have ramifications throughout Scope (read: it + // may violate implicit invariants) and needs to be thought through. + var ( + srcLocal = s.localNets.Contains(net.ParseIP(p.SrcIP)) + dstLocal = s.localNets.Contains(net.ParseIP(p.DstIP)) + localIP string + remoteIP string + egress bool + ) + switch { + case srcLocal && !dstLocal: + localIP, remoteIP, egress = p.SrcIP, p.DstIP, true + case !srcLocal && dstLocal: + localIP, remoteIP, egress = p.DstIP, p.SrcIP, false + case srcLocal && dstLocal: + localIP, remoteIP, egress = p.SrcIP, p.DstIP, true // loopback + case !srcLocal && !dstLocal: + log.Printf("sniffer ignoring remote-to-remote (%s -> %s) traffic", p.SrcIP, p.DstIP) + return + } + + // For sure, we can add to the address topology. + { var ( - srcNodeID = report.MakeAddressNodeID(s.hostID, p.SrcIP) - dstNodeID = report.MakeAddressNodeID(s.hostID, p.DstIP) + srcNodeID = report.MakeAddressNodeID(s.hostID, localIP) + dstNodeID = report.MakeAddressNodeID(s.hostID, remoteIP) edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) ) + rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() - rpt.Address.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata() emd := rpt.Address.EdgeMetadatas[edgeID] if emd.PacketCount == nil { emd.PacketCount = new(uint64) } *emd.PacketCount++ - if emd.ByteCount == nil { - emd.ByteCount = new(uint64) - } - *emd.ByteCount += uint64(p.Network) - rpt.Address.EdgeMetadatas[edgeID] = emd + if egress { + if emd.EgressByteCount == nil { + emd.EgressByteCount = new(uint64) + } + *emd.EgressByteCount += uint64(p.Network) + } else { + if emd.IngressByteCount == nil { + emd.IngressByteCount = new(uint64) + } + *emd.IngressByteCount += uint64(p.Network) + } + + rpt.Address.EdgeMetadatas[edgeID] = emd rpt.Address.Adjacency[srcAdjacencyID] = rpt.Address.Adjacency[srcAdjacencyID].Add(dstNodeID) } - // With a src and dst IP and port, we can add to the endpoints. - if p.SrcIP != "" && p.DstIP != "" && p.SrcPort != "" && p.DstPort != "" { + // If we have ports, we can add to the endpoint topology, too. + if p.SrcPort != "" && p.DstPort != "" { var ( - srcNodeID = report.MakeEndpointNodeID(s.hostID, p.SrcIP, p.SrcPort) - dstNodeID = report.MakeEndpointNodeID(s.hostID, p.DstIP, p.DstPort) + srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, p.SrcPort) + dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, p.DstPort) edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) ) rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() - rpt.Endpoint.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata() emd := rpt.Endpoint.EdgeMetadatas[edgeID] if emd.PacketCount == nil { emd.PacketCount = new(uint64) } *emd.PacketCount++ - if emd.ByteCount == nil { - emd.ByteCount = new(uint64) - } - *emd.ByteCount += uint64(p.Transport) - rpt.Endpoint.EdgeMetadatas[edgeID] = emd + if egress { + if emd.EgressByteCount == nil { + emd.EgressByteCount = new(uint64) + } + *emd.EgressByteCount += uint64(p.Transport) + } else { + if emd.IngressByteCount == nil { + emd.IngressByteCount = new(uint64) + } + *emd.IngressByteCount += uint64(p.Transport) + } + + rpt.Endpoint.EdgeMetadatas[edgeID] = emd rpt.Endpoint.Adjacency[srcAdjacencyID] = rpt.Endpoint.Adjacency[srcAdjacencyID].Add(dstNodeID) } } diff --git a/probe/sniff/sniffer_internal_test.go b/probe/sniff/sniffer_internal_test.go index ed6d559a2..fc04b2245 100644 --- a/probe/sniff/sniffer_internal_test.go +++ b/probe/sniff/sniffer_internal_test.go @@ -22,8 +22,9 @@ func TestInterpolateCounts(t *testing.T) { r.Sampling.Count = samplingCount r.Sampling.Total = samplingTotal r.Endpoint.EdgeMetadatas[edgeID] = report.EdgeMetadata{ - PacketCount: newu64(packetCount), - ByteCount: newu64(byteCount), + PacketCount: newu64(packetCount), + IngressByteCount: newu64(byteCount), + EgressByteCount: newu64(byteCount), } interpolateCounts(r) @@ -37,7 +38,10 @@ func TestInterpolateCounts(t *testing.T) { if want, have := apply(packetCount), (*emd.PacketCount); want != have { t.Errorf("want %d packets, have %d", want, have) } - if want, have := apply(byteCount), (*emd.ByteCount); want != have { + if want, have := apply(byteCount), (*emd.EgressByteCount); want != have { + t.Errorf("want %d bytes, have %d", want, have) + } + if want, have := apply(byteCount), (*emd.IngressByteCount); want != have { t.Errorf("want %d bytes, have %d", want, have) } } diff --git a/probe/sniff/sniffer_test.go b/probe/sniff/sniffer_test.go index 3a686e382..6187924bc 100644 --- a/probe/sniff/sniffer_test.go +++ b/probe/sniff/sniffer_test.go @@ -2,6 +2,7 @@ package sniff_test import ( "io" + "net" "reflect" "sync" "testing" @@ -20,7 +21,7 @@ func TestSnifferShutdown(t *testing.T) { src = newMockSource([]byte{}, nil) on = time.Millisecond off = time.Millisecond - s = sniff.New(hostID, src, on, off) + s = sniff.New(hostID, report.Networks{}, src, on, off) ) // Stopping the source should terminate the sniffer. @@ -53,8 +54,11 @@ func TestMerge(t *testing.T) { Network: 512, Transport: 256, } + + _, ipnet, _ = net.ParseCIDR(p.SrcIP + "/24") // ;) + localNets = report.Networks([]*net.IPNet{ipnet}) ) - sniff.New(hostID, src, on, off).Merge(p, rpt) + sniff.New(hostID, localNets, src, on, off).Merge(p, rpt) var ( srcEndpointNodeID = report.MakeEndpointNodeID(hostID, p.SrcIP, p.SrcPort) @@ -68,13 +72,12 @@ func TestMerge(t *testing.T) { }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(srcEndpointNodeID, dstEndpointNodeID): report.EdgeMetadata{ - PacketCount: newu64(1), - ByteCount: newu64(256), + PacketCount: newu64(1), + EgressByteCount: newu64(256), }, }, NodeMetadatas: report.NodeMetadatas{ srcEndpointNodeID: report.MakeNodeMetadata(), - dstEndpointNodeID: report.MakeNodeMetadata(), }, }), rpt.Endpoint; !reflect.DeepEqual(want, have) { t.Errorf("%s", test.Diff(want, have)) @@ -92,13 +95,12 @@ func TestMerge(t *testing.T) { }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(srcAddressNodeID, dstAddressNodeID): report.EdgeMetadata{ - PacketCount: newu64(1), - ByteCount: newu64(512), + PacketCount: newu64(1), + EgressByteCount: newu64(512), }, }, NodeMetadatas: report.NodeMetadatas{ srcAddressNodeID: report.MakeNodeMetadata(), - dstAddressNodeID: report.MakeNodeMetadata(), }, }), rpt.Address; !reflect.DeepEqual(want, have) { t.Errorf("%s", test.Diff(want, have)) diff --git a/render/detailed_node.go b/render/detailed_node.go index c1f0def81..fc05a4ad0 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -67,8 +67,11 @@ func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { if n.EdgeMetadata.PacketCount != nil { rows = append(rows, Row{"Packets", strconv.FormatUint(*n.EdgeMetadata.PacketCount, 10), ""}) } - if n.EdgeMetadata.ByteCount != nil { - rows = append(rows, Row{"Bytes", strconv.FormatUint(*n.EdgeMetadata.ByteCount, 10), ""}) + if n.EdgeMetadata.EgressByteCount != nil { + rows = append(rows, Row{"Egress bytes", strconv.FormatUint(*n.EdgeMetadata.EgressByteCount, 10), ""}) // TODO rate + } + if n.EdgeMetadata.IngressByteCount != nil { + rows = append(rows, Row{"Ingress bytes", strconv.FormatUint(*n.EdgeMetadata.IngressByteCount, 10), ""}) // TODO rate } if len(rows) > 0 { tables = append(tables, Table{"Connections", true, connectionsRank, rows}) diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index 0b3fe9994..606f05556 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -67,7 +67,7 @@ func TestMakeDetailedNode(t *testing.T) { Rank: 100, Rows: []render.Row{ {"Packets", "150", ""}, - {"Bytes", "1500", ""}, + {"Egress bytes", "1500", ""}, }, }, { diff --git a/render/expected/expected.go b/render/expected/expected.go index 7e3b5fd74..5d4ebc634 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -55,8 +55,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(100), - ByteCount: newu64(10), + PacketCount: newu64(10), + EgressByteCount: newu64(100), }, }, ClientProcess2ID: { @@ -73,8 +73,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(200), - ByteCount: newu64(20), + PacketCount: newu64(20), + EgressByteCount: newu64(200), }, }, ServerProcessID: { @@ -97,8 +97,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - ByteCount: newu64(1500), + PacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, nonContainerProcessID: { @@ -137,8 +137,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(300), - ByteCount: newu64(30), + PacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, "apache": { @@ -160,8 +160,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - ByteCount: newu64(1500), + PacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, "bash": { @@ -200,8 +200,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(300), - ByteCount: newu64(30), + PacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, test.ServerContainerID: { @@ -219,8 +219,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - ByteCount: newu64(1500), + PacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, uncontainedServerID: { @@ -258,8 +258,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(300), - ByteCount: newu64(30), + PacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, test.ServerContainerImageName: { @@ -277,8 +277,8 @@ var ( test.ServerHostNodeID), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - ByteCount: newu64(1500), + PacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, uncontainedServerID: { diff --git a/render/render_test.go b/render/render_test.go index 954d9df34..0c50cbb7d 100644 --- a/render/render_test.go +++ b/render/render_test.go @@ -118,8 +118,8 @@ func TestMapEdge(t *testing.T) { ">bar": report.MakeIDList("foo"), }, EdgeMetadatas: report.EdgeMetadatas{ - "foo|bar": report.EdgeMetadata{PacketCount: newu64(1), ByteCount: newu64(2)}, - "bar|foo": report.EdgeMetadata{PacketCount: newu64(3), ByteCount: newu64(4)}, + "foo|bar": report.EdgeMetadata{PacketCount: newu64(1), EgressByteCount: newu64(2)}, + "bar|foo": report.EdgeMetadata{PacketCount: newu64(3), EgressByteCount: newu64(4)}, }, } } @@ -140,8 +140,8 @@ func TestMapEdge(t *testing.T) { } if want, have := (report.EdgeMetadata{ - PacketCount: newu64(1), - ByteCount: newu64(2), + PacketCount: newu64(1), + EgressByteCount: newu64(2), }), mapper.EdgeMetadata(report.MakeReport(), "_foo", "_bar"); !reflect.DeepEqual(want, have) { t.Errorf("want %+v, have %+v", want, have) } diff --git a/report/merge.go b/report/merge.go index 8ebd1eab2..12c053105 100644 --- a/report/merge.go +++ b/report/merge.go @@ -64,7 +64,8 @@ func (e *EdgeMetadatas) Merge(other EdgeMetadatas) { // should represent the same edge on different times. func (m *EdgeMetadata) Merge(other EdgeMetadata) { m.PacketCount = merge(m.PacketCount, other.PacketCount, sum) - m.ByteCount = merge(m.ByteCount, other.ByteCount, sum) + m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum) + m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum) m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, max) } @@ -72,7 +73,8 @@ func (m *EdgeMetadata) Merge(other EdgeMetadata) { // they should represent different edges at the same time. func (m *EdgeMetadata) Flatten(other EdgeMetadata) { m.PacketCount = merge(m.PacketCount, other.PacketCount, sum) - m.ByteCount = merge(m.ByteCount, other.ByteCount, sum) + m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum) + m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum) // Note that summing of two maximums doesn't always give us the true // maximum. But it's a best effort. m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, sum) diff --git a/report/merge_test.go b/report/merge_test.go index 5f3a75beb..b87ea176e 100644 --- a/report/merge_test.go +++ b/report/merge_test.go @@ -112,15 +112,13 @@ func TestMergeEdgeMetadatas(t *testing.T) { a: report.EdgeMetadatas{}, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - ByteCount: newu64(0), + PacketCount: newu64(1), MaxConnCountTCP: newu64(2), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - ByteCount: newu64(0), + PacketCount: newu64(1), MaxConnCountTCP: newu64(2), }, }, @@ -128,15 +126,15 @@ func TestMergeEdgeMetadatas(t *testing.T) { "Empty b": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - ByteCount: newu64(0), + PacketCount: newu64(12), + EgressByteCount: newu64(999), }, }, b: report.EdgeMetadatas{}, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - ByteCount: newu64(0), + PacketCount: newu64(12), + EgressByteCount: newu64(999), }, }, }, @@ -144,26 +142,26 @@ func TestMergeEdgeMetadatas(t *testing.T) { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ PacketCount: newu64(12), - ByteCount: newu64(0), + EgressByteCount: newu64(500), MaxConnCountTCP: newu64(4), }, }, b: report.EdgeMetadatas{ "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ PacketCount: newu64(1), - ByteCount: newu64(2), + EgressByteCount: newu64(2), MaxConnCountTCP: newu64(6), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ PacketCount: newu64(12), - ByteCount: newu64(0), + EgressByteCount: newu64(500), MaxConnCountTCP: newu64(4), }, "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ PacketCount: newu64(1), - ByteCount: newu64(2), + EgressByteCount: newu64(2), MaxConnCountTCP: newu64(6), }, }, @@ -172,22 +170,24 @@ func TestMergeEdgeMetadatas(t *testing.T) { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ PacketCount: newu64(12), - ByteCount: newu64(0), + EgressByteCount: newu64(1000), MaxConnCountTCP: newu64(7), }, }, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(1), - ByteCount: newu64(2), - MaxConnCountTCP: newu64(9), + PacketCount: newu64(1), + IngressByteCount: newu64(123), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(9), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(13), - ByteCount: newu64(2), - MaxConnCountTCP: newu64(9), + PacketCount: newu64(13), + IngressByteCount: newu64(123), + EgressByteCount: newu64(1002), + MaxConnCountTCP: newu64(9), }, }, }, diff --git a/report/networks.go b/report/networks.go index 4fe5f6fe4..a1aa9e6e4 100644 --- a/report/networks.go +++ b/report/networks.go @@ -12,12 +12,11 @@ type Interface interface { Addrs() ([]net.Addr, error) } -// Variables exposed for testing +// Variables exposed for testing. +// TODO this design is broken, make it consistent with probe networks. var ( LocalNetworks = Networks{} - InterfaceByNameStub = func(name string) (Interface, error) { - return net.InterfaceByName(name) - } + InterfaceByNameStub = func(name string) (Interface, error) { return net.InterfaceByName(name) } ) // Contains returns true if IP is in Networks. diff --git a/report/topology.go b/report/topology.go index 9b9a5fed2..0327f35ad 100644 --- a/report/topology.go +++ b/report/topology.go @@ -31,9 +31,10 @@ type NodeMetadatas map[string]NodeMetadata // EdgeMetadata describes a superset of the metadata that probes can possibly // collect about a directed edge between two nodes in any topology. type EdgeMetadata struct { - PacketCount *uint64 `json:"packet_count,omitempty"` - ByteCount *uint64 `json:"byte_count,omitempty"` - MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` + PacketCount *uint64 `json:"packet_count,omitempty"` + EgressByteCount *uint64 `json:"ingress_byte_count,omitempty"` + IngressByteCount *uint64 `json:"egress_byte_count,omitempty"` + MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` } // NodeMetadata describes a superset of the metadata that probes can collect diff --git a/test/report_fixture.go b/test/report_fixture.go index b48fcd8dc..c949a2ae1 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -108,33 +108,33 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(Client54001NodeID, Server80NodeID): report.EdgeMetadata{ - PacketCount: newu64(100), - ByteCount: newu64(10), + PacketCount: newu64(10), + EgressByteCount: newu64(100), }, report.MakeEdgeID(Client54002NodeID, Server80NodeID): report.EdgeMetadata{ - PacketCount: newu64(200), - ByteCount: newu64(20), + PacketCount: newu64(20), + EgressByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, Client54001NodeID): report.EdgeMetadata{ - PacketCount: newu64(10), - ByteCount: newu64(100), + PacketCount: newu64(10), + EgressByteCount: newu64(100), }, report.MakeEdgeID(Server80NodeID, Client54002NodeID): report.EdgeMetadata{ - PacketCount: newu64(20), - ByteCount: newu64(200), + PacketCount: newu64(20), + EgressByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, UnknownClient1NodeID): report.EdgeMetadata{ - PacketCount: newu64(30), - ByteCount: newu64(300), + PacketCount: newu64(30), + EgressByteCount: newu64(300), }, report.MakeEdgeID(Server80NodeID, UnknownClient2NodeID): report.EdgeMetadata{ - PacketCount: newu64(40), - ByteCount: newu64(400), + PacketCount: newu64(40), + EgressByteCount: newu64(400), }, report.MakeEdgeID(Server80NodeID, UnknownClient3NodeID): report.EdgeMetadata{ - PacketCount: newu64(50), - ByteCount: newu64(500), + PacketCount: newu64(50), + EgressByteCount: newu64(500), }, }, }, From 0361b11b87f066c641d3bbe2448571bfe64f13cc Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Mon, 3 Aug 2015 12:25:56 +0200 Subject: [PATCH 5/8] Fix bugs in how we report bandwidth --- probe/sniff/sniffer.go | 44 ++++++++++++++++++++++++------------------ render/render.go | 1 + report/topology.go | 4 ++-- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go index 91dff56c5..e7d2c00bf 100644 --- a/probe/sniff/sniffer.go +++ b/probe/sniff/sniffer.go @@ -145,7 +145,6 @@ type Packet struct { func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, process, total, count *uint64) { var ( - p Packet data []byte err error ) @@ -167,6 +166,7 @@ func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, p // We'll always get an error when we encounter a layer type for // which we haven't configured a decoder. } + var p Packet for _, t := range s.decoded { switch t { case layers.LayerTypeEthernet: @@ -178,16 +178,16 @@ func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, p case layers.LayerTypeICMPv6: p.Network += len(s.icmp6.Payload) - case layers.LayerTypeIPv6: - p.SrcIP = s.ip6.SrcIP.String() - p.DstIP = s.ip6.DstIP.String() - p.Network += len(s.ip6.Payload) - case layers.LayerTypeIPv4: p.SrcIP = s.ip4.SrcIP.String() p.DstIP = s.ip4.DstIP.String() p.Network += len(s.ip4.Payload) + case layers.LayerTypeIPv6: + p.SrcIP = s.ip6.SrcIP.String() + p.DstIP = s.ip6.DstIP.String() + p.Network += len(s.ip6.Payload) + case layers.LayerTypeTCP: p.SrcPort = strconv.Itoa(int(s.tcp.SrcPort)) p.DstPort = strconv.Itoa(int(s.tcp.DstPort)) @@ -199,7 +199,6 @@ func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, p p.Transport += len(s.udp.Payload) } } - select { case dst <- p: atomic.AddUint64(count, 1) @@ -228,19 +227,21 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) { // anywhere. But that will have ramifications throughout Scope (read: it // may violate implicit invariants) and needs to be thought through. var ( - srcLocal = s.localNets.Contains(net.ParseIP(p.SrcIP)) - dstLocal = s.localNets.Contains(net.ParseIP(p.DstIP)) - localIP string - remoteIP string - egress bool + srcLocal = s.localNets.Contains(net.ParseIP(p.SrcIP)) + dstLocal = s.localNets.Contains(net.ParseIP(p.DstIP)) + localIP string + remoteIP string + localPort string + remotePort string + egress bool ) switch { case srcLocal && !dstLocal: - localIP, remoteIP, egress = p.SrcIP, p.DstIP, true + localIP, localPort, remoteIP, remotePort, egress = p.SrcIP, p.SrcPort, p.DstIP, p.DstPort, true case !srcLocal && dstLocal: - localIP, remoteIP, egress = p.DstIP, p.SrcIP, false + localIP, localPort, remoteIP, remotePort, egress = p.DstIP, p.DstPort, p.SrcIP, p.SrcPort, false case srcLocal && dstLocal: - localIP, remoteIP, egress = p.SrcIP, p.DstIP, true // loopback + localIP, localPort, remoteIP, remotePort, egress = p.SrcIP, p.SrcPort, p.DstIP, p.DstPort, true // loopback case !srcLocal && !dstLocal: log.Printf("sniffer ignoring remote-to-remote (%s -> %s) traffic", p.SrcIP, p.DstIP) return @@ -255,7 +256,9 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) { srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) ) - rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + if _, ok := rpt.Address.NodeMetadatas[srcNodeID]; !ok { + rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + } emd := rpt.Address.EdgeMetadatas[edgeID] if emd.PacketCount == nil { @@ -282,12 +285,15 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) { // If we have ports, we can add to the endpoint topology, too. if p.SrcPort != "" && p.DstPort != "" { var ( - srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, p.SrcPort) - dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, p.DstPort) + srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, localPort) + dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, remotePort) edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) ) - rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + + if _, ok := rpt.Endpoint.NodeMetadatas[srcNodeID]; !ok { + rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + } emd := rpt.Endpoint.EdgeMetadatas[edgeID] if emd.PacketCount == nil { diff --git a/render/render.go b/render/render.go index 86daa7ed9..e80de1781 100644 --- a/render/render.go +++ b/render/render.go @@ -207,6 +207,7 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { if md, ok := t.EdgeMetadatas[edgeID]; ok { srcRenderableNode.EdgeMetadata.Merge(md) } + } nodes[srcRenderableID] = srcRenderableNode diff --git a/report/topology.go b/report/topology.go index 0327f35ad..c6f28e23f 100644 --- a/report/topology.go +++ b/report/topology.go @@ -32,8 +32,8 @@ type NodeMetadatas map[string]NodeMetadata // collect about a directed edge between two nodes in any topology. type EdgeMetadata struct { PacketCount *uint64 `json:"packet_count,omitempty"` - EgressByteCount *uint64 `json:"ingress_byte_count,omitempty"` - IngressByteCount *uint64 `json:"egress_byte_count,omitempty"` + EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` + IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` } From e1f7752a341e10a100dd1aa232f8ec66b98ded00 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Mon, 3 Aug 2015 14:58:41 +0200 Subject: [PATCH 6/8] Split PacketCount to Egress and Ingress Also, 1 packet may be counted in N topologies, so you can't rely on the sum of all packet counts across topologies having any relation to the sampling data. --- app/api_topology_test.go | 4 +- probe/sniff/sniffer.go | 31 +++++++++----- probe/sniff/sniffer_internal_test.go | 12 ++++-- probe/sniff/sniffer_test.go | 8 ++-- render/detailed_node.go | 7 +++- render/detailed_node_test.go | 2 +- render/expected/expected.go | 36 ++++++++-------- render/render_test.go | 14 +++---- report/merge.go | 9 ++-- report/merge_test.go | 62 ++++++++++++++-------------- report/report.go | 20 ++++----- report/topology.go | 9 ++-- test/report_fixture.go | 28 ++++++------- 13 files changed, 131 insertions(+), 111 deletions(-) diff --git a/app/api_topology_test.go b/app/api_topology_test.go index 61fa8fa84..0cc249faf 100644 --- a/app/api_topology_test.go +++ b/app/api_topology_test.go @@ -96,8 +96,8 @@ func TestAPITopologyApplications(t *testing.T) { t.Fatalf("JSON parse error: %s", err) } if want, have := (report.EdgeMetadata{ - PacketCount: newu64(10), - EgressByteCount: newu64(100), + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }), edge.Metadata; !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go index e7d2c00bf..70209f7e8 100644 --- a/probe/sniff/sniffer.go +++ b/probe/sniff/sniffer.go @@ -119,8 +119,11 @@ func interpolateCounts(r report.Report) { factor := 1.0 / rate for _, topology := range r.Topologies() { for _, emd := range topology.EdgeMetadatas { - if emd.PacketCount != nil { - *emd.PacketCount = uint64(float64(*emd.PacketCount) * factor) + if emd.EgressPacketCount != nil { + *emd.EgressPacketCount = uint64(float64(*emd.EgressPacketCount) * factor) + } + if emd.IngressPacketCount != nil { + *emd.IngressPacketCount = uint64(float64(*emd.IngressPacketCount) * factor) } if emd.EgressByteCount != nil { *emd.EgressByteCount = uint64(float64(*emd.EgressByteCount) * factor) @@ -261,17 +264,21 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) { } emd := rpt.Address.EdgeMetadatas[edgeID] - if emd.PacketCount == nil { - emd.PacketCount = new(uint64) - } - *emd.PacketCount++ if egress { + if emd.EgressPacketCount == nil { + emd.EgressPacketCount = new(uint64) + } + *emd.EgressPacketCount++ if emd.EgressByteCount == nil { emd.EgressByteCount = new(uint64) } *emd.EgressByteCount += uint64(p.Network) } else { + if emd.IngressPacketCount == nil { + emd.IngressPacketCount = new(uint64) + } + *emd.IngressPacketCount++ if emd.IngressByteCount == nil { emd.IngressByteCount = new(uint64) } @@ -296,17 +303,21 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) { } emd := rpt.Endpoint.EdgeMetadatas[edgeID] - if emd.PacketCount == nil { - emd.PacketCount = new(uint64) - } - *emd.PacketCount++ if egress { + if emd.EgressPacketCount == nil { + emd.EgressPacketCount = new(uint64) + } + *emd.EgressPacketCount++ if emd.EgressByteCount == nil { emd.EgressByteCount = new(uint64) } *emd.EgressByteCount += uint64(p.Transport) } else { + if emd.IngressPacketCount == nil { + emd.IngressPacketCount = new(uint64) + } + *emd.IngressPacketCount++ if emd.IngressByteCount == nil { emd.IngressByteCount = new(uint64) } diff --git a/probe/sniff/sniffer_internal_test.go b/probe/sniff/sniffer_internal_test.go index fc04b2245..72f78bf44 100644 --- a/probe/sniff/sniffer_internal_test.go +++ b/probe/sniff/sniffer_internal_test.go @@ -22,9 +22,10 @@ func TestInterpolateCounts(t *testing.T) { r.Sampling.Count = samplingCount r.Sampling.Total = samplingTotal r.Endpoint.EdgeMetadatas[edgeID] = report.EdgeMetadata{ - PacketCount: newu64(packetCount), - IngressByteCount: newu64(byteCount), - EgressByteCount: newu64(byteCount), + EgressPacketCount: newu64(packetCount), + IngressPacketCount: newu64(packetCount), + EgressByteCount: newu64(byteCount), + IngressByteCount: newu64(byteCount), } interpolateCounts(r) @@ -35,7 +36,10 @@ func TestInterpolateCounts(t *testing.T) { apply = func(v uint64) uint64 { return uint64(factor * float64(v)) } emd = r.Endpoint.EdgeMetadatas[edgeID] ) - if want, have := apply(packetCount), (*emd.PacketCount); want != have { + if want, have := apply(packetCount), (*emd.EgressPacketCount); want != have { + t.Errorf("want %d packets, have %d", want, have) + } + if want, have := apply(packetCount), (*emd.IngressPacketCount); want != have { t.Errorf("want %d packets, have %d", want, have) } if want, have := apply(byteCount), (*emd.EgressByteCount); want != have { diff --git a/probe/sniff/sniffer_test.go b/probe/sniff/sniffer_test.go index 6187924bc..d939ee5ee 100644 --- a/probe/sniff/sniffer_test.go +++ b/probe/sniff/sniffer_test.go @@ -72,8 +72,8 @@ func TestMerge(t *testing.T) { }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(srcEndpointNodeID, dstEndpointNodeID): report.EdgeMetadata{ - PacketCount: newu64(1), - EgressByteCount: newu64(256), + EgressPacketCount: newu64(1), + EgressByteCount: newu64(256), }, }, NodeMetadatas: report.NodeMetadatas{ @@ -95,8 +95,8 @@ func TestMerge(t *testing.T) { }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(srcAddressNodeID, dstAddressNodeID): report.EdgeMetadata{ - PacketCount: newu64(1), - EgressByteCount: newu64(512), + EgressPacketCount: newu64(1), + EgressByteCount: newu64(512), }, }, NodeMetadatas: report.NodeMetadatas{ diff --git a/render/detailed_node.go b/render/detailed_node.go index fc05a4ad0..29e872bc4 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -64,8 +64,11 @@ func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { if n.EdgeMetadata.MaxConnCountTCP != nil { rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), ""}) } - if n.EdgeMetadata.PacketCount != nil { - rows = append(rows, Row{"Packets", strconv.FormatUint(*n.EdgeMetadata.PacketCount, 10), ""}) + if n.EdgeMetadata.EgressPacketCount != nil { + rows = append(rows, Row{"Egress packets", strconv.FormatUint(*n.EdgeMetadata.EgressPacketCount, 10), ""}) + } + if n.EdgeMetadata.IngressPacketCount != nil { + rows = append(rows, Row{"Ingress packets", strconv.FormatUint(*n.EdgeMetadata.IngressPacketCount, 10), ""}) } if n.EdgeMetadata.EgressByteCount != nil { rows = append(rows, Row{"Egress bytes", strconv.FormatUint(*n.EdgeMetadata.EgressByteCount, 10), ""}) // TODO rate diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index 606f05556..a0b84a85e 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -66,7 +66,7 @@ func TestMakeDetailedNode(t *testing.T) { Numeric: true, Rank: 100, Rows: []render.Row{ - {"Packets", "150", ""}, + {"Egress packets", "150", ""}, {"Egress bytes", "1500", ""}, }, }, diff --git a/render/expected/expected.go b/render/expected/expected.go index 5d4ebc634..8928bbdc0 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -55,8 +55,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(10), - EgressByteCount: newu64(100), + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }, }, ClientProcess2ID: { @@ -73,8 +73,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(20), - EgressByteCount: newu64(200), + EgressPacketCount: newu64(20), + EgressByteCount: newu64(200), }, }, ServerProcessID: { @@ -97,8 +97,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - EgressByteCount: newu64(1500), + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, nonContainerProcessID: { @@ -137,8 +137,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(30), - EgressByteCount: newu64(300), + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, "apache": { @@ -160,8 +160,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - EgressByteCount: newu64(1500), + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, "bash": { @@ -200,8 +200,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(30), - EgressByteCount: newu64(300), + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, test.ServerContainerID: { @@ -219,8 +219,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - EgressByteCount: newu64(1500), + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, uncontainedServerID: { @@ -258,8 +258,8 @@ var ( ), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(30), - EgressByteCount: newu64(300), + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, test.ServerContainerImageName: { @@ -277,8 +277,8 @@ var ( test.ServerHostNodeID), NodeMetadata: report.MakeNodeMetadata(), EdgeMetadata: report.EdgeMetadata{ - PacketCount: newu64(150), - EgressByteCount: newu64(1500), + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, uncontainedServerID: { diff --git a/render/render_test.go b/render/render_test.go index 0c50cbb7d..03a9ae232 100644 --- a/render/render_test.go +++ b/render/render_test.go @@ -36,11 +36,11 @@ func TestReduceRender(t *testing.T) { func TestReduceEdge(t *testing.T) { renderer := render.Reduce([]render.Renderer{ - mockRenderer{edgeMetadata: report.EdgeMetadata{PacketCount: newu64(1)}}, - mockRenderer{edgeMetadata: report.EdgeMetadata{PacketCount: newu64(2)}}, + mockRenderer{edgeMetadata: report.EdgeMetadata{EgressPacketCount: newu64(1)}}, + mockRenderer{edgeMetadata: report.EdgeMetadata{EgressPacketCount: newu64(2)}}, }) - want := report.EdgeMetadata{PacketCount: newu64(3)} + want := report.EdgeMetadata{EgressPacketCount: newu64(3)} have := renderer.EdgeMetadata(report.MakeReport(), "", "") if !reflect.DeepEqual(want, have) { @@ -118,8 +118,8 @@ func TestMapEdge(t *testing.T) { ">bar": report.MakeIDList("foo"), }, EdgeMetadatas: report.EdgeMetadatas{ - "foo|bar": report.EdgeMetadata{PacketCount: newu64(1), EgressByteCount: newu64(2)}, - "bar|foo": report.EdgeMetadata{PacketCount: newu64(3), EgressByteCount: newu64(4)}, + "foo|bar": report.EdgeMetadata{EgressPacketCount: newu64(1), EgressByteCount: newu64(2)}, + "bar|foo": report.EdgeMetadata{EgressPacketCount: newu64(3), EgressByteCount: newu64(4)}, }, } } @@ -140,8 +140,8 @@ func TestMapEdge(t *testing.T) { } if want, have := (report.EdgeMetadata{ - PacketCount: newu64(1), - EgressByteCount: newu64(2), + EgressPacketCount: newu64(1), + EgressByteCount: newu64(2), }), mapper.EdgeMetadata(report.MakeReport(), "_foo", "_bar"); !reflect.DeepEqual(want, have) { t.Errorf("want %+v, have %+v", want, have) } diff --git a/report/merge.go b/report/merge.go index 12c053105..9ccf99518 100644 --- a/report/merge.go +++ b/report/merge.go @@ -3,7 +3,8 @@ package report // Merge functions for all topology datatypes. The general semantics are that // the receiver is modified, and what's merged in isn't. -// Merge merges another Report into the receiver. +// Merge merges another Report into the receiver. Pass addWindows true if the +// reports represent distinct (non-overlapping) periods of time. func (r *Report) Merge(other Report) { r.Endpoint.Merge(other.Endpoint) r.Address.Merge(other.Address) @@ -63,7 +64,8 @@ func (e *EdgeMetadatas) Merge(other EdgeMetadatas) { // Merge merges another EdgeMetadata into the receiver. The two edge metadatas // should represent the same edge on different times. func (m *EdgeMetadata) Merge(other EdgeMetadata) { - m.PacketCount = merge(m.PacketCount, other.PacketCount, sum) + m.EgressPacketCount = merge(m.EgressPacketCount, other.EgressPacketCount, sum) + m.IngressPacketCount = merge(m.IngressPacketCount, other.IngressPacketCount, sum) m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum) m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum) m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, max) @@ -72,7 +74,8 @@ func (m *EdgeMetadata) Merge(other EdgeMetadata) { // Flatten sums two EdgeMetadatas. Their windows should be the same duration; // they should represent different edges at the same time. func (m *EdgeMetadata) Flatten(other EdgeMetadata) { - m.PacketCount = merge(m.PacketCount, other.PacketCount, sum) + m.EgressPacketCount = merge(m.EgressPacketCount, other.EgressPacketCount, sum) + m.IngressPacketCount = merge(m.IngressPacketCount, other.IngressPacketCount, sum) m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum) m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum) // Note that summing of two maximums doesn't always give us the true diff --git a/report/merge_test.go b/report/merge_test.go index b87ea176e..1426ea17f 100644 --- a/report/merge_test.go +++ b/report/merge_test.go @@ -112,82 +112,82 @@ func TestMergeEdgeMetadatas(t *testing.T) { a: report.EdgeMetadatas{}, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(1), - MaxConnCountTCP: newu64(2), + EgressPacketCount: newu64(1), + MaxConnCountTCP: newu64(2), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(1), - MaxConnCountTCP: newu64(2), + EgressPacketCount: newu64(1), + MaxConnCountTCP: newu64(2), }, }, }, "Empty b": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - EgressByteCount: newu64(999), + EgressPacketCount: newu64(12), + EgressByteCount: newu64(999), }, }, b: report.EdgeMetadatas{}, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - EgressByteCount: newu64(999), + EgressPacketCount: newu64(12), + EgressByteCount: newu64(999), }, }, }, "Host merge": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - EgressByteCount: newu64(500), - MaxConnCountTCP: newu64(4), + EgressPacketCount: newu64(12), + EgressByteCount: newu64(500), + MaxConnCountTCP: newu64(4), }, }, b: report.EdgeMetadatas{ "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(1), - EgressByteCount: newu64(2), - MaxConnCountTCP: newu64(6), + EgressPacketCount: newu64(1), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(6), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - EgressByteCount: newu64(500), - MaxConnCountTCP: newu64(4), + EgressPacketCount: newu64(12), + EgressByteCount: newu64(500), + MaxConnCountTCP: newu64(4), }, "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(1), - EgressByteCount: newu64(2), - MaxConnCountTCP: newu64(6), + EgressPacketCount: newu64(1), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(6), }, }, }, "Edge merge": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(12), - EgressByteCount: newu64(1000), - MaxConnCountTCP: newu64(7), + EgressPacketCount: newu64(12), + EgressByteCount: newu64(1000), + MaxConnCountTCP: newu64(7), }, }, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(1), - IngressByteCount: newu64(123), - EgressByteCount: newu64(2), - MaxConnCountTCP: newu64(9), + EgressPacketCount: newu64(1), + IngressByteCount: newu64(123), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(9), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - PacketCount: newu64(13), - IngressByteCount: newu64(123), - EgressByteCount: newu64(1002), - MaxConnCountTCP: newu64(9), + EgressPacketCount: newu64(13), + IngressByteCount: newu64(123), + EgressByteCount: newu64(1002), + MaxConnCountTCP: newu64(9), }, }, }, diff --git a/report/report.go b/report/report.go index 797bd11f1..152760bc2 100644 --- a/report/report.go +++ b/report/report.go @@ -1,6 +1,9 @@ package report -import "fmt" +import ( + "fmt" + "strings" +) // Report is the core data type. It's produced by probes, and consumed and // stored by apps. It's composed of multiple topologies, each representing @@ -72,22 +75,17 @@ func (r Report) Topologies() []Topology { // Validate checks the report for various inconsistencies. func (r Report) Validate() error { - var packets uint64 + var errs []string for _, topology := range r.Topologies() { if err := topology.Validate(); err != nil { - return err - } - for _, emd := range topology.EdgeMetadatas { - if emd.PacketCount != nil { - packets += *emd.PacketCount - } + errs = append(errs, err.Error()) } } if r.Sampling.Count > r.Sampling.Total { - return fmt.Errorf("sampling count (%d) bigger than total (%d)", r.Sampling.Count, r.Sampling.Total) + errs = append(errs, fmt.Sprintf("sampling count (%d) bigger than total (%d)", r.Sampling.Count, r.Sampling.Total)) } - if packets > 0 && (r.Sampling.Count == 0 || r.Sampling.Total == 0) { - return fmt.Errorf("packets exist in EdgeMetadata, but no sampling count or total in the base report") + if len(errs) > 0 { + return fmt.Errorf("%d error(s): %s", len(errs), strings.Join(errs, "; ")) } return nil } diff --git a/report/topology.go b/report/topology.go index c6f28e23f..2ca9b3e0a 100644 --- a/report/topology.go +++ b/report/topology.go @@ -31,10 +31,11 @@ type NodeMetadatas map[string]NodeMetadata // EdgeMetadata describes a superset of the metadata that probes can possibly // collect about a directed edge between two nodes in any topology. type EdgeMetadata struct { - PacketCount *uint64 `json:"packet_count,omitempty"` - EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` - IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` - MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` + EgressPacketCount *uint64 `json:"egress_packet_count,omitempty"` + IngressPacketCount *uint64 `json:"ingress_packet_count,omitempty"` + EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` // Transport layer + IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` // Transport layer + MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` } // NodeMetadata describes a superset of the metadata that probes can collect diff --git a/test/report_fixture.go b/test/report_fixture.go index c949a2ae1..a8a7d0f51 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -108,33 +108,33 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(Client54001NodeID, Server80NodeID): report.EdgeMetadata{ - PacketCount: newu64(10), - EgressByteCount: newu64(100), + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }, report.MakeEdgeID(Client54002NodeID, Server80NodeID): report.EdgeMetadata{ - PacketCount: newu64(20), - EgressByteCount: newu64(200), + EgressPacketCount: newu64(20), + EgressByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, Client54001NodeID): report.EdgeMetadata{ - PacketCount: newu64(10), - EgressByteCount: newu64(100), + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }, report.MakeEdgeID(Server80NodeID, Client54002NodeID): report.EdgeMetadata{ - PacketCount: newu64(20), - EgressByteCount: newu64(200), + EgressPacketCount: newu64(20), + EgressByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, UnknownClient1NodeID): report.EdgeMetadata{ - PacketCount: newu64(30), - EgressByteCount: newu64(300), + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, report.MakeEdgeID(Server80NodeID, UnknownClient2NodeID): report.EdgeMetadata{ - PacketCount: newu64(40), - EgressByteCount: newu64(400), + EgressPacketCount: newu64(40), + EgressByteCount: newu64(400), }, report.MakeEdgeID(Server80NodeID, UnknownClient3NodeID): report.EdgeMetadata{ - PacketCount: newu64(50), - EgressByteCount: newu64(500), + EgressPacketCount: newu64(50), + EgressByteCount: newu64(500), }, }, }, From 0dafad763fa0c754d8d13fb8f62af930724aa05b Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Mon, 3 Aug 2015 16:04:06 +0200 Subject: [PATCH 7/8] Calculate rates in detailed nodes --- app/report_lifo.go | 5 +++++ probe/main.go | 3 +-- render/detailed_node.go | 39 ++++++++++++++++++++++++++++-------- render/detailed_node_test.go | 4 ++-- report/merge.go | 1 + report/report.go | 10 +++++++++ test/report_fixture.go | 3 +++ 7 files changed, 53 insertions(+), 12 deletions(-) diff --git a/app/report_lifo.go b/app/report_lifo.go index c1c580585..31824ef11 100644 --- a/app/report_lifo.go +++ b/app/report_lifo.go @@ -51,9 +51,14 @@ func NewReportLIFO(r reporter, maxAge time.Duration) *ReportLIFO { case req := <-l.requests: // Request for the current report. report := report.MakeReport() + oldest := time.Now() for _, r := range l.reports { + if r.Timestamp.Before(oldest) { + oldest = r.Timestamp + } report.Merge(r.Report) } + report.Window = time.Now().Sub(oldest) req <- report case q := <-l.quit: diff --git a/probe/main.go b/probe/main.go index 29f7626d1..6b4e4f979 100644 --- a/probe/main.go +++ b/probe/main.go @@ -153,6 +153,7 @@ func main() { select { case <-pubTick: publishTicks.WithLabelValues().Add(1) + r.Window = *publishInterval publisher.Publish(r) r = report.MakeReport() @@ -160,7 +161,6 @@ func main() { if err := processCache.Update(); err != nil { log.Printf("error reading processes: %v", err) } - for _, reporter := range reporters { newReport, err := reporter.Report() if err != nil { @@ -168,7 +168,6 @@ func main() { } r.Merge(newReport) } - r = Apply(r, taggers) case <-quit: diff --git a/render/detailed_node.go b/render/detailed_node.go index 29e872bc4..b739eb104 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -58,23 +58,46 @@ func (t tables) Less(i, j int) bool { return t[i].Rank > t[j].Rank } // MakeDetailedNode transforms a renderable node to a detailed node. It uses // aggregate metadata, plus the set of origin node IDs, to produce tables. func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { + sec := r.Window.Seconds() + rate := func(u *uint64) (float64, bool) { + if u == nil { + return 0.0, false + } + if sec <= 0 { + return 0.0, true + } + return float64(*u) / sec, true + } + shortenByteRate := func(rate float64) (major, minor string) { + switch { + case rate > 1024*1024: + return fmt.Sprintf("%.2f", rate/1024/1024), "MBps" + case rate > 1024: + return fmt.Sprintf("%.1f", rate/1024), "KBps" + default: + return fmt.Sprintf("%.0f", rate), "Bps" + } + } + tables := tables{} { rows := []Row{} if n.EdgeMetadata.MaxConnCountTCP != nil { rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), ""}) } - if n.EdgeMetadata.EgressPacketCount != nil { - rows = append(rows, Row{"Egress packets", strconv.FormatUint(*n.EdgeMetadata.EgressPacketCount, 10), ""}) + if rate, ok := rate(n.EdgeMetadata.EgressPacketCount); ok { + rows = append(rows, Row{"Egress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"}) } - if n.EdgeMetadata.IngressPacketCount != nil { - rows = append(rows, Row{"Ingress packets", strconv.FormatUint(*n.EdgeMetadata.IngressPacketCount, 10), ""}) + if rate, ok := rate(n.EdgeMetadata.IngressPacketCount); ok { + rows = append(rows, Row{"Ingress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"}) } - if n.EdgeMetadata.EgressByteCount != nil { - rows = append(rows, Row{"Egress bytes", strconv.FormatUint(*n.EdgeMetadata.EgressByteCount, 10), ""}) // TODO rate + if rate, ok := rate(n.EdgeMetadata.EgressByteCount); ok { + s, unit := shortenByteRate(rate) + rows = append(rows, Row{"Egress byte rate", s, unit}) } - if n.EdgeMetadata.IngressByteCount != nil { - rows = append(rows, Row{"Ingress bytes", strconv.FormatUint(*n.EdgeMetadata.IngressByteCount, 10), ""}) // TODO rate + if rate, ok := rate(n.EdgeMetadata.IngressByteCount); ok { + s, unit := shortenByteRate(rate) + rows = append(rows, Row{"Ingress byte rate", s, unit}) } if len(rows) > 0 { tables = append(tables, Table{"Connections", true, connectionsRank, rows}) diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index a0b84a85e..7e561efb5 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -66,8 +66,8 @@ func TestMakeDetailedNode(t *testing.T) { Numeric: true, Rank: 100, Rows: []render.Row{ - {"Egress packets", "150", ""}, - {"Egress bytes", "1500", ""}, + {"Egress packet rate", "75", "packets/sec"}, + {"Egress byte rate", "750", "Bps"}, }, }, { diff --git a/report/merge.go b/report/merge.go index 9ccf99518..406abe6ce 100644 --- a/report/merge.go +++ b/report/merge.go @@ -14,6 +14,7 @@ func (r *Report) Merge(other Report) { r.Host.Merge(other.Host) r.Overlay.Merge(other.Overlay) r.Sampling.Merge(other.Sampling) + r.Window += other.Window } // Merge merges another Topology into the receiver. diff --git a/report/report.go b/report/report.go index 152760bc2..990d5bab0 100644 --- a/report/report.go +++ b/report/report.go @@ -3,6 +3,7 @@ package report import ( "fmt" "strings" + "time" ) // Report is the core data type. It's produced by probes, and consumed and @@ -44,6 +45,14 @@ type Report struct { // Sampling data for this report. Sampling Sampling + + // Window is the amount of time that this report purports to represent. + // Windows must be carefully merged. They should only be added when + // reports cover non-overlapping periods of time. By default, we assume + // that's true, and add windows in merge operations. When that's not true, + // such as in the app, we expect the component to overwrite the window + // before serving it to consumers. + Window time.Duration } // MakeReport makes a clean report, ready to Merge() other reports into. @@ -57,6 +66,7 @@ func MakeReport() Report { Host: NewTopology(), Overlay: NewTopology(), Sampling: Sampling{}, + Window: 0, } } diff --git a/test/report_fixture.go b/test/report_fixture.go index a8a7d0f51..fe3d1db67 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -1,6 +1,8 @@ package test import ( + "time" + "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/probe/process" @@ -246,6 +248,7 @@ var ( Count: 1024, Total: 4096, }, + Window: 2 * time.Second, } ) From 3069ce01e005b50669e8228eaa7704c8b873842a Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Tue, 4 Aug 2015 12:15:15 +0200 Subject: [PATCH 8/8] Fix lockup bug on Linux --- probe/main.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/probe/main.go b/probe/main.go index 6b4e4f979..b1e5864d2 100644 --- a/probe/main.go +++ b/probe/main.go @@ -8,6 +8,7 @@ import ( _ "net/http/pprof" "os" "os/signal" + "runtime" "strconv" "strings" "syscall" @@ -127,14 +128,22 @@ func main() { } if *captureEnabled { + var sniffers int for _, iface := range strings.Split(*captureInterfaces, ",") { source, err := sniff.NewSource(iface) if err != nil { log.Printf("warning: %v", err) continue } + defer source.Close() log.Printf("capturing packets on %s", iface) reporters = append(reporters, sniff.New(hostID, localNets, source, *captureOn, *captureOff)) + sniffers++ + } + // Packet capture can block OS threads on Linux, so we need to provide + // sufficient overhead in GOMAXPROCS. + if have, want := runtime.GOMAXPROCS(-1), (sniffers + 1); have < want { + runtime.GOMAXPROCS(want) } }