diff --git a/Makefile b/Makefile index e7eda9e2f..e3b07e4f1 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ $(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go $(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/overlay/*.go report/*.go xfer/*.go $(APP_EXE) $(PROBE_EXE): - go get -tags netgo ./$(@D) + go get -d -tags netgo ./$(@D) go build -ldflags "-extldflags \"-static\" -X main.version $(SCOPE_VERSION)" -tags netgo -o $@ ./$(@D) @strings $@ | grep cgo_stub\\\.go >/dev/null || { \ rm $@; \ diff --git a/app/api_topology.go b/app/api_topology.go index c68801438..9953414b2 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/websocket" "github.com/weaveworks/scope/render" + "github.com/weaveworks/scope/report" ) const ( @@ -27,7 +28,7 @@ type APINode struct { // APIEdge is returned by the /api/topology/*/*/* handlers. type APIEdge struct { - Metadata render.AggregateMetadata `json:"metadata"` + Metadata report.EdgeMetadata `json:"metadata"` } // Full topology. @@ -76,7 +77,7 @@ func handleEdge(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Req localID = vars["local"] remoteID = vars["remote"] rpt = rep.Report() - metadata = t.renderer.AggregateMetadata(rpt, localID, remoteID) + metadata = t.renderer.EdgeMetadata(rpt, localID, remoteID) ) respondWith(w, http.StatusOK, APIEdge{Metadata: metadata}) diff --git a/app/api_topology_test.go b/app/api_topology_test.go index fb219daa5..0cc249faf 100644 --- a/app/api_topology_test.go +++ b/app/api_topology_test.go @@ -12,6 +12,7 @@ import ( "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/render/expected" + "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" ) @@ -71,6 +72,7 @@ func TestAPITopologyApplications(t *testing.T) { if err := json.Unmarshal(body, &topo); err != nil { t.Fatal(err) } + if want, have := render.OnlyConnected(expected.RenderedProcesses), fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } @@ -93,9 +95,9 @@ func TestAPITopologyApplications(t *testing.T) { if err := json.Unmarshal(body, &edge); err != nil { t.Fatalf("JSON parse error: %s", err) } - if want, have := (render.AggregateMetadata{ - "egress_bytes": 10, - "ingress_bytes": 100, + if want, have := (report.EdgeMetadata{ + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }), edge.Metadata; !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } @@ -112,7 +114,8 @@ func TestAPITopologyHosts(t *testing.T) { if err := json.Unmarshal(body, &topo); err != nil { t.Fatal(err) } - if want, have := (expected.RenderedHosts), fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) { + + if want, have := expected.RenderedHosts, fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } @@ -134,11 +137,10 @@ func TestAPITopologyHosts(t *testing.T) { if err := json.Unmarshal(body, &edge); err != nil { t.Fatalf("JSON parse error: %s", err) } - want := render.AggregateMetadata{ - "max_conn_count_tcp": 3, - } - if !reflect.DeepEqual(want, edge.Metadata) { - t.Errorf("Edge metadata error. Want %v, have %v", want, edge) + if want, have := (report.EdgeMetadata{ + MaxConnCountTCP: newu64(3), + }), edge.Metadata; !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) } } } @@ -176,3 +178,5 @@ func TestAPITopologyWebsocket(t *testing.T) { equals(t, 0, len(d.Update)) equals(t, 0, len(d.Remove)) } + +func newu64(value uint64) *uint64 { return &value } diff --git a/app/report_lifo.go b/app/report_lifo.go index c1c580585..31824ef11 100644 --- a/app/report_lifo.go +++ b/app/report_lifo.go @@ -51,9 +51,14 @@ func NewReportLIFO(r reporter, maxAge time.Duration) *ReportLIFO { case req := <-l.requests: // Request for the current report. report := report.MakeReport() + oldest := time.Now() for _, r := range l.reports { + if r.Timestamp.Before(oldest) { + oldest = r.Timestamp + } report.Merge(r.Report) } + report.Window = time.Now().Sub(oldest) req <- report case q := <-l.quit: diff --git a/circle.yml b/circle.yml index d8a3a8f57..d5241c65a 100644 --- a/circle.yml +++ b/circle.yml @@ -19,7 +19,7 @@ dependencies: cache_directories: - "~/docker" override: - - sudo apt-get --only-upgrade install tar + - sudo apt-get --only-upgrade install tar libpcap0.8-dev - bin/rebuild-ui-build-image - curl https://sdk.cloud.google.com | bash - test -z "$SECRET_PASSWORD" || bin/setup-circleci-secrets "$SECRET_PASSWORD" @@ -40,8 +40,7 @@ test: - cd $SRCDIR; ./bin/lint . - cd $SRCDIR; make client-test - cd $SRCDIR; make static - - cd $SRCDIR; rm -f app/app probe/probe; GOOS=darwin make && make clean - - cd $SRCDIR; rm -f app/app probe/probe; GOOS=linux make + - cd $SRCDIR; rm -f app/scope-app probe/scope-probe; make - cd $SRCDIR; ./bin/test -slow - cd $SRCDIR/experimental; make - test -z "$SECRET_PASSWORD" || (cd $SRCDIR/integration; ./gce.sh setup) diff --git a/experimental/demoprobe/generate.go b/experimental/demoprobe/generate.go index e87ccc8da..c2aa97f0e 100644 --- a/experimental/demoprobe/generate.go +++ b/experimental/demoprobe/generate.go @@ -91,12 +91,10 @@ func DemoReport(nodeCount int) report.Report { edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID) ) r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } // Address topology @@ -124,3 +122,5 @@ func DemoReport(nodeCount int) report.Report { return r } + +func newu64(value uint64) *uint64 { return &value } diff --git a/experimental/genreport/generate.go b/experimental/genreport/generate.go index 31791468a..520997b05 100644 --- a/experimental/genreport/generate.go +++ b/experimental/genreport/generate.go @@ -89,12 +89,10 @@ func DemoReport(nodeCount int) report.Report { edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID) ) r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: uint(rand.Intn(100) + 10), + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } // Address topology @@ -122,3 +120,5 @@ func DemoReport(nodeCount int) report.Report { return r } + +func newu64(value uint64) *uint64 { return &value } diff --git a/probe/docker/container_test.go b/probe/docker/container_linux_test.go similarity index 100% rename from probe/docker/container_test.go rename to probe/docker/container_linux_test.go diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 7e99a6398..8912319f9 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -79,15 +79,15 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { var ( localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String()) remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String()) - adjacencyID = report.MakeAdjacencyID(localAddressNodeID) + adjecencyID = report.MakeAdjacencyID(localAddressNodeID) edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) ) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) + rpt.Address.Adjacency[adjecencyID] = rpt.Address.Adjacency[adjecencyID].Add(remoteAddressNodeID) if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, // TODO this is ambiguous, be more specific + "name": r.hostName, Addr: c.LocalAddress.String(), }) } @@ -98,11 +98,11 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { var ( localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort))) remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort))) - adjacencyID = report.MakeAdjacencyID(localEndpointNodeID) + adjecencyID = report.MakeAdjacencyID(localEndpointNodeID) edgeID = report.MakeEdgeID(localEndpointNodeID, remoteEndpointNodeID) ) - rpt.Endpoint.Adjacency[adjacencyID] = rpt.Endpoint.Adjacency[adjacencyID].Add(remoteEndpointNodeID) + rpt.Endpoint.Adjacency[adjecencyID] = rpt.Endpoint.Adjacency[adjecencyID].Add(remoteEndpointNodeID) if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok { // First hit establishes NodeMetadata for scoped local address + port @@ -119,9 +119,11 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { } } -func countTCPConnection(m report.EdgeMetadatas, edgeKey string) { - edgeMeta := m[edgeKey] - edgeMeta.WithConnCountTCP = true - edgeMeta.MaxConnCountTCP++ - m[edgeKey] = edgeMeta +func countTCPConnection(mds report.EdgeMetadatas, key string) { + md := mds[key] + if md.MaxConnCountTCP == nil { + md.MaxConnCountTCP = new(uint64) + } + *md.MaxConnCountTCP++ + mds[key] = md } diff --git a/probe/host/reporter.go b/probe/host/reporter.go index 6694e96f7..a242e6a6f 100644 --- a/probe/host/reporter.go +++ b/probe/host/reporter.go @@ -1,7 +1,6 @@ package host import ( - "net" "runtime" "strings" "time" @@ -9,7 +8,7 @@ import ( "github.com/weaveworks/scope/report" ) -// Keys for use in NodeMetadata +// Keys for use in NodeMetadata. const ( Timestamp = "ts" HostName = "host_name" @@ -20,30 +19,31 @@ const ( Uptime = "uptime" ) -// Exposed for testing +// Exposed for testing. const ( ProcUptime = "/proc/uptime" ProcLoad = "/proc/loadavg" ) -// Exposed for testing +// Exposed for testing. var ( - InterfaceAddrs = net.InterfaceAddrs - Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) } + Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) } ) // Reporter generates Reports containing the host topology. type Reporter struct { - hostID string - hostName string + hostID string + hostName string + localNets report.Networks } // NewReporter returns a Reporter which produces a report containing host // topology for this host. -func NewReporter(hostID, hostName string) *Reporter { +func NewReporter(hostID, hostName string, localNets report.Networks) *Reporter { return &Reporter{ - hostID: hostID, - hostName: hostName, + hostID: hostID, + hostName: hostName, + localNets: localNets, } } @@ -54,15 +54,8 @@ func (r *Reporter) Report() (report.Report, error) { localCIDRs []string ) - localNets, err := InterfaceAddrs() - if err != nil { - return rep, err - } - for _, localNet := range localNets { - // Not all networks are IP networks. - if ipNet, ok := localNet.(*net.IPNet); ok { - localCIDRs = append(localCIDRs, ipNet.String()) - } + for _, localNet := range r.localNets { + localCIDRs = append(localCIDRs, localNet.String()) } uptime, err := GetUptime() diff --git a/probe/host/reporter_test.go b/probe/host/reporter_test.go index f4127a577..a3fa0df09 100644 --- a/probe/host/reporter_test.go +++ b/probe/host/reporter_test.go @@ -12,38 +12,37 @@ import ( "github.com/weaveworks/scope/test" ) -const ( - release = "release" - version = "version" - network = "192.168.0.0/16" - hostID = "hostid" - now = "now" - hostname = "hostname" - load = "0.59 0.36 0.29" - uptime = "278h55m43s" - kernel = "release version" -) - func TestReporter(t *testing.T) { + var ( + release = "release" + version = "version" + network = "192.168.0.0/16" + hostID = "hostid" + now = "now" + hostname = "hostname" + load = "0.59 0.36 0.29" + uptime = "278h55m43s" + kernel = "release version" + _, ipnet, _ = net.ParseCIDR(network) + localNets = report.Networks([]*net.IPNet{ipnet}) + ) + var ( oldGetKernelVersion = host.GetKernelVersion oldGetLoad = host.GetLoad oldGetUptime = host.GetUptime - oldInterfaceAddrs = host.InterfaceAddrs oldNow = host.Now ) defer func() { host.GetKernelVersion = oldGetKernelVersion host.GetLoad = oldGetLoad host.GetUptime = oldGetUptime - host.InterfaceAddrs = oldInterfaceAddrs host.Now = oldNow }() host.GetKernelVersion = func() (string, error) { return release + " " + version, nil } host.GetLoad = func() string { return load } host.GetUptime = func() (time.Duration, error) { return time.ParseDuration(uptime) } host.Now = func() string { return now } - host.InterfaceAddrs = func() ([]net.Addr, error) { _, ipnet, _ := net.ParseCIDR(network); return []net.Addr{ipnet}, nil } want := report.MakeReport() want.Host.NodeMetadatas[report.MakeHostNodeID(hostID)] = report.MakeNodeMetadataWith(map[string]string{ @@ -55,8 +54,7 @@ func TestReporter(t *testing.T) { host.Uptime: uptime, host.KernelVersion: kernel, }) - r := host.NewReporter(hostID, hostname) - have, _ := r.Report() + have, _ := host.NewReporter(hostID, hostname, localNets).Report() if !reflect.DeepEqual(want, have) { t.Errorf("%s", test.Diff(want, have)) } diff --git a/probe/main.go b/probe/main.go index 19c67359a..b1e5864d2 100644 --- a/probe/main.go +++ b/probe/main.go @@ -3,11 +3,14 @@ package main import ( "flag" "log" + "net" "net/http" _ "net/http/pprof" "os" "os/signal" + "runtime" "strconv" + "strings" "syscall" "time" @@ -17,6 +20,7 @@ import ( "github.com/weaveworks/scope/probe/host" "github.com/weaveworks/scope/probe/overlay" "github.com/weaveworks/scope/probe/process" + "github.com/weaveworks/scope/probe/sniff" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/xfer" ) @@ -36,8 +40,13 @@ func main() { dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name") weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router") procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") + captureEnabled = flag.Bool("capture", false, "perform sampled packet capture") + captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces") + captureOn = flag.Duration("capture.on", 1*time.Second, "packet capture duty cycle 'on'") + captureOff = flag.Duration("capture.off", 5*time.Second, "packet capture duty cycle 'off'") ) flag.Parse() + log.SetFlags(log.Lmicroseconds) if len(flag.Args()) != 0 { flag.Usage() @@ -71,11 +80,23 @@ func main() { } defer publisher.Close() + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Fatal(err) + } + localNets := report.Networks{} + for _, addr := range addrs { + // Not all addrs are IPNets. + if ipNet, ok := addr.(*net.IPNet); ok { + localNets = append(localNets, ipNet) + } + } + var ( hostName = hostname() hostID = hostName // TODO: we should sanitize the hostname taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} - reporters = []Reporter{host.NewReporter(hostID, hostName), endpoint.NewReporter(hostID, hostName, *spyProcs)} + reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} processCache *process.CachingWalker ) @@ -106,6 +127,26 @@ func main() { reporters = append(reporters, weave) } + if *captureEnabled { + var sniffers int + for _, iface := range strings.Split(*captureInterfaces, ",") { + source, err := sniff.NewSource(iface) + if err != nil { + log.Printf("warning: %v", err) + continue + } + defer source.Close() + log.Printf("capturing packets on %s", iface) + reporters = append(reporters, sniff.New(hostID, localNets, source, *captureOn, *captureOff)) + sniffers++ + } + // Packet capture can block OS threads on Linux, so we need to provide + // sufficient overhead in GOMAXPROCS. + if have, want := runtime.GOMAXPROCS(-1), (sniffers + 1); have < want { + runtime.GOMAXPROCS(want) + } + } + log.Printf("listening on %s", *listen) quit := make(chan struct{}) @@ -121,6 +162,7 @@ func main() { select { case <-pubTick: publishTicks.WithLabelValues().Add(1) + r.Window = *publishInterval publisher.Publish(r) r = report.MakeReport() @@ -128,7 +170,6 @@ func main() { if err := processCache.Update(); err != nil { log.Printf("error reading processes: %v", err) } - for _, reporter := range reporters { newReport, err := reporter.Report() if err != nil { @@ -136,7 +177,6 @@ func main() { } r.Merge(newReport) } - r = Apply(r, taggers) case <-quit: @@ -153,3 +193,16 @@ func interrupt() chan os.Signal { signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) return c } + +func interfaces() string { + ifaces, err := net.Interfaces() + if err != nil { + log.Print(err) + return "" + } + a := make([]string, 0, len(ifaces)) + for _, iface := range ifaces { + a = append(a, iface.Name) + } + return strings.Join(a, ",") +} diff --git a/probe/process/walker_test.go b/probe/process/walker_test.go index 22af5ea62..37d445891 100644 --- a/probe/process/walker_test.go +++ b/probe/process/walker_test.go @@ -9,7 +9,6 @@ import ( ) func TestBasicWalk(t *testing.T) { - // Don't panic or error. var ( procRoot = "/proc" procFunc = func(process.Process) {} diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go new file mode 100644 index 000000000..70209f7e8 --- /dev/null +++ b/probe/sniff/sniffer.go @@ -0,0 +1,330 @@ +package sniff + +import ( + "io" + "log" + "net" + "strconv" + "sync/atomic" + "time" + + "github.com/weaveworks/scope/report" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +// Sniffer is a packet-sniffing reporter. +type Sniffer struct { + hostID string + localNets report.Networks + reports chan chan report.Report + parser *gopacket.DecodingLayerParser + decoded []gopacket.LayerType + eth layers.Ethernet + ip4 layers.IPv4 + ip6 layers.IPv6 + tcp layers.TCP + udp layers.UDP + icmp4 layers.ICMPv4 + icmp6 layers.ICMPv6 +} + +// New returns a new sniffing reporter that samples traffic by turning its +// packet capture facilities on and off. Note that the on and off durations +// represent a way to bound CPU burn. Effective sample rate needs to be +// calculated as (packets decoded / packets observed). +func New(hostID string, localNets report.Networks, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer { + s := &Sniffer{ + hostID: hostID, + localNets: localNets, + reports: make(chan chan report.Report), + } + s.parser = gopacket.NewDecodingLayerParser( + layers.LayerTypeEthernet, + &s.eth, &s.ip4, &s.ip6, &s.tcp, &s.udp, &s.icmp4, &s.icmp6, + ) + go s.loop(src, on, off) + return s +} + +// Report implements the Reporter interface. +func (s *Sniffer) Report() (report.Report, error) { + c := make(chan report.Report) + s.reports <- c + return <-c, nil +} + +func (s *Sniffer) loop(src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) { + var ( + process = uint64(1) // initially enabled + total = uint64(0) // total packets seen + count = uint64(0) // count of packets captured + packets = make(chan Packet, 1024) // decoded packets + rpt = report.MakeReport() // the report we build + turnOn = (<-chan time.Time)(nil) // signal to start capture (initially enabled) + turnOff = time.After(on) // signal to stop capture + done = make(chan struct{}) // when src is finished, we're done too + ) + + // As a special case, if our off duty cycle is zero, i.e. 100% sample + // rate, we simply disable the turn-off signal channel. + if off == 0 { + turnOff = nil + } + + go func() { + s.read(src, packets, &process, &total, &count) + close(done) + }() + + for { + select { + case p := <-packets: + s.Merge(p, rpt) + + case <-turnOn: + atomic.StoreUint64(&process, 1) // enable packet capture + turnOn = nil // disable the on switch + turnOff = time.After(on) // enable the off switch + + case <-turnOff: + atomic.StoreUint64(&process, 0) // disable packet capture + turnOn = time.After(off) // enable the on switch + turnOff = nil // disable the off switch + + case c := <-s.reports: + rpt.Sampling.Count = atomic.LoadUint64(&count) + rpt.Sampling.Total = atomic.LoadUint64(&total) + interpolateCounts(rpt) + c <- rpt + atomic.StoreUint64(&count, 0) + atomic.StoreUint64(&total, 0) + rpt = report.MakeReport() + + case <-done: + return + } + } +} + +// interpolateCounts compensates for sampling by artifically inflating counts +// throughout the report. It should be run once for each report, within the +// probe, before it gets emitted into the rest of the system. +func interpolateCounts(r report.Report) { + rate := r.Sampling.Rate() + if rate >= 1.0 { + return + } + factor := 1.0 / rate + for _, topology := range r.Topologies() { + for _, emd := range topology.EdgeMetadatas { + if emd.EgressPacketCount != nil { + *emd.EgressPacketCount = uint64(float64(*emd.EgressPacketCount) * factor) + } + if emd.IngressPacketCount != nil { + *emd.IngressPacketCount = uint64(float64(*emd.IngressPacketCount) * factor) + } + if emd.EgressByteCount != nil { + *emd.EgressByteCount = uint64(float64(*emd.EgressByteCount) * factor) + } + if emd.IngressByteCount != nil { + *emd.IngressByteCount = uint64(float64(*emd.IngressByteCount) * factor) + } + } + } +} + +// Packet is an intermediate, decoded form of a packet, with the information +// that the Scope data model cares about. Designed to decouple the packet data +// source loop, which should be as fast as possible, and the process of +// merging the packet information to a report, which may take some time and +// allocations. +type Packet struct { + SrcIP, DstIP string + SrcPort, DstPort string + Network, Transport int // byte counts +} + +func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, process, total, count *uint64) { + var ( + data []byte + err error + ) + for { + data, _, err = src.ZeroCopyReadPacketData() + if err == io.EOF { + return // done + } + if err != nil { + log.Printf("sniffer: read: %v", err) + continue + } + atomic.AddUint64(total, 1) + if atomic.LoadUint64(process) == 0 { + continue + } + + if err := s.parser.DecodeLayers(data, &s.decoded); err != nil { + // We'll always get an error when we encounter a layer type for + // which we haven't configured a decoder. + } + var p Packet + for _, t := range s.decoded { + switch t { + case layers.LayerTypeEthernet: + // + + case layers.LayerTypeICMPv4: + p.Network += len(s.icmp4.Payload) + + case layers.LayerTypeICMPv6: + p.Network += len(s.icmp6.Payload) + + case layers.LayerTypeIPv4: + p.SrcIP = s.ip4.SrcIP.String() + p.DstIP = s.ip4.DstIP.String() + p.Network += len(s.ip4.Payload) + + case layers.LayerTypeIPv6: + p.SrcIP = s.ip6.SrcIP.String() + p.DstIP = s.ip6.DstIP.String() + p.Network += len(s.ip6.Payload) + + case layers.LayerTypeTCP: + p.SrcPort = strconv.Itoa(int(s.tcp.SrcPort)) + p.DstPort = strconv.Itoa(int(s.tcp.DstPort)) + p.Transport += len(s.tcp.Payload) + + case layers.LayerTypeUDP: + p.SrcPort = strconv.Itoa(int(s.udp.SrcPort)) + p.DstPort = strconv.Itoa(int(s.udp.DstPort)) + p.Transport += len(s.udp.Payload) + } + } + select { + case dst <- p: + atomic.AddUint64(count, 1) + default: + log.Printf("sniffer dropped packet") + } + } +} + +// Merge puts the packet into the report. +// +// Note that, for the moment, we encode bidirectional traffic as ingress and +// egress traffic on a single edge whose src is local and dst is remote. That +// is, if we see a packet from the remote addr 9.8.7.6 to the local addr +// 1.2.3.4, we apply it as *ingress* on the edge (1.2.3.4 -> 9.8.7.6). +func (s *Sniffer) Merge(p Packet, rpt report.Report) { + if p.SrcIP == "" || p.DstIP == "" { + return + } + + // One end of the traffic has to be local. Otherwise, we don't know how to + // construct the edge. + // + // If we need to get around this limitation, we may be able to change the + // semantics of the report, and allow the src side of edges to be from + // anywhere. But that will have ramifications throughout Scope (read: it + // may violate implicit invariants) and needs to be thought through. + var ( + srcLocal = s.localNets.Contains(net.ParseIP(p.SrcIP)) + dstLocal = s.localNets.Contains(net.ParseIP(p.DstIP)) + localIP string + remoteIP string + localPort string + remotePort string + egress bool + ) + switch { + case srcLocal && !dstLocal: + localIP, localPort, remoteIP, remotePort, egress = p.SrcIP, p.SrcPort, p.DstIP, p.DstPort, true + case !srcLocal && dstLocal: + localIP, localPort, remoteIP, remotePort, egress = p.DstIP, p.DstPort, p.SrcIP, p.SrcPort, false + case srcLocal && dstLocal: + localIP, localPort, remoteIP, remotePort, egress = p.SrcIP, p.SrcPort, p.DstIP, p.DstPort, true // loopback + case !srcLocal && !dstLocal: + log.Printf("sniffer ignoring remote-to-remote (%s -> %s) traffic", p.SrcIP, p.DstIP) + return + } + + // For sure, we can add to the address topology. + { + var ( + srcNodeID = report.MakeAddressNodeID(s.hostID, localIP) + dstNodeID = report.MakeAddressNodeID(s.hostID, remoteIP) + edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) + srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) + ) + + if _, ok := rpt.Address.NodeMetadatas[srcNodeID]; !ok { + rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + } + + emd := rpt.Address.EdgeMetadatas[edgeID] + + if egress { + if emd.EgressPacketCount == nil { + emd.EgressPacketCount = new(uint64) + } + *emd.EgressPacketCount++ + if emd.EgressByteCount == nil { + emd.EgressByteCount = new(uint64) + } + *emd.EgressByteCount += uint64(p.Network) + } else { + if emd.IngressPacketCount == nil { + emd.IngressPacketCount = new(uint64) + } + *emd.IngressPacketCount++ + if emd.IngressByteCount == nil { + emd.IngressByteCount = new(uint64) + } + *emd.IngressByteCount += uint64(p.Network) + } + + rpt.Address.EdgeMetadatas[edgeID] = emd + rpt.Address.Adjacency[srcAdjacencyID] = rpt.Address.Adjacency[srcAdjacencyID].Add(dstNodeID) + } + + // If we have ports, we can add to the endpoint topology, too. + if p.SrcPort != "" && p.DstPort != "" { + var ( + srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, localPort) + dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, remotePort) + edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) + srcAdjacencyID = report.MakeAdjacencyID(srcNodeID) + ) + + if _, ok := rpt.Endpoint.NodeMetadatas[srcNodeID]; !ok { + rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata() + } + + emd := rpt.Endpoint.EdgeMetadatas[edgeID] + + if egress { + if emd.EgressPacketCount == nil { + emd.EgressPacketCount = new(uint64) + } + *emd.EgressPacketCount++ + if emd.EgressByteCount == nil { + emd.EgressByteCount = new(uint64) + } + *emd.EgressByteCount += uint64(p.Transport) + } else { + if emd.IngressPacketCount == nil { + emd.IngressPacketCount = new(uint64) + } + *emd.IngressPacketCount++ + if emd.IngressByteCount == nil { + emd.IngressByteCount = new(uint64) + } + *emd.IngressByteCount += uint64(p.Transport) + } + + rpt.Endpoint.EdgeMetadatas[edgeID] = emd + rpt.Endpoint.Adjacency[srcAdjacencyID] = rpt.Endpoint.Adjacency[srcAdjacencyID].Add(dstNodeID) + } +} diff --git a/probe/sniff/sniffer_internal_test.go b/probe/sniff/sniffer_internal_test.go new file mode 100644 index 000000000..72f78bf44 --- /dev/null +++ b/probe/sniff/sniffer_internal_test.go @@ -0,0 +1,53 @@ +package sniff + +import ( + "testing" + + "github.com/weaveworks/scope/report" +) + +func TestInterpolateCounts(t *testing.T) { + var ( + hostID = "macbook-air" + srcNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "5678") + dstNodeID = report.MakeEndpointNodeID(hostID, "5.6.7.8", "9012") + edgeID = report.MakeEdgeID(srcNodeID, dstNodeID) + samplingCount = uint64(200) + samplingTotal = uint64(2345) + packetCount = uint64(123) + byteCount = uint64(4096) + ) + + r := report.MakeReport() + r.Sampling.Count = samplingCount + r.Sampling.Total = samplingTotal + r.Endpoint.EdgeMetadatas[edgeID] = report.EdgeMetadata{ + EgressPacketCount: newu64(packetCount), + IngressPacketCount: newu64(packetCount), + EgressByteCount: newu64(byteCount), + IngressByteCount: newu64(byteCount), + } + + interpolateCounts(r) + + var ( + rate = float64(samplingCount) / float64(samplingTotal) + factor = 1.0 / rate + apply = func(v uint64) uint64 { return uint64(factor * float64(v)) } + emd = r.Endpoint.EdgeMetadatas[edgeID] + ) + if want, have := apply(packetCount), (*emd.EgressPacketCount); want != have { + t.Errorf("want %d packets, have %d", want, have) + } + if want, have := apply(packetCount), (*emd.IngressPacketCount); want != have { + t.Errorf("want %d packets, have %d", want, have) + } + if want, have := apply(byteCount), (*emd.EgressByteCount); want != have { + t.Errorf("want %d bytes, have %d", want, have) + } + if want, have := apply(byteCount), (*emd.IngressByteCount); want != have { + t.Errorf("want %d bytes, have %d", want, have) + } +} + +func newu64(value uint64) *uint64 { return &value } diff --git a/probe/sniff/sniffer_test.go b/probe/sniff/sniffer_test.go new file mode 100644 index 000000000..d939ee5ee --- /dev/null +++ b/probe/sniff/sniffer_test.go @@ -0,0 +1,139 @@ +package sniff_test + +import ( + "io" + "net" + "reflect" + "sync" + "testing" + "time" + + "github.com/google/gopacket" + + "github.com/weaveworks/scope/probe/sniff" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +func TestSnifferShutdown(t *testing.T) { + var ( + hostID = "abcd" + src = newMockSource([]byte{}, nil) + on = time.Millisecond + off = time.Millisecond + s = sniff.New(hostID, report.Networks{}, src, on, off) + ) + + // Stopping the source should terminate the sniffer. + src.Close() + time.Sleep(10 * time.Millisecond) + + // Try to get a report from the sniffer. It should block forever, as the + // loop goroutine should have exited. + report := make(chan struct{}) + go func() { _, _ = s.Report(); close(report) }() + select { + case <-time.After(time.Millisecond): + case <-report: + t.Errorf("shouldn't get report after Close") + } +} + +func TestMerge(t *testing.T) { + var ( + hostID = "xyz" + src = newMockSource([]byte{}, nil) + on = time.Millisecond + off = time.Millisecond + rpt = report.MakeReport() + p = sniff.Packet{ + SrcIP: "1.0.0.0", + SrcPort: "1000", + DstIP: "2.0.0.0", + DstPort: "2000", + Network: 512, + Transport: 256, + } + + _, ipnet, _ = net.ParseCIDR(p.SrcIP + "/24") // ;) + localNets = report.Networks([]*net.IPNet{ipnet}) + ) + sniff.New(hostID, localNets, src, on, off).Merge(p, rpt) + + var ( + srcEndpointNodeID = report.MakeEndpointNodeID(hostID, p.SrcIP, p.SrcPort) + dstEndpointNodeID = report.MakeEndpointNodeID(hostID, p.DstIP, p.DstPort) + ) + if want, have := (report.Topology{ + Adjacency: report.Adjacency{ + report.MakeAdjacencyID(srcEndpointNodeID): report.MakeIDList( + dstEndpointNodeID, + ), + }, + EdgeMetadatas: report.EdgeMetadatas{ + report.MakeEdgeID(srcEndpointNodeID, dstEndpointNodeID): report.EdgeMetadata{ + EgressPacketCount: newu64(1), + EgressByteCount: newu64(256), + }, + }, + NodeMetadatas: report.NodeMetadatas{ + srcEndpointNodeID: report.MakeNodeMetadata(), + }, + }), rpt.Endpoint; !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } + + var ( + srcAddressNodeID = report.MakeAddressNodeID(hostID, p.SrcIP) + dstAddressNodeID = report.MakeAddressNodeID(hostID, p.DstIP) + ) + if want, have := (report.Topology{ + Adjacency: report.Adjacency{ + report.MakeAdjacencyID(srcAddressNodeID): report.MakeIDList( + dstAddressNodeID, + ), + }, + EdgeMetadatas: report.EdgeMetadatas{ + report.MakeEdgeID(srcAddressNodeID, dstAddressNodeID): report.EdgeMetadata{ + EgressPacketCount: newu64(1), + EgressByteCount: newu64(512), + }, + }, + NodeMetadatas: report.NodeMetadatas{ + srcAddressNodeID: report.MakeNodeMetadata(), + }, + }), rpt.Address; !reflect.DeepEqual(want, have) { + t.Errorf("%s", test.Diff(want, have)) + } +} + +type mockSource struct { + mtx sync.RWMutex + data []byte + err error +} + +func newMockSource(data []byte, err error) *mockSource { + return &mockSource{ + data: data, + err: err, + } +} + +func (s *mockSource) ZeroCopyReadPacketData() ([]byte, gopacket.CaptureInfo, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.data, gopacket.CaptureInfo{ + Timestamp: time.Now(), + CaptureLength: len(s.data), + Length: len(s.data), + }, s.err +} + +func (s *mockSource) Close() { + s.mtx.Lock() + defer s.mtx.Unlock() + s.err = io.EOF +} + +func newu64(value uint64) *uint64 { return &value } diff --git a/probe/sniff/source.go b/probe/sniff/source.go new file mode 100644 index 000000000..0ac4a4ff6 --- /dev/null +++ b/probe/sniff/source.go @@ -0,0 +1,24 @@ +package sniff + +import ( + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" +) + +// Source describes a packet data source that can be terminated. +type Source interface { + gopacket.ZeroCopyPacketDataSource + Close() +} + +const ( + snaplen = 65535 + promisc = true + timeout = pcap.BlockForever +) + +// NewSource returns a live packet data source via the passed device +// (interface). +func NewSource(device string) (Source, error) { + return pcap.OpenLive(device, snaplen, promisc, timeout) +} diff --git a/probe/sniff/testclient/main.go b/probe/sniff/testclient/main.go new file mode 100644 index 000000000..8b568ad4f --- /dev/null +++ b/probe/sniff/testclient/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "flag" + "io" + "log" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" +) + +func main() { + var ( + device = flag.String("device", "eth0", "device to sniff") + ) + flag.Parse() + + const ( + snaplen = 1024 * 1024 + promisc = true + timeout = pcap.BlockForever + ) + handle, err := pcap.OpenLive(*device, snaplen, promisc, timeout) + if err != nil { + log.Fatal(err) + } + + go func() { + time.Sleep(5 * time.Second) + handle.Close() + }() + + var ( + eth layers.Ethernet + ip4 layers.IPv4 + ip6 layers.IPv6 + tcp layers.TCP + udp layers.UDP + icmp4 layers.ICMPv4 + icmp6 layers.ICMPv6 + ) + parser := gopacket.NewDecodingLayerParser( + layers.LayerTypeEthernet, + ð, &ip4, &ip6, &tcp, &udp, &icmp4, &icmp6, + ) + decoded := []gopacket.LayerType{} + + done := make(chan struct{}) + go func() { + defer close(done) + for { + data, ci, err := handle.ZeroCopyReadPacketData() + if err == io.EOF { + log.Print("read: EOF") + return + } + if err != nil { + log.Printf("read: %v", err) + continue + } + log.Println(ci.Timestamp.String()) + err = parser.DecodeLayers(data, &decoded) + for _, t := range decoded { + switch t { + case layers.LayerTypeEthernet: + log.Println(" Ethernet", eth.EthernetType, eth.SrcMAC, eth.DstMAC, eth.Length) + case layers.LayerTypeIPv6: + log.Println(" IP6", ip6.Version, ip6.SrcIP, ip6.DstIP, ip6.Length, ip6.TrafficClass) + case layers.LayerTypeIPv4: + log.Println(" IP4", ip4.Version, ip4.SrcIP, ip4.DstIP, ip4.Length, ip4.TTL, ip4.TOS) + case layers.LayerTypeTCP: + log.Println(" TCP", tcp.SrcPort, tcp.DstPort, tcp.Seq, tcp.Ack, tcp.Window) + case layers.LayerTypeUDP: + log.Println(" UDP", udp.SrcPort, udp.DstPort, udp.Length) + case layers.LayerTypeICMPv4: + log.Println(" ICMP4", icmp4.Id, icmp4.Seq) + case layers.LayerTypeICMPv6: + log.Println(" ICMP6") + } + } + log.Println() + } + }() + <-done +} diff --git a/render/detailed_node.go b/render/detailed_node.go index fbbd79f7d..b739eb104 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -58,17 +58,46 @@ func (t tables) Less(i, j int) bool { return t[i].Rank > t[j].Rank } // MakeDetailedNode transforms a renderable node to a detailed node. It uses // aggregate metadata, plus the set of origin node IDs, to produce tables. func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { + sec := r.Window.Seconds() + rate := func(u *uint64) (float64, bool) { + if u == nil { + return 0.0, false + } + if sec <= 0 { + return 0.0, true + } + return float64(*u) / sec, true + } + shortenByteRate := func(rate float64) (major, minor string) { + switch { + case rate > 1024*1024: + return fmt.Sprintf("%.2f", rate/1024/1024), "MBps" + case rate > 1024: + return fmt.Sprintf("%.1f", rate/1024), "KBps" + default: + return fmt.Sprintf("%.0f", rate), "Bps" + } + } + tables := tables{} { rows := []Row{} - if val, ok := n.AggregateMetadata[KeyMaxConnCountTCP]; ok { - rows = append(rows, Row{"TCP connections", strconv.FormatInt(int64(val), 10), ""}) + if n.EdgeMetadata.MaxConnCountTCP != nil { + rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), ""}) } - if val, ok := n.AggregateMetadata[KeyBytesIngress]; ok { - rows = append(rows, Row{"Bytes ingress", strconv.FormatInt(int64(val), 10), ""}) + if rate, ok := rate(n.EdgeMetadata.EgressPacketCount); ok { + rows = append(rows, Row{"Egress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"}) } - if val, ok := n.AggregateMetadata[KeyBytesEgress]; ok { - rows = append(rows, Row{"Bytes egress", strconv.FormatInt(int64(val), 10), ""}) + if rate, ok := rate(n.EdgeMetadata.IngressPacketCount); ok { + rows = append(rows, Row{"Ingress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"}) + } + if rate, ok := rate(n.EdgeMetadata.EgressByteCount); ok { + s, unit := shortenByteRate(rate) + rows = append(rows, Row{"Egress byte rate", s, unit}) + } + if rate, ok := rate(n.EdgeMetadata.IngressByteCount); ok { + s, unit := shortenByteRate(rate) + rows = append(rows, Row{"Ingress byte rate", s, unit}) } if len(rows) > 0 { tables = append(tables, Table{"Connections", true, connectionsRank, rows}) diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index bc754b4e1..7e561efb5 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -66,8 +66,8 @@ func TestMakeDetailedNode(t *testing.T) { Numeric: true, Rank: 100, Rows: []render.Row{ - {"Bytes ingress", "150", ""}, - {"Bytes egress", "1500", ""}, + {"Egress packet rate", "75", "packets/sec"}, + {"Egress byte rate", "750", "Bps"}, }, }, { diff --git a/render/expected/expected.go b/render/expected/expected.go index 577adb04f..8928bbdc0 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -14,25 +14,25 @@ var ( unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80") unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80") unknownPseudoNode1 = render.RenderableNode{ - ID: unknownPseudoNode1ID, - LabelMajor: "10.10.10.10", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: unknownPseudoNode1ID, + LabelMajor: "10.10.10.10", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, } unknownPseudoNode2 = render.RenderableNode{ - ID: unknownPseudoNode2ID, - LabelMajor: "10.10.10.11", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: unknownPseudoNode2ID, + LabelMajor: "10.10.10.11", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, } theInternetNode = render.RenderableNode{ - ID: render.TheInternetID, - LabelMajor: render.TheInternetMajor, - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: render.TheInternetID, + LabelMajor: render.TheInternetMajor, + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, } ClientProcess1ID = render.MakeProcessID(test.ClientHostID, test.Client1PID) @@ -54,9 +54,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 100, - render.KeyBytesEgress: 10, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }, }, ClientProcess2ID: { @@ -72,9 +72,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 200, - render.KeyBytesEgress: 20, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(20), + EgressByteCount: newu64(200), }, }, ServerProcessID: { @@ -96,9 +96,9 @@ var ( test.ServerHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, nonContainerProcessID: { @@ -112,8 +112,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, unknownPseudoNode1ID: unknownPseudoNode1, unknownPseudoNode2ID: unknownPseudoNode2, @@ -136,9 +136,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 300, - render.KeyBytesEgress: 30, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, "apache": { @@ -159,9 +159,9 @@ var ( test.ServerHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, "bash": { @@ -174,8 +174,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, unknownPseudoNode1ID: unknownPseudoNode1, unknownPseudoNode2ID: unknownPseudoNode2, @@ -199,9 +199,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 300, - render.KeyBytesEgress: 30, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, test.ServerContainerID: { @@ -218,9 +218,9 @@ var ( test.ServerHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, uncontainedServerID: { @@ -233,8 +233,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, render.TheInternetID: theInternetNode, } @@ -257,9 +257,9 @@ var ( test.ClientHostNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 300, - render.KeyBytesEgress: 30, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, }, test.ServerContainerImageName: { @@ -276,9 +276,9 @@ var ( test.ServerProcessNodeID, test.ServerHostNodeID), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyBytesIngress: 150, - render.KeyBytesEgress: 1500, + EdgeMetadata: report.EdgeMetadata{ + EgressPacketCount: newu64(150), + EgressByteCount: newu64(1500), }, }, uncontainedServerID: { @@ -291,8 +291,8 @@ var ( test.NonContainerProcessNodeID, test.ServerHostNodeID, ), - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, render.TheInternetID: theInternetNode, } @@ -315,8 +315,8 @@ var ( test.ServerAddressNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyMaxConnCountTCP: 3, + EdgeMetadata: report.EdgeMetadata{ + MaxConnCountTCP: newu64(3), }, }, ClientHostRenderedID: { @@ -331,24 +331,26 @@ var ( test.ClientAddressNodeID, ), NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{ - render.KeyMaxConnCountTCP: 3, + EdgeMetadata: report.EdgeMetadata{ + MaxConnCountTCP: newu64(3), }, }, pseudoHostID1: { - ID: pseudoHostID1, - LabelMajor: "10.10.10.10", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: pseudoHostID1, + LabelMajor: "10.10.10.10", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, pseudoHostID2: { - ID: pseudoHostID2, - LabelMajor: "10.10.10.11", - Pseudo: true, - NodeMetadata: report.MakeNodeMetadata(), - AggregateMetadata: render.AggregateMetadata{}, + ID: pseudoHostID2, + LabelMajor: "10.10.10.11", + Pseudo: true, + NodeMetadata: report.MakeNodeMetadata(), + EdgeMetadata: report.EdgeMetadata{}, }, render.TheInternetID: theInternetNode, } ) + +func newu64(value uint64) *uint64 { return &value } diff --git a/render/metadata.go b/render/metadata.go deleted file mode 100644 index 577b48b19..000000000 --- a/render/metadata.go +++ /dev/null @@ -1,46 +0,0 @@ -package render - -import ( - "github.com/weaveworks/scope/report" -) - -// AggregateMetadata is a composable version of an EdgeMetadata. It's used -// when we want to merge nodes/edges for any reason. -// -// Even though we base it on EdgeMetadata, we can apply it to nodes, by -// summing up (merging) all of the {ingress, egress} metadatas of the -// {incoming, outgoing} edges to the node. -type AggregateMetadata map[string]int - -const ( - // KeyBytesIngress is the aggregate metadata key for the total count of - // ingress bytes. - KeyBytesIngress = "ingress_bytes" - // KeyBytesEgress is the aggregate metadata key for the total count of - // egress bytes. - KeyBytesEgress = "egress_bytes" - // KeyMaxConnCountTCP is the aggregate metadata key for the maximum number - // of simultaneous observed TCP connections in the window. - KeyMaxConnCountTCP = "max_conn_count_tcp" -) - -// AggregateMetadataOf calculates a AggregateMetadata from an EdgeMetadata. -func AggregateMetadataOf(md report.EdgeMetadata) AggregateMetadata { - m := AggregateMetadata{} - if md.WithBytes { - m[KeyBytesIngress] = int(md.BytesIngress) - m[KeyBytesEgress] = int(md.BytesEgress) - } - if md.WithConnCountTCP { - // The maximum is the maximum. No need to calculate anything. - m[KeyMaxConnCountTCP] = int(md.MaxConnCountTCP) - } - return m -} - -// Merge adds the fields from AggregateMetadata to r. r must be initialized. -func (r *AggregateMetadata) Merge(other AggregateMetadata) { - for k, v := range other { - (*r)[k] += v - } -} diff --git a/render/metadata_test.go b/render/metadata_test.go deleted file mode 100644 index 805506650..000000000 --- a/render/metadata_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package render_test - -import ( - "reflect" - "testing" - - "github.com/weaveworks/scope/render" - "github.com/weaveworks/scope/report" -) - -func TestAggregateMetadata(t *testing.T) { - for from, want := range map[report.EdgeMetadata]render.AggregateMetadata{ - - // Simple connection count - report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 400, - }: { - render.KeyMaxConnCountTCP: 400, - }, - - // Connection count rounding - report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 4, - }: { - render.KeyMaxConnCountTCP: 4, - }, - - // 0 connections. - report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 0, - }: { - render.KeyMaxConnCountTCP: 0, - }, - - // Egress - report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 24, - BytesIngress: 0, - }: { - render.KeyBytesEgress: 24, - render.KeyBytesIngress: 0, - }, - - // Ingress - report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 0, - BytesIngress: 1200, - }: { - render.KeyBytesEgress: 0, - render.KeyBytesIngress: 1200, - }, - - // Nothing there. - report.EdgeMetadata{}: {}, - } { - if have := render.AggregateMetadataOf(from); !reflect.DeepEqual(have, want) { - t.Errorf("have: %#v, want %#v", have, want) - } - - } -} - -func TestAggregateMetadataSum(t *testing.T) { - var ( - this = render.AggregateMetadata{ - "ingress_bytes": 3, - } - other = render.AggregateMetadata{ - "ingress_bytes": 333, - "egress_bytes": 3, - } - want = render.AggregateMetadata{ - "ingress_bytes": 336, - "egress_bytes": 3, - } - ) - - this.Merge(other) - if have := this; !reflect.DeepEqual(have, want) { - t.Errorf("have: %#v, want %#v", have, want) - } -} diff --git a/render/render.go b/render/render.go index b7adf6e4d..e80de1781 100644 --- a/render/render.go +++ b/render/render.go @@ -9,7 +9,7 @@ import ( // Renderer is something that can render a report to a set of RenderableNodes. type Renderer interface { Render(report.Report) RenderableNodes - AggregateMetadata(rpt report.Report, localID, remoteID string) AggregateMetadata + EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata } // Reduce renderer is a Renderer which merges together the output of several @@ -50,11 +50,11 @@ func (r Reduce) Render(rpt report.Report) RenderableNodes { return result } -// AggregateMetadata produces an AggregateMetadata for a given edge. -func (r Reduce) AggregateMetadata(rpt report.Report, localID, remoteID string) AggregateMetadata { - metadata := AggregateMetadata{} +// EdgeMetadata produces an EdgeMetadata for a given edge. +func (r Reduce) EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata { + metadata := report.EdgeMetadata{} for _, renderer := range r { - metadata.Merge(renderer.AggregateMetadata(rpt, localID, remoteID)) + metadata.Merge(renderer.EdgeMetadata(rpt, localID, remoteID)) } return metadata } @@ -107,11 +107,11 @@ func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) { return output, mapped } -// AggregateMetadata gives the metadata of an edge from the perspective of the +// EdgeMetadata gives the metadata of an edge from the perspective of the // srcRenderableID. Since an edgeID can have multiple edges on the address // level, it uses the supplied mapping function to translate address IDs to // renderable node (mapped) IDs. -func (m Map) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) AggregateMetadata { +func (m Map) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) report.EdgeMetadata { // First we need to map the ids in this layer into the ids in the underlying layer _, mapped := m.render(rpt) // this maps from old -> new inverted := map[string][]string{} // this maps from new -> old(s) @@ -130,9 +130,9 @@ func (m Map) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderable } // Now recurse for each old edge - output := AggregateMetadata{} + output := report.EdgeMetadata{} for _, edge := range oldEdges { - metadata := m.Renderer.AggregateMetadata(rpt, edge.src, edge.dst) + metadata := m.Renderer.EdgeMetadata(rpt, edge.src, edge.dst) output.Merge(metadata) } return output @@ -205,8 +205,9 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { srcRenderableNode.Origins = srcRenderableNode.Origins.Add(srcNodeID) edgeID := report.MakeEdgeID(srcNodeID, dstNodeID) if md, ok := t.EdgeMetadatas[edgeID]; ok { - srcRenderableNode.AggregateMetadata.Merge(AggregateMetadataOf(md)) + srcRenderableNode.EdgeMetadata.Merge(md) } + } nodes[srcRenderableID] = srcRenderableNode @@ -215,11 +216,11 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes { return nodes } -// AggregateMetadata gives the metadata of an edge from the perspective of the +// EdgeMetadata gives the metadata of an edge from the perspective of the // srcRenderableID. Since an edgeID can have multiple edges on the address // level, it uses the supplied mapping function to translate address IDs to // renderable node (mapped) IDs. -func (m LeafMap) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) AggregateMetadata { +func (m LeafMap) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) report.EdgeMetadata { t := m.Selector(rpt) metadata := report.EdgeMetadata{} for edgeID, edgeMeta := range t.EdgeMetadatas { @@ -240,7 +241,7 @@ func (m LeafMap) AggregateMetadata(rpt report.Report, srcRenderableID, dstRender metadata.Flatten(edgeMeta) } } - return AggregateMetadataOf(metadata) + return metadata } // Render produces a set of RenderableNodes given a Report diff --git a/render/render_test.go b/render/render_test.go index bfb2e9ed1..03a9ae232 100644 --- a/render/render_test.go +++ b/render/render_test.go @@ -10,14 +10,14 @@ import ( type mockRenderer struct { render.RenderableNodes - aggregateMetadata render.AggregateMetadata + edgeMetadata report.EdgeMetadata } func (m mockRenderer) Render(rpt report.Report) render.RenderableNodes { return m.RenderableNodes } -func (m mockRenderer) AggregateMetadata(rpt report.Report, localID, remoteID string) render.AggregateMetadata { - return m.aggregateMetadata +func (m mockRenderer) EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata { + return m.edgeMetadata } func TestReduceRender(t *testing.T) { @@ -36,12 +36,12 @@ func TestReduceRender(t *testing.T) { func TestReduceEdge(t *testing.T) { renderer := render.Reduce([]render.Renderer{ - mockRenderer{aggregateMetadata: render.AggregateMetadata{"foo": 1}}, - mockRenderer{aggregateMetadata: render.AggregateMetadata{"bar": 2}}, + mockRenderer{edgeMetadata: report.EdgeMetadata{EgressPacketCount: newu64(1)}}, + mockRenderer{edgeMetadata: report.EdgeMetadata{EgressPacketCount: newu64(2)}}, }) - want := render.AggregateMetadata{"foo": 1, "bar": 2} - have := renderer.AggregateMetadata(report.MakeReport(), "", "") + want := report.EdgeMetadata{EgressPacketCount: newu64(3)} + have := renderer.EdgeMetadata(report.MakeReport(), "", "") if !reflect.DeepEqual(want, have) { t.Errorf("want %+v, have %+v", want, have) @@ -118,8 +118,8 @@ func TestMapEdge(t *testing.T) { ">bar": report.MakeIDList("foo"), }, EdgeMetadatas: report.EdgeMetadatas{ - "foo|bar": report.EdgeMetadata{WithBytes: true, BytesIngress: 1, BytesEgress: 2}, - "bar|foo": report.EdgeMetadata{WithBytes: true, BytesIngress: 3, BytesEgress: 4}, + "foo|bar": report.EdgeMetadata{EgressPacketCount: newu64(1), EgressByteCount: newu64(2)}, + "bar|foo": report.EdgeMetadata{EgressPacketCount: newu64(3), EgressByteCount: newu64(4)}, }, } } @@ -139,12 +139,10 @@ func TestMapEdge(t *testing.T) { }, } - want := render.AggregateMetadata{ - render.KeyBytesIngress: 1, - render.KeyBytesEgress: 2, - } - have := mapper.AggregateMetadata(report.MakeReport(), "_foo", "_bar") - if !reflect.DeepEqual(want, have) { + if want, have := (report.EdgeMetadata{ + EgressPacketCount: newu64(1), + EgressByteCount: newu64(2), + }), mapper.EdgeMetadata(report.MakeReport(), "_foo", "_bar"); !reflect.DeepEqual(want, have) { t.Errorf("want %+v, have %+v", want, have) } } @@ -166,3 +164,5 @@ func TestFilterRender(t *testing.T) { t.Errorf("want %+v, have %+v", want, have) } } + +func newu64(value uint64) *uint64 { return &value } diff --git a/render/renderable_node.go b/render/renderable_node.go index bdd492fbe..074ca3774 100644 --- a/render/renderable_node.go +++ b/render/renderable_node.go @@ -16,7 +16,7 @@ type RenderableNode struct { Adjacency report.IDList `json:"adjacency,omitempty"` // Node IDs (in the same topology domain) Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information - AggregateMetadata `json:"metadata"` // Numeric sums + report.EdgeMetadata `json:"metadata"` // Numeric sums report.NodeMetadata `json:"-"` // merged NodeMetadata of the nodes used to build this } @@ -56,57 +56,57 @@ func (rn *RenderableNode) Merge(other RenderableNode) { rn.Adjacency = rn.Adjacency.Merge(other.Adjacency) rn.Origins = rn.Origins.Merge(other.Origins) - rn.AggregateMetadata.Merge(other.AggregateMetadata) + rn.EdgeMetadata.Merge(other.EdgeMetadata) rn.NodeMetadata.Merge(other.NodeMetadata) } // NewRenderableNode makes a new RenderableNode func NewRenderableNode(id, major, minor, rank string, nmd report.NodeMetadata) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: major, - LabelMinor: minor, - Rank: rank, - Pseudo: false, - AggregateMetadata: AggregateMetadata{}, - NodeMetadata: nmd.Copy(), + ID: id, + LabelMajor: major, + LabelMinor: minor, + Rank: rank, + Pseudo: false, + EdgeMetadata: report.EdgeMetadata{}, + NodeMetadata: nmd.Copy(), } } func newDerivedNode(id string, node RenderableNode) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: "", - LabelMinor: "", - Rank: "", - Pseudo: node.Pseudo, - AggregateMetadata: node.AggregateMetadata, - Origins: node.Origins, - NodeMetadata: report.MakeNodeMetadata(), + ID: id, + LabelMajor: "", + LabelMinor: "", + Rank: "", + Pseudo: node.Pseudo, + EdgeMetadata: node.EdgeMetadata, + Origins: node.Origins, + NodeMetadata: report.MakeNodeMetadata(), } } func newPseudoNode(id, major, minor string) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: major, - LabelMinor: minor, - Rank: "", - Pseudo: true, - AggregateMetadata: AggregateMetadata{}, - NodeMetadata: report.MakeNodeMetadata(), + ID: id, + LabelMajor: major, + LabelMinor: minor, + Rank: "", + Pseudo: true, + EdgeMetadata: report.EdgeMetadata{}, + NodeMetadata: report.MakeNodeMetadata(), } } func newDerivedPseudoNode(id, major string, node RenderableNode) RenderableNode { return RenderableNode{ - ID: id, - LabelMajor: major, - LabelMinor: "", - Rank: "", - Pseudo: true, - AggregateMetadata: node.AggregateMetadata, - Origins: node.Origins, - NodeMetadata: report.MakeNodeMetadata(), + ID: id, + LabelMajor: major, + LabelMinor: "", + Rank: "", + Pseudo: true, + EdgeMetadata: node.EdgeMetadata, + Origins: node.Origins, + NodeMetadata: report.MakeNodeMetadata(), } } diff --git a/render/topologies_test.go b/render/topologies_test.go index f5496aec7..f8f0402f8 100644 --- a/render/topologies_test.go +++ b/render/topologies_test.go @@ -23,7 +23,7 @@ func TestProcessRenderer(t *testing.T) { have := render.ProcessRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedProcesses, have) { - t.Error("\n" + test.Diff(expected.RenderedProcesses, have)) + t.Error(test.Diff(expected.RenderedProcesses, have)) } } @@ -31,7 +31,7 @@ func TestProcessNameRenderer(t *testing.T) { have := render.ProcessNameRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedProcessNames, have) { - t.Error("\n" + test.Diff(expected.RenderedProcessNames, have)) + t.Error(test.Diff(expected.RenderedProcessNames, have)) } } @@ -39,7 +39,7 @@ func TestContainerRenderer(t *testing.T) { have := render.ContainerRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedContainers, have) { - t.Error("\n" + test.Diff(expected.RenderedContainers, have)) + t.Error(test.Diff(expected.RenderedContainers, have)) } } @@ -47,7 +47,7 @@ func TestContainerImageRenderer(t *testing.T) { have := render.ContainerImageRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedContainerImages, have) { - t.Error("\n" + test.Diff(expected.RenderedContainerImages, have)) + t.Error(test.Diff(expected.RenderedContainerImages, have)) } } @@ -55,6 +55,6 @@ func TestHostRenderer(t *testing.T) { have := render.HostRenderer.Render(test.Report) have = trimNodeMetadata(have) if !reflect.DeepEqual(expected.RenderedHosts, have) { - t.Error("\n" + test.Diff(expected.RenderedHosts, have)) + t.Error(test.Diff(expected.RenderedHosts, have)) } } diff --git a/report/merge.go b/report/merge.go index 025bd4755..406abe6ce 100644 --- a/report/merge.go +++ b/report/merge.go @@ -3,7 +3,8 @@ package report // Merge functions for all topology datatypes. The general semantics are that // the receiver is modified, and what's merged in isn't. -// Merge merges another Report into the receiver. +// Merge merges another Report into the receiver. Pass addWindows true if the +// reports represent distinct (non-overlapping) periods of time. func (r *Report) Merge(other Report) { r.Endpoint.Merge(other.Endpoint) r.Address.Merge(other.Address) @@ -12,6 +13,8 @@ func (r *Report) Merge(other Report) { r.ContainerImage.Merge(other.ContainerImage) r.Host.Merge(other.Host) r.Overlay.Merge(other.Overlay) + r.Sampling.Merge(other.Sampling) + r.Window += other.Window } // Merge merges another Topology into the receiver. @@ -62,31 +65,49 @@ func (e *EdgeMetadatas) Merge(other EdgeMetadatas) { // Merge merges another EdgeMetadata into the receiver. The two edge metadatas // should represent the same edge on different times. func (m *EdgeMetadata) Merge(other EdgeMetadata) { - if other.WithBytes { - m.WithBytes = true - m.BytesIngress += other.BytesIngress - m.BytesEgress += other.BytesEgress - } - if other.WithConnCountTCP { - m.WithConnCountTCP = true - if other.MaxConnCountTCP > m.MaxConnCountTCP { - m.MaxConnCountTCP = other.MaxConnCountTCP - } - } + m.EgressPacketCount = merge(m.EgressPacketCount, other.EgressPacketCount, sum) + m.IngressPacketCount = merge(m.IngressPacketCount, other.IngressPacketCount, sum) + m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum) + m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum) + m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, max) } // Flatten sums two EdgeMetadatas. Their windows should be the same duration; // they should represent different edges at the same time. func (m *EdgeMetadata) Flatten(other EdgeMetadata) { - if other.WithBytes { - m.WithBytes = true - m.BytesIngress += other.BytesIngress - m.BytesEgress += other.BytesEgress - } - if other.WithConnCountTCP { - m.WithConnCountTCP = true - // Note: summing of two maximums doesn't always give the true maximum. - // But it's our Best Effort effort. - m.MaxConnCountTCP += other.MaxConnCountTCP - } + m.EgressPacketCount = merge(m.EgressPacketCount, other.EgressPacketCount, sum) + m.IngressPacketCount = merge(m.IngressPacketCount, other.IngressPacketCount, sum) + m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum) + m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum) + // Note that summing of two maximums doesn't always give us the true + // maximum. But it's a best effort. + m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, sum) +} + +// Merge combines two sampling structures via simple addition. +func (s *Sampling) Merge(other Sampling) { + s.Count += other.Count + s.Total += other.Total +} + +func merge(dst, src *uint64, op func(uint64, uint64) uint64) *uint64 { + if src == nil { + return dst + } + if dst == nil { + dst = new(uint64) + } + (*dst) = op(*dst, *src) + return dst +} + +func sum(dst, src uint64) uint64 { + return dst + src +} + +func max(dst, src uint64) uint64 { + if dst > src { + return dst + } + return src } diff --git a/report/merge_test.go b/report/merge_test.go index 99d04a48c..1426ea17f 100644 --- a/report/merge_test.go +++ b/report/merge_test.go @@ -112,102 +112,82 @@ func TestMergeEdgeMetadatas(t *testing.T) { a: report.EdgeMetadatas{}, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 2, + EgressPacketCount: newu64(1), + MaxConnCountTCP: newu64(2), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 2, + EgressPacketCount: newu64(1), + MaxConnCountTCP: newu64(2), }, }, }, "Empty b": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, + EgressPacketCount: newu64(12), + EgressByteCount: newu64(999), }, }, b: report.EdgeMetadatas{}, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, + EgressPacketCount: newu64(12), + EgressByteCount: newu64(999), }, }, }, "Host merge": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 4, + EgressPacketCount: newu64(12), + EgressByteCount: newu64(500), + MaxConnCountTCP: newu64(4), }, }, b: report.EdgeMetadatas{ "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 1, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 6, + EgressPacketCount: newu64(1), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(6), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 4, + EgressPacketCount: newu64(12), + EgressByteCount: newu64(500), + MaxConnCountTCP: newu64(4), }, "hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 1, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 6, + EgressPacketCount: newu64(1), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(6), }, }, }, "Edge merge": { a: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 12, - BytesIngress: 0, - WithConnCountTCP: true, - MaxConnCountTCP: 7, + EgressPacketCount: newu64(12), + EgressByteCount: newu64(1000), + MaxConnCountTCP: newu64(7), }, }, b: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 1, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 9, + EgressPacketCount: newu64(1), + IngressByteCount: newu64(123), + EgressByteCount: newu64(2), + MaxConnCountTCP: newu64(9), }, }, want: report.EdgeMetadatas{ "hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{ - WithBytes: true, - BytesEgress: 13, - BytesIngress: 2, - WithConnCountTCP: true, - MaxConnCountTCP: 9, + EgressPacketCount: newu64(13), + IngressByteCount: newu64(123), + EgressByteCount: newu64(1002), + MaxConnCountTCP: newu64(9), }, }, }, @@ -319,3 +299,5 @@ func TestMergeNodeMetadatas(t *testing.T) { } } } + +func newu64(value uint64) *uint64 { return &value } diff --git a/report/networks.go b/report/networks.go index 4fe5f6fe4..a1aa9e6e4 100644 --- a/report/networks.go +++ b/report/networks.go @@ -12,12 +12,11 @@ type Interface interface { Addrs() ([]net.Addr, error) } -// Variables exposed for testing +// Variables exposed for testing. +// TODO this design is broken, make it consistent with probe networks. var ( LocalNetworks = Networks{} - InterfaceByNameStub = func(name string) (Interface, error) { - return net.InterfaceByName(name) - } + InterfaceByNameStub = func(name string) (Interface, error) { return net.InterfaceByName(name) } ) // Contains returns true if IP is in Networks. diff --git a/report/report.go b/report/report.go index dc8f72d72..990d5bab0 100644 --- a/report/report.go +++ b/report/report.go @@ -1,5 +1,11 @@ package report +import ( + "fmt" + "strings" + "time" +) + // Report is the core data type. It's produced by probes, and consumed and // stored by apps. It's composed of multiple topologies, each representing // a different (related, but not equivalent) view of the network. @@ -36,6 +42,80 @@ type Report struct { // overlaid on the infrastructure. The information is scraped by polling // their status endpoints. Edges could be present, but aren't currently. Overlay Topology + + // Sampling data for this report. + Sampling Sampling + + // Window is the amount of time that this report purports to represent. + // Windows must be carefully merged. They should only be added when + // reports cover non-overlapping periods of time. By default, we assume + // that's true, and add windows in merge operations. When that's not true, + // such as in the app, we expect the component to overwrite the window + // before serving it to consumers. + Window time.Duration +} + +// MakeReport makes a clean report, ready to Merge() other reports into. +func MakeReport() Report { + return Report{ + Endpoint: NewTopology(), + Address: NewTopology(), + Process: NewTopology(), + Container: NewTopology(), + ContainerImage: NewTopology(), + Host: NewTopology(), + Overlay: NewTopology(), + Sampling: Sampling{}, + Window: 0, + } +} + +// Topologies returns a slice of Topologies in this report +func (r Report) Topologies() []Topology { + return []Topology{ + r.Endpoint, + r.Address, + r.Process, + r.Container, + r.ContainerImage, + r.Host, + r.Overlay, + } +} + +// Validate checks the report for various inconsistencies. +func (r Report) Validate() error { + var errs []string + for _, topology := range r.Topologies() { + if err := topology.Validate(); err != nil { + errs = append(errs, err.Error()) + } + } + if r.Sampling.Count > r.Sampling.Total { + errs = append(errs, fmt.Sprintf("sampling count (%d) bigger than total (%d)", r.Sampling.Count, r.Sampling.Total)) + } + if len(errs) > 0 { + return fmt.Errorf("%d error(s): %s", len(errs), strings.Join(errs, "; ")) + } + return nil +} + +// Sampling describes how the packet data sources for this report were +// sampled. It can be used to calculate effective sample rates. We can't +// just put the rate here, because that can't be accurately merged. Counts +// in e.g. edge metadata structures have already been adjusted to +// compensate for the sample rate. +type Sampling struct { + Count uint64 // observed and processed + Total uint64 // observed overall +} + +// Rate returns the effective sampling rate. +func (s Sampling) Rate() float64 { + if s.Total <= 0 { + return 1.0 + } + return float64(s.Count) / float64(s.Total) } const ( @@ -77,32 +157,3 @@ func SelectAddress(r Report) Topology { func SelectHost(r Report) Topology { return r.Host } - -// MakeReport makes a clean report, ready to Merge() other reports into. -func MakeReport() Report { - return Report{ - Endpoint: NewTopology(), - Address: NewTopology(), - Process: NewTopology(), - Container: NewTopology(), - ContainerImage: NewTopology(), - Host: NewTopology(), - Overlay: NewTopology(), - } -} - -// Topologies returns a slice of Topologies in this report -func (r Report) Topologies() []Topology { - return []Topology{r.Endpoint, r.Address, r.Process, r.Container, - r.ContainerImage, r.Host, r.Overlay} -} - -// Validate checks the report for various inconsistencies. -func (r Report) Validate() error { - for _, topology := range r.Topologies() { - if err := topology.Validate(); err != nil { - return err - } - } - return nil -} diff --git a/report/report_test.go b/report/report_test.go new file mode 100644 index 000000000..aa330ce1f --- /dev/null +++ b/report/report_test.go @@ -0,0 +1,27 @@ +package report_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/report" +) + +// Make sure we don't add a topology and miss it in the Topologies method. +func TestReportTopologies(t *testing.T) { + var ( + reportType = reflect.TypeOf(report.MakeReport()) + topologyType = reflect.TypeOf(report.NewTopology()) + ) + + var want int + for i := 0; i < reportType.NumField(); i++ { + if reportType.Field(i).Type == topologyType { + want++ + } + } + + if have := len(report.MakeReport().Topologies()); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/report/topology.go b/report/topology.go index ce9055997..2ca9b3e0a 100644 --- a/report/topology.go +++ b/report/topology.go @@ -28,16 +28,14 @@ type EdgeMetadatas map[string]EdgeMetadata // IDs. type NodeMetadatas map[string]NodeMetadata -// EdgeMetadata describes a superset of the metadata that probes can -// conceivably (and usefully) collect about an edge between two nodes in any -// topology. +// EdgeMetadata describes a superset of the metadata that probes can possibly +// collect about a directed edge between two nodes in any topology. type EdgeMetadata struct { - WithBytes bool `json:"with_bytes,omitempty"` - BytesIngress uint `json:"bytes_ingress,omitempty"` // dst -> src - BytesEgress uint `json:"bytes_egress,omitempty"` // src -> dst - - WithConnCountTCP bool `json:"with_conn_count_tcp,omitempty"` - MaxConnCountTCP uint `json:"max_conn_count_tcp,omitempty"` + EgressPacketCount *uint64 `json:"egress_packet_count,omitempty"` + IngressPacketCount *uint64 `json:"ingress_packet_count,omitempty"` + EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` // Transport layer + IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` // Transport layer + MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"` } // NodeMetadata describes a superset of the metadata that probes can collect diff --git a/test/report_fixture.go b/test/report_fixture.go index d811fc69f..fe3d1db67 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -1,6 +1,8 @@ package test import ( + "time" + "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/probe/process" @@ -108,40 +110,33 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(Client54001NodeID, Server80NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 100, - BytesEgress: 10, + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }, report.MakeEdgeID(Client54002NodeID, Server80NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 200, - BytesEgress: 20, + EgressPacketCount: newu64(20), + EgressByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, Client54001NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 10, - BytesEgress: 100, + EgressPacketCount: newu64(10), + EgressByteCount: newu64(100), }, report.MakeEdgeID(Server80NodeID, Client54002NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 20, - BytesEgress: 200, + EgressPacketCount: newu64(20), + EgressByteCount: newu64(200), }, report.MakeEdgeID(Server80NodeID, UnknownClient1NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 30, - BytesEgress: 300, + EgressPacketCount: newu64(30), + EgressByteCount: newu64(300), }, report.MakeEdgeID(Server80NodeID, UnknownClient2NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 40, - BytesEgress: 400, + EgressPacketCount: newu64(40), + EgressByteCount: newu64(400), }, report.MakeEdgeID(Server80NodeID, UnknownClient3NodeID): report.EdgeMetadata{ - WithBytes: true, - BytesIngress: 50, - BytesEgress: 500, + EgressPacketCount: newu64(50), + EgressByteCount: newu64(500), }, }, }, @@ -222,12 +217,10 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{ report.MakeEdgeID(ClientAddressNodeID, ServerAddressNodeID): report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 3, + MaxConnCountTCP: newu64(3), }, report.MakeEdgeID(ServerAddressNodeID, ClientAddressNodeID): report.EdgeMetadata{ - WithConnCountTCP: true, - MaxConnCountTCP: 3, + MaxConnCountTCP: newu64(3), }, }, }, @@ -251,6 +244,11 @@ var ( }, EdgeMetadatas: report.EdgeMetadatas{}, }, + Sampling: report.Sampling{ + Count: 1024, + Total: 4096, + }, + Window: 2 * time.Second, } ) @@ -259,3 +257,5 @@ func init() { panic(err) } } + +func newu64(value uint64) *uint64 { return &value }