Merge pull request #317 from weaveworks/sniff

gopacket sniffer
This commit is contained in:
Peter Bourgon
2015-08-06 16:21:12 +02:00
34 changed files with 1149 additions and 485 deletions

View File

@@ -26,7 +26,7 @@ $(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/overlay/*.go report/*.go xfer/*.go
$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)
go get -d -tags netgo ./$(@D)
go build -ldflags "-extldflags \"-static\" -X main.version $(SCOPE_VERSION)" -tags netgo -o $@ ./$(@D)
@strings $@ | grep cgo_stub\\\.go >/dev/null || { \
rm $@; \

View File

@@ -8,6 +8,7 @@ import (
"github.com/gorilla/websocket"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
)
const (
@@ -27,7 +28,7 @@ type APINode struct {
// APIEdge is returned by the /api/topology/*/*/* handlers.
type APIEdge struct {
Metadata render.AggregateMetadata `json:"metadata"`
Metadata report.EdgeMetadata `json:"metadata"`
}
// Full topology.
@@ -76,7 +77,7 @@ func handleEdge(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Req
localID = vars["local"]
remoteID = vars["remote"]
rpt = rep.Report()
metadata = t.renderer.AggregateMetadata(rpt, localID, remoteID)
metadata = t.renderer.EdgeMetadata(rpt, localID, remoteID)
)
respondWith(w, http.StatusOK, APIEdge{Metadata: metadata})

View File

@@ -12,6 +12,7 @@ import (
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/render/expected"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
@@ -71,6 +72,7 @@ func TestAPITopologyApplications(t *testing.T) {
if err := json.Unmarshal(body, &topo); err != nil {
t.Fatal(err)
}
if want, have := render.OnlyConnected(expected.RenderedProcesses), fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
@@ -93,9 +95,9 @@ func TestAPITopologyApplications(t *testing.T) {
if err := json.Unmarshal(body, &edge); err != nil {
t.Fatalf("JSON parse error: %s", err)
}
if want, have := (render.AggregateMetadata{
"egress_bytes": 10,
"ingress_bytes": 100,
if want, have := (report.EdgeMetadata{
EgressPacketCount: newu64(10),
EgressByteCount: newu64(100),
}), edge.Metadata; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
@@ -112,7 +114,8 @@ func TestAPITopologyHosts(t *testing.T) {
if err := json.Unmarshal(body, &topo); err != nil {
t.Fatal(err)
}
if want, have := (expected.RenderedHosts), fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) {
if want, have := expected.RenderedHosts, fixNodeMetadatas(topo.Nodes); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
@@ -134,11 +137,10 @@ func TestAPITopologyHosts(t *testing.T) {
if err := json.Unmarshal(body, &edge); err != nil {
t.Fatalf("JSON parse error: %s", err)
}
want := render.AggregateMetadata{
"max_conn_count_tcp": 3,
}
if !reflect.DeepEqual(want, edge.Metadata) {
t.Errorf("Edge metadata error. Want %v, have %v", want, edge)
if want, have := (report.EdgeMetadata{
MaxConnCountTCP: newu64(3),
}), edge.Metadata; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
}
@@ -176,3 +178,5 @@ func TestAPITopologyWebsocket(t *testing.T) {
equals(t, 0, len(d.Update))
equals(t, 0, len(d.Remove))
}
func newu64(value uint64) *uint64 { return &value }

View File

@@ -51,9 +51,14 @@ func NewReportLIFO(r reporter, maxAge time.Duration) *ReportLIFO {
case req := <-l.requests:
// Request for the current report.
report := report.MakeReport()
oldest := time.Now()
for _, r := range l.reports {
if r.Timestamp.Before(oldest) {
oldest = r.Timestamp
}
report.Merge(r.Report)
}
report.Window = time.Now().Sub(oldest)
req <- report
case q := <-l.quit:

View File

@@ -19,7 +19,7 @@ dependencies:
cache_directories:
- "~/docker"
override:
- sudo apt-get --only-upgrade install tar
- sudo apt-get --only-upgrade install tar libpcap0.8-dev
- bin/rebuild-ui-build-image
- curl https://sdk.cloud.google.com | bash
- test -z "$SECRET_PASSWORD" || bin/setup-circleci-secrets "$SECRET_PASSWORD"
@@ -40,8 +40,7 @@ test:
- cd $SRCDIR; ./bin/lint .
- cd $SRCDIR; make client-test
- cd $SRCDIR; make static
- cd $SRCDIR; rm -f app/app probe/probe; GOOS=darwin make && make clean
- cd $SRCDIR; rm -f app/app probe/probe; GOOS=linux make
- cd $SRCDIR; rm -f app/scope-app probe/scope-probe; make
- cd $SRCDIR; ./bin/test -slow
- cd $SRCDIR/experimental; make
- test -z "$SECRET_PASSWORD" || (cd $SRCDIR/integration; ./gce.sh setup)

View File

@@ -91,12 +91,10 @@ func DemoReport(nodeCount int) report.Report {
edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID)
)
r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: uint(rand.Intn(100) + 10),
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}
r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: uint(rand.Intn(100) + 10),
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}
// Address topology
@@ -124,3 +122,5 @@ func DemoReport(nodeCount int) report.Report {
return r
}
func newu64(value uint64) *uint64 { return &value }

View File

@@ -89,12 +89,10 @@ func DemoReport(nodeCount int) report.Report {
edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID)
)
r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: uint(rand.Intn(100) + 10),
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}
r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: uint(rand.Intn(100) + 10),
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}
// Address topology
@@ -122,3 +120,5 @@ func DemoReport(nodeCount int) report.Report {
return r
}
func newu64(value uint64) *uint64 { return &value }

View File

@@ -79,15 +79,15 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String())
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String())
adjacencyID = report.MakeAdjacencyID(localAddressNodeID)
adjecencyID = report.MakeAdjacencyID(localAddressNodeID)
edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID)
)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID)
rpt.Address.Adjacency[adjecencyID] = rpt.Address.Adjacency[adjecencyID].Add(remoteAddressNodeID)
if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName, // TODO this is ambiguous, be more specific
"name": r.hostName,
Addr: c.LocalAddress.String(),
})
}
@@ -98,11 +98,11 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
adjacencyID = report.MakeAdjacencyID(localEndpointNodeID)
adjecencyID = report.MakeAdjacencyID(localEndpointNodeID)
edgeID = report.MakeEdgeID(localEndpointNodeID, remoteEndpointNodeID)
)
rpt.Endpoint.Adjacency[adjacencyID] = rpt.Endpoint.Adjacency[adjacencyID].Add(remoteEndpointNodeID)
rpt.Endpoint.Adjacency[adjecencyID] = rpt.Endpoint.Adjacency[adjecencyID].Add(remoteEndpointNodeID)
if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
@@ -119,9 +119,11 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
}
}
func countTCPConnection(m report.EdgeMetadatas, edgeKey string) {
edgeMeta := m[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
m[edgeKey] = edgeMeta
func countTCPConnection(mds report.EdgeMetadatas, key string) {
md := mds[key]
if md.MaxConnCountTCP == nil {
md.MaxConnCountTCP = new(uint64)
}
*md.MaxConnCountTCP++
mds[key] = md
}

View File

@@ -1,7 +1,6 @@
package host
import (
"net"
"runtime"
"strings"
"time"
@@ -9,7 +8,7 @@ import (
"github.com/weaveworks/scope/report"
)
// Keys for use in NodeMetadata
// Keys for use in NodeMetadata.
const (
Timestamp = "ts"
HostName = "host_name"
@@ -20,30 +19,31 @@ const (
Uptime = "uptime"
)
// Exposed for testing
// Exposed for testing.
const (
ProcUptime = "/proc/uptime"
ProcLoad = "/proc/loadavg"
)
// Exposed for testing
// Exposed for testing.
var (
InterfaceAddrs = net.InterfaceAddrs
Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) }
Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) }
)
// Reporter generates Reports containing the host topology.
type Reporter struct {
hostID string
hostName string
hostID string
hostName string
localNets report.Networks
}
// NewReporter returns a Reporter which produces a report containing host
// topology for this host.
func NewReporter(hostID, hostName string) *Reporter {
func NewReporter(hostID, hostName string, localNets report.Networks) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
hostID: hostID,
hostName: hostName,
localNets: localNets,
}
}
@@ -54,15 +54,8 @@ func (r *Reporter) Report() (report.Report, error) {
localCIDRs []string
)
localNets, err := InterfaceAddrs()
if err != nil {
return rep, err
}
for _, localNet := range localNets {
// Not all networks are IP networks.
if ipNet, ok := localNet.(*net.IPNet); ok {
localCIDRs = append(localCIDRs, ipNet.String())
}
for _, localNet := range r.localNets {
localCIDRs = append(localCIDRs, localNet.String())
}
uptime, err := GetUptime()

View File

@@ -12,38 +12,37 @@ import (
"github.com/weaveworks/scope/test"
)
const (
release = "release"
version = "version"
network = "192.168.0.0/16"
hostID = "hostid"
now = "now"
hostname = "hostname"
load = "0.59 0.36 0.29"
uptime = "278h55m43s"
kernel = "release version"
)
func TestReporter(t *testing.T) {
var (
release = "release"
version = "version"
network = "192.168.0.0/16"
hostID = "hostid"
now = "now"
hostname = "hostname"
load = "0.59 0.36 0.29"
uptime = "278h55m43s"
kernel = "release version"
_, ipnet, _ = net.ParseCIDR(network)
localNets = report.Networks([]*net.IPNet{ipnet})
)
var (
oldGetKernelVersion = host.GetKernelVersion
oldGetLoad = host.GetLoad
oldGetUptime = host.GetUptime
oldInterfaceAddrs = host.InterfaceAddrs
oldNow = host.Now
)
defer func() {
host.GetKernelVersion = oldGetKernelVersion
host.GetLoad = oldGetLoad
host.GetUptime = oldGetUptime
host.InterfaceAddrs = oldInterfaceAddrs
host.Now = oldNow
}()
host.GetKernelVersion = func() (string, error) { return release + " " + version, nil }
host.GetLoad = func() string { return load }
host.GetUptime = func() (time.Duration, error) { return time.ParseDuration(uptime) }
host.Now = func() string { return now }
host.InterfaceAddrs = func() ([]net.Addr, error) { _, ipnet, _ := net.ParseCIDR(network); return []net.Addr{ipnet}, nil }
want := report.MakeReport()
want.Host.NodeMetadatas[report.MakeHostNodeID(hostID)] = report.MakeNodeMetadataWith(map[string]string{
@@ -55,8 +54,7 @@ func TestReporter(t *testing.T) {
host.Uptime: uptime,
host.KernelVersion: kernel,
})
r := host.NewReporter(hostID, hostname)
have, _ := r.Report()
have, _ := host.NewReporter(hostID, hostname, localNets).Report()
if !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
}

View File

@@ -3,11 +3,14 @@ package main
import (
"flag"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"
"time"
@@ -17,6 +20,7 @@ import (
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/probe/sniff"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
@@ -36,8 +40,13 @@ func main() {
dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem")
captureEnabled = flag.Bool("capture", false, "perform sampled packet capture")
captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces")
captureOn = flag.Duration("capture.on", 1*time.Second, "packet capture duty cycle 'on'")
captureOff = flag.Duration("capture.off", 5*time.Second, "packet capture duty cycle 'off'")
)
flag.Parse()
log.SetFlags(log.Lmicroseconds)
if len(flag.Args()) != 0 {
flag.Usage()
@@ -71,11 +80,23 @@ func main() {
}
defer publisher.Close()
addrs, err := net.InterfaceAddrs()
if err != nil {
log.Fatal(err)
}
localNets := report.Networks{}
for _, addr := range addrs {
// Not all addrs are IPNets.
if ipNet, ok := addr.(*net.IPNet); ok {
localNets = append(localNets, ipNet)
}
}
var (
hostName = hostname()
hostID = hostName // TODO: we should sanitize the hostname
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName), endpoint.NewReporter(hostID, hostName, *spyProcs)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
)
@@ -106,6 +127,26 @@ func main() {
reporters = append(reporters, weave)
}
if *captureEnabled {
var sniffers int
for _, iface := range strings.Split(*captureInterfaces, ",") {
source, err := sniff.NewSource(iface)
if err != nil {
log.Printf("warning: %v", err)
continue
}
defer source.Close()
log.Printf("capturing packets on %s", iface)
reporters = append(reporters, sniff.New(hostID, localNets, source, *captureOn, *captureOff))
sniffers++
}
// Packet capture can block OS threads on Linux, so we need to provide
// sufficient overhead in GOMAXPROCS.
if have, want := runtime.GOMAXPROCS(-1), (sniffers + 1); have < want {
runtime.GOMAXPROCS(want)
}
}
log.Printf("listening on %s", *listen)
quit := make(chan struct{})
@@ -121,6 +162,7 @@ func main() {
select {
case <-pubTick:
publishTicks.WithLabelValues().Add(1)
r.Window = *publishInterval
publisher.Publish(r)
r = report.MakeReport()
@@ -128,7 +170,6 @@ func main() {
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
for _, reporter := range reporters {
newReport, err := reporter.Report()
if err != nil {
@@ -136,7 +177,6 @@ func main() {
}
r.Merge(newReport)
}
r = Apply(r, taggers)
case <-quit:
@@ -153,3 +193,16 @@ func interrupt() chan os.Signal {
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
return c
}
func interfaces() string {
ifaces, err := net.Interfaces()
if err != nil {
log.Print(err)
return ""
}
a := make([]string, 0, len(ifaces))
for _, iface := range ifaces {
a = append(a, iface.Name)
}
return strings.Join(a, ",")
}

View File

@@ -9,7 +9,6 @@ import (
)
func TestBasicWalk(t *testing.T) {
// Don't panic or error.
var (
procRoot = "/proc"
procFunc = func(process.Process) {}

330
probe/sniff/sniffer.go Normal file
View File

@@ -0,0 +1,330 @@
package sniff
import (
"io"
"log"
"net"
"strconv"
"sync/atomic"
"time"
"github.com/weaveworks/scope/report"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
)
// Sniffer is a packet-sniffing reporter.
type Sniffer struct {
hostID string
localNets report.Networks
reports chan chan report.Report
parser *gopacket.DecodingLayerParser
decoded []gopacket.LayerType
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
tcp layers.TCP
udp layers.UDP
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
}
// New returns a new sniffing reporter that samples traffic by turning its
// packet capture facilities on and off. Note that the on and off durations
// represent a way to bound CPU burn. Effective sample rate needs to be
// calculated as (packets decoded / packets observed).
func New(hostID string, localNets report.Networks, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer {
s := &Sniffer{
hostID: hostID,
localNets: localNets,
reports: make(chan chan report.Report),
}
s.parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
&s.eth, &s.ip4, &s.ip6, &s.tcp, &s.udp, &s.icmp4, &s.icmp6,
)
go s.loop(src, on, off)
return s
}
// Report implements the Reporter interface.
func (s *Sniffer) Report() (report.Report, error) {
c := make(chan report.Report)
s.reports <- c
return <-c, nil
}
func (s *Sniffer) loop(src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) {
var (
process = uint64(1) // initially enabled
total = uint64(0) // total packets seen
count = uint64(0) // count of packets captured
packets = make(chan Packet, 1024) // decoded packets
rpt = report.MakeReport() // the report we build
turnOn = (<-chan time.Time)(nil) // signal to start capture (initially enabled)
turnOff = time.After(on) // signal to stop capture
done = make(chan struct{}) // when src is finished, we're done too
)
// As a special case, if our off duty cycle is zero, i.e. 100% sample
// rate, we simply disable the turn-off signal channel.
if off == 0 {
turnOff = nil
}
go func() {
s.read(src, packets, &process, &total, &count)
close(done)
}()
for {
select {
case p := <-packets:
s.Merge(p, rpt)
case <-turnOn:
atomic.StoreUint64(&process, 1) // enable packet capture
turnOn = nil // disable the on switch
turnOff = time.After(on) // enable the off switch
case <-turnOff:
atomic.StoreUint64(&process, 0) // disable packet capture
turnOn = time.After(off) // enable the on switch
turnOff = nil // disable the off switch
case c := <-s.reports:
rpt.Sampling.Count = atomic.LoadUint64(&count)
rpt.Sampling.Total = atomic.LoadUint64(&total)
interpolateCounts(rpt)
c <- rpt
atomic.StoreUint64(&count, 0)
atomic.StoreUint64(&total, 0)
rpt = report.MakeReport()
case <-done:
return
}
}
}
// interpolateCounts compensates for sampling by artifically inflating counts
// throughout the report. It should be run once for each report, within the
// probe, before it gets emitted into the rest of the system.
func interpolateCounts(r report.Report) {
rate := r.Sampling.Rate()
if rate >= 1.0 {
return
}
factor := 1.0 / rate
for _, topology := range r.Topologies() {
for _, emd := range topology.EdgeMetadatas {
if emd.EgressPacketCount != nil {
*emd.EgressPacketCount = uint64(float64(*emd.EgressPacketCount) * factor)
}
if emd.IngressPacketCount != nil {
*emd.IngressPacketCount = uint64(float64(*emd.IngressPacketCount) * factor)
}
if emd.EgressByteCount != nil {
*emd.EgressByteCount = uint64(float64(*emd.EgressByteCount) * factor)
}
if emd.IngressByteCount != nil {
*emd.IngressByteCount = uint64(float64(*emd.IngressByteCount) * factor)
}
}
}
}
// Packet is an intermediate, decoded form of a packet, with the information
// that the Scope data model cares about. Designed to decouple the packet data
// source loop, which should be as fast as possible, and the process of
// merging the packet information to a report, which may take some time and
// allocations.
type Packet struct {
SrcIP, DstIP string
SrcPort, DstPort string
Network, Transport int // byte counts
}
func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, process, total, count *uint64) {
var (
data []byte
err error
)
for {
data, _, err = src.ZeroCopyReadPacketData()
if err == io.EOF {
return // done
}
if err != nil {
log.Printf("sniffer: read: %v", err)
continue
}
atomic.AddUint64(total, 1)
if atomic.LoadUint64(process) == 0 {
continue
}
if err := s.parser.DecodeLayers(data, &s.decoded); err != nil {
// We'll always get an error when we encounter a layer type for
// which we haven't configured a decoder.
}
var p Packet
for _, t := range s.decoded {
switch t {
case layers.LayerTypeEthernet:
//
case layers.LayerTypeICMPv4:
p.Network += len(s.icmp4.Payload)
case layers.LayerTypeICMPv6:
p.Network += len(s.icmp6.Payload)
case layers.LayerTypeIPv4:
p.SrcIP = s.ip4.SrcIP.String()
p.DstIP = s.ip4.DstIP.String()
p.Network += len(s.ip4.Payload)
case layers.LayerTypeIPv6:
p.SrcIP = s.ip6.SrcIP.String()
p.DstIP = s.ip6.DstIP.String()
p.Network += len(s.ip6.Payload)
case layers.LayerTypeTCP:
p.SrcPort = strconv.Itoa(int(s.tcp.SrcPort))
p.DstPort = strconv.Itoa(int(s.tcp.DstPort))
p.Transport += len(s.tcp.Payload)
case layers.LayerTypeUDP:
p.SrcPort = strconv.Itoa(int(s.udp.SrcPort))
p.DstPort = strconv.Itoa(int(s.udp.DstPort))
p.Transport += len(s.udp.Payload)
}
}
select {
case dst <- p:
atomic.AddUint64(count, 1)
default:
log.Printf("sniffer dropped packet")
}
}
}
// Merge puts the packet into the report.
//
// Note that, for the moment, we encode bidirectional traffic as ingress and
// egress traffic on a single edge whose src is local and dst is remote. That
// is, if we see a packet from the remote addr 9.8.7.6 to the local addr
// 1.2.3.4, we apply it as *ingress* on the edge (1.2.3.4 -> 9.8.7.6).
func (s *Sniffer) Merge(p Packet, rpt report.Report) {
if p.SrcIP == "" || p.DstIP == "" {
return
}
// One end of the traffic has to be local. Otherwise, we don't know how to
// construct the edge.
//
// If we need to get around this limitation, we may be able to change the
// semantics of the report, and allow the src side of edges to be from
// anywhere. But that will have ramifications throughout Scope (read: it
// may violate implicit invariants) and needs to be thought through.
var (
srcLocal = s.localNets.Contains(net.ParseIP(p.SrcIP))
dstLocal = s.localNets.Contains(net.ParseIP(p.DstIP))
localIP string
remoteIP string
localPort string
remotePort string
egress bool
)
switch {
case srcLocal && !dstLocal:
localIP, localPort, remoteIP, remotePort, egress = p.SrcIP, p.SrcPort, p.DstIP, p.DstPort, true
case !srcLocal && dstLocal:
localIP, localPort, remoteIP, remotePort, egress = p.DstIP, p.DstPort, p.SrcIP, p.SrcPort, false
case srcLocal && dstLocal:
localIP, localPort, remoteIP, remotePort, egress = p.SrcIP, p.SrcPort, p.DstIP, p.DstPort, true // loopback
case !srcLocal && !dstLocal:
log.Printf("sniffer ignoring remote-to-remote (%s -> %s) traffic", p.SrcIP, p.DstIP)
return
}
// For sure, we can add to the address topology.
{
var (
srcNodeID = report.MakeAddressNodeID(s.hostID, localIP)
dstNodeID = report.MakeAddressNodeID(s.hostID, remoteIP)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
srcAdjacencyID = report.MakeAdjacencyID(srcNodeID)
)
if _, ok := rpt.Address.NodeMetadatas[srcNodeID]; !ok {
rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata()
}
emd := rpt.Address.EdgeMetadatas[edgeID]
if egress {
if emd.EgressPacketCount == nil {
emd.EgressPacketCount = new(uint64)
}
*emd.EgressPacketCount++
if emd.EgressByteCount == nil {
emd.EgressByteCount = new(uint64)
}
*emd.EgressByteCount += uint64(p.Network)
} else {
if emd.IngressPacketCount == nil {
emd.IngressPacketCount = new(uint64)
}
*emd.IngressPacketCount++
if emd.IngressByteCount == nil {
emd.IngressByteCount = new(uint64)
}
*emd.IngressByteCount += uint64(p.Network)
}
rpt.Address.EdgeMetadatas[edgeID] = emd
rpt.Address.Adjacency[srcAdjacencyID] = rpt.Address.Adjacency[srcAdjacencyID].Add(dstNodeID)
}
// If we have ports, we can add to the endpoint topology, too.
if p.SrcPort != "" && p.DstPort != "" {
var (
srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, localPort)
dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, remotePort)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
srcAdjacencyID = report.MakeAdjacencyID(srcNodeID)
)
if _, ok := rpt.Endpoint.NodeMetadatas[srcNodeID]; !ok {
rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata()
}
emd := rpt.Endpoint.EdgeMetadatas[edgeID]
if egress {
if emd.EgressPacketCount == nil {
emd.EgressPacketCount = new(uint64)
}
*emd.EgressPacketCount++
if emd.EgressByteCount == nil {
emd.EgressByteCount = new(uint64)
}
*emd.EgressByteCount += uint64(p.Transport)
} else {
if emd.IngressPacketCount == nil {
emd.IngressPacketCount = new(uint64)
}
*emd.IngressPacketCount++
if emd.IngressByteCount == nil {
emd.IngressByteCount = new(uint64)
}
*emd.IngressByteCount += uint64(p.Transport)
}
rpt.Endpoint.EdgeMetadatas[edgeID] = emd
rpt.Endpoint.Adjacency[srcAdjacencyID] = rpt.Endpoint.Adjacency[srcAdjacencyID].Add(dstNodeID)
}
}

View File

@@ -0,0 +1,53 @@
package sniff
import (
"testing"
"github.com/weaveworks/scope/report"
)
func TestInterpolateCounts(t *testing.T) {
var (
hostID = "macbook-air"
srcNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "5678")
dstNodeID = report.MakeEndpointNodeID(hostID, "5.6.7.8", "9012")
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
samplingCount = uint64(200)
samplingTotal = uint64(2345)
packetCount = uint64(123)
byteCount = uint64(4096)
)
r := report.MakeReport()
r.Sampling.Count = samplingCount
r.Sampling.Total = samplingTotal
r.Endpoint.EdgeMetadatas[edgeID] = report.EdgeMetadata{
EgressPacketCount: newu64(packetCount),
IngressPacketCount: newu64(packetCount),
EgressByteCount: newu64(byteCount),
IngressByteCount: newu64(byteCount),
}
interpolateCounts(r)
var (
rate = float64(samplingCount) / float64(samplingTotal)
factor = 1.0 / rate
apply = func(v uint64) uint64 { return uint64(factor * float64(v)) }
emd = r.Endpoint.EdgeMetadatas[edgeID]
)
if want, have := apply(packetCount), (*emd.EgressPacketCount); want != have {
t.Errorf("want %d packets, have %d", want, have)
}
if want, have := apply(packetCount), (*emd.IngressPacketCount); want != have {
t.Errorf("want %d packets, have %d", want, have)
}
if want, have := apply(byteCount), (*emd.EgressByteCount); want != have {
t.Errorf("want %d bytes, have %d", want, have)
}
if want, have := apply(byteCount), (*emd.IngressByteCount); want != have {
t.Errorf("want %d bytes, have %d", want, have)
}
}
func newu64(value uint64) *uint64 { return &value }

139
probe/sniff/sniffer_test.go Normal file
View File

@@ -0,0 +1,139 @@
package sniff_test
import (
"io"
"net"
"reflect"
"sync"
"testing"
"time"
"github.com/google/gopacket"
"github.com/weaveworks/scope/probe/sniff"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
func TestSnifferShutdown(t *testing.T) {
var (
hostID = "abcd"
src = newMockSource([]byte{}, nil)
on = time.Millisecond
off = time.Millisecond
s = sniff.New(hostID, report.Networks{}, src, on, off)
)
// Stopping the source should terminate the sniffer.
src.Close()
time.Sleep(10 * time.Millisecond)
// Try to get a report from the sniffer. It should block forever, as the
// loop goroutine should have exited.
report := make(chan struct{})
go func() { _, _ = s.Report(); close(report) }()
select {
case <-time.After(time.Millisecond):
case <-report:
t.Errorf("shouldn't get report after Close")
}
}
func TestMerge(t *testing.T) {
var (
hostID = "xyz"
src = newMockSource([]byte{}, nil)
on = time.Millisecond
off = time.Millisecond
rpt = report.MakeReport()
p = sniff.Packet{
SrcIP: "1.0.0.0",
SrcPort: "1000",
DstIP: "2.0.0.0",
DstPort: "2000",
Network: 512,
Transport: 256,
}
_, ipnet, _ = net.ParseCIDR(p.SrcIP + "/24") // ;)
localNets = report.Networks([]*net.IPNet{ipnet})
)
sniff.New(hostID, localNets, src, on, off).Merge(p, rpt)
var (
srcEndpointNodeID = report.MakeEndpointNodeID(hostID, p.SrcIP, p.SrcPort)
dstEndpointNodeID = report.MakeEndpointNodeID(hostID, p.DstIP, p.DstPort)
)
if want, have := (report.Topology{
Adjacency: report.Adjacency{
report.MakeAdjacencyID(srcEndpointNodeID): report.MakeIDList(
dstEndpointNodeID,
),
},
EdgeMetadatas: report.EdgeMetadatas{
report.MakeEdgeID(srcEndpointNodeID, dstEndpointNodeID): report.EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(256),
},
},
NodeMetadatas: report.NodeMetadatas{
srcEndpointNodeID: report.MakeNodeMetadata(),
},
}), rpt.Endpoint; !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
}
var (
srcAddressNodeID = report.MakeAddressNodeID(hostID, p.SrcIP)
dstAddressNodeID = report.MakeAddressNodeID(hostID, p.DstIP)
)
if want, have := (report.Topology{
Adjacency: report.Adjacency{
report.MakeAdjacencyID(srcAddressNodeID): report.MakeIDList(
dstAddressNodeID,
),
},
EdgeMetadatas: report.EdgeMetadatas{
report.MakeEdgeID(srcAddressNodeID, dstAddressNodeID): report.EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(512),
},
},
NodeMetadatas: report.NodeMetadatas{
srcAddressNodeID: report.MakeNodeMetadata(),
},
}), rpt.Address; !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
}
}
type mockSource struct {
mtx sync.RWMutex
data []byte
err error
}
func newMockSource(data []byte, err error) *mockSource {
return &mockSource{
data: data,
err: err,
}
}
func (s *mockSource) ZeroCopyReadPacketData() ([]byte, gopacket.CaptureInfo, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.data, gopacket.CaptureInfo{
Timestamp: time.Now(),
CaptureLength: len(s.data),
Length: len(s.data),
}, s.err
}
func (s *mockSource) Close() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.err = io.EOF
}
func newu64(value uint64) *uint64 { return &value }

24
probe/sniff/source.go Normal file
View File

@@ -0,0 +1,24 @@
package sniff
import (
"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
)
// Source describes a packet data source that can be terminated.
type Source interface {
gopacket.ZeroCopyPacketDataSource
Close()
}
const (
snaplen = 65535
promisc = true
timeout = pcap.BlockForever
)
// NewSource returns a live packet data source via the passed device
// (interface).
func NewSource(device string) (Source, error) {
return pcap.OpenLive(device, snaplen, promisc, timeout)
}

View File

@@ -0,0 +1,87 @@
package main
import (
"flag"
"io"
"log"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
)
func main() {
var (
device = flag.String("device", "eth0", "device to sniff")
)
flag.Parse()
const (
snaplen = 1024 * 1024
promisc = true
timeout = pcap.BlockForever
)
handle, err := pcap.OpenLive(*device, snaplen, promisc, timeout)
if err != nil {
log.Fatal(err)
}
go func() {
time.Sleep(5 * time.Second)
handle.Close()
}()
var (
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
tcp layers.TCP
udp layers.UDP
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
)
parser := gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
&eth, &ip4, &ip6, &tcp, &udp, &icmp4, &icmp6,
)
decoded := []gopacket.LayerType{}
done := make(chan struct{})
go func() {
defer close(done)
for {
data, ci, err := handle.ZeroCopyReadPacketData()
if err == io.EOF {
log.Print("read: EOF")
return
}
if err != nil {
log.Printf("read: %v", err)
continue
}
log.Println(ci.Timestamp.String())
err = parser.DecodeLayers(data, &decoded)
for _, t := range decoded {
switch t {
case layers.LayerTypeEthernet:
log.Println(" Ethernet", eth.EthernetType, eth.SrcMAC, eth.DstMAC, eth.Length)
case layers.LayerTypeIPv6:
log.Println(" IP6", ip6.Version, ip6.SrcIP, ip6.DstIP, ip6.Length, ip6.TrafficClass)
case layers.LayerTypeIPv4:
log.Println(" IP4", ip4.Version, ip4.SrcIP, ip4.DstIP, ip4.Length, ip4.TTL, ip4.TOS)
case layers.LayerTypeTCP:
log.Println(" TCP", tcp.SrcPort, tcp.DstPort, tcp.Seq, tcp.Ack, tcp.Window)
case layers.LayerTypeUDP:
log.Println(" UDP", udp.SrcPort, udp.DstPort, udp.Length)
case layers.LayerTypeICMPv4:
log.Println(" ICMP4", icmp4.Id, icmp4.Seq)
case layers.LayerTypeICMPv6:
log.Println(" ICMP6")
}
}
log.Println()
}
}()
<-done
}

View File

@@ -58,17 +58,46 @@ func (t tables) Less(i, j int) bool { return t[i].Rank > t[j].Rank }
// MakeDetailedNode transforms a renderable node to a detailed node. It uses
// aggregate metadata, plus the set of origin node IDs, to produce tables.
func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode {
sec := r.Window.Seconds()
rate := func(u *uint64) (float64, bool) {
if u == nil {
return 0.0, false
}
if sec <= 0 {
return 0.0, true
}
return float64(*u) / sec, true
}
shortenByteRate := func(rate float64) (major, minor string) {
switch {
case rate > 1024*1024:
return fmt.Sprintf("%.2f", rate/1024/1024), "MBps"
case rate > 1024:
return fmt.Sprintf("%.1f", rate/1024), "KBps"
default:
return fmt.Sprintf("%.0f", rate), "Bps"
}
}
tables := tables{}
{
rows := []Row{}
if val, ok := n.AggregateMetadata[KeyMaxConnCountTCP]; ok {
rows = append(rows, Row{"TCP connections", strconv.FormatInt(int64(val), 10), ""})
if n.EdgeMetadata.MaxConnCountTCP != nil {
rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), ""})
}
if val, ok := n.AggregateMetadata[KeyBytesIngress]; ok {
rows = append(rows, Row{"Bytes ingress", strconv.FormatInt(int64(val), 10), ""})
if rate, ok := rate(n.EdgeMetadata.EgressPacketCount); ok {
rows = append(rows, Row{"Egress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"})
}
if val, ok := n.AggregateMetadata[KeyBytesEgress]; ok {
rows = append(rows, Row{"Bytes egress", strconv.FormatInt(int64(val), 10), ""})
if rate, ok := rate(n.EdgeMetadata.IngressPacketCount); ok {
rows = append(rows, Row{"Ingress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"})
}
if rate, ok := rate(n.EdgeMetadata.EgressByteCount); ok {
s, unit := shortenByteRate(rate)
rows = append(rows, Row{"Egress byte rate", s, unit})
}
if rate, ok := rate(n.EdgeMetadata.IngressByteCount); ok {
s, unit := shortenByteRate(rate)
rows = append(rows, Row{"Ingress byte rate", s, unit})
}
if len(rows) > 0 {
tables = append(tables, Table{"Connections", true, connectionsRank, rows})

View File

@@ -66,8 +66,8 @@ func TestMakeDetailedNode(t *testing.T) {
Numeric: true,
Rank: 100,
Rows: []render.Row{
{"Bytes ingress", "150", ""},
{"Bytes egress", "1500", ""},
{"Egress packet rate", "75", "packets/sec"},
{"Egress byte rate", "750", "Bps"},
},
},
{

View File

@@ -14,25 +14,25 @@ var (
unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80")
unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80")
unknownPseudoNode1 = render.RenderableNode{
ID: unknownPseudoNode1ID,
LabelMajor: "10.10.10.10",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
ID: unknownPseudoNode1ID,
LabelMajor: "10.10.10.10",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
}
unknownPseudoNode2 = render.RenderableNode{
ID: unknownPseudoNode2ID,
LabelMajor: "10.10.10.11",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
ID: unknownPseudoNode2ID,
LabelMajor: "10.10.10.11",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
}
theInternetNode = render.RenderableNode{
ID: render.TheInternetID,
LabelMajor: render.TheInternetMajor,
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
ID: render.TheInternetID,
LabelMajor: render.TheInternetMajor,
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
}
ClientProcess1ID = render.MakeProcessID(test.ClientHostID, test.Client1PID)
@@ -54,9 +54,9 @@ var (
test.ClientHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 100,
render.KeyBytesEgress: 10,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(10),
EgressByteCount: newu64(100),
},
},
ClientProcess2ID: {
@@ -72,9 +72,9 @@ var (
test.ClientHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 200,
render.KeyBytesEgress: 20,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(20),
EgressByteCount: newu64(200),
},
},
ServerProcessID: {
@@ -96,9 +96,9 @@ var (
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 150,
render.KeyBytesEgress: 1500,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(150),
EgressByteCount: newu64(1500),
},
},
nonContainerProcessID: {
@@ -112,8 +112,8 @@ var (
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
},
unknownPseudoNode1ID: unknownPseudoNode1,
unknownPseudoNode2ID: unknownPseudoNode2,
@@ -136,9 +136,9 @@ var (
test.ClientHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 300,
render.KeyBytesEgress: 30,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
},
"apache": {
@@ -159,9 +159,9 @@ var (
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 150,
render.KeyBytesEgress: 1500,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(150),
EgressByteCount: newu64(1500),
},
},
"bash": {
@@ -174,8 +174,8 @@ var (
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
},
unknownPseudoNode1ID: unknownPseudoNode1,
unknownPseudoNode2ID: unknownPseudoNode2,
@@ -199,9 +199,9 @@ var (
test.ClientHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 300,
render.KeyBytesEgress: 30,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
},
test.ServerContainerID: {
@@ -218,9 +218,9 @@ var (
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 150,
render.KeyBytesEgress: 1500,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(150),
EgressByteCount: newu64(1500),
},
},
uncontainedServerID: {
@@ -233,8 +233,8 @@ var (
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode,
}
@@ -257,9 +257,9 @@ var (
test.ClientHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 300,
render.KeyBytesEgress: 30,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
},
test.ServerContainerImageName: {
@@ -276,9 +276,9 @@ var (
test.ServerProcessNodeID,
test.ServerHostNodeID),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyBytesIngress: 150,
render.KeyBytesEgress: 1500,
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(150),
EgressByteCount: newu64(1500),
},
},
uncontainedServerID: {
@@ -291,8 +291,8 @@ var (
test.NonContainerProcessNodeID,
test.ServerHostNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode,
}
@@ -315,8 +315,8 @@ var (
test.ServerAddressNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyMaxConnCountTCP: 3,
EdgeMetadata: report.EdgeMetadata{
MaxConnCountTCP: newu64(3),
},
},
ClientHostRenderedID: {
@@ -331,24 +331,26 @@ var (
test.ClientAddressNodeID,
),
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{
render.KeyMaxConnCountTCP: 3,
EdgeMetadata: report.EdgeMetadata{
MaxConnCountTCP: newu64(3),
},
},
pseudoHostID1: {
ID: pseudoHostID1,
LabelMajor: "10.10.10.10",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
ID: pseudoHostID1,
LabelMajor: "10.10.10.10",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
},
pseudoHostID2: {
ID: pseudoHostID2,
LabelMajor: "10.10.10.11",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
AggregateMetadata: render.AggregateMetadata{},
ID: pseudoHostID2,
LabelMajor: "10.10.10.11",
Pseudo: true,
NodeMetadata: report.MakeNodeMetadata(),
EdgeMetadata: report.EdgeMetadata{},
},
render.TheInternetID: theInternetNode,
}
)
func newu64(value uint64) *uint64 { return &value }

View File

@@ -1,46 +0,0 @@
package render
import (
"github.com/weaveworks/scope/report"
)
// AggregateMetadata is a composable version of an EdgeMetadata. It's used
// when we want to merge nodes/edges for any reason.
//
// Even though we base it on EdgeMetadata, we can apply it to nodes, by
// summing up (merging) all of the {ingress, egress} metadatas of the
// {incoming, outgoing} edges to the node.
type AggregateMetadata map[string]int
const (
// KeyBytesIngress is the aggregate metadata key for the total count of
// ingress bytes.
KeyBytesIngress = "ingress_bytes"
// KeyBytesEgress is the aggregate metadata key for the total count of
// egress bytes.
KeyBytesEgress = "egress_bytes"
// KeyMaxConnCountTCP is the aggregate metadata key for the maximum number
// of simultaneous observed TCP connections in the window.
KeyMaxConnCountTCP = "max_conn_count_tcp"
)
// AggregateMetadataOf calculates a AggregateMetadata from an EdgeMetadata.
func AggregateMetadataOf(md report.EdgeMetadata) AggregateMetadata {
m := AggregateMetadata{}
if md.WithBytes {
m[KeyBytesIngress] = int(md.BytesIngress)
m[KeyBytesEgress] = int(md.BytesEgress)
}
if md.WithConnCountTCP {
// The maximum is the maximum. No need to calculate anything.
m[KeyMaxConnCountTCP] = int(md.MaxConnCountTCP)
}
return m
}
// Merge adds the fields from AggregateMetadata to r. r must be initialized.
func (r *AggregateMetadata) Merge(other AggregateMetadata) {
for k, v := range other {
(*r)[k] += v
}
}

View File

@@ -1,87 +0,0 @@
package render_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
)
func TestAggregateMetadata(t *testing.T) {
for from, want := range map[report.EdgeMetadata]render.AggregateMetadata{
// Simple connection count
report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: 400,
}: {
render.KeyMaxConnCountTCP: 400,
},
// Connection count rounding
report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: 4,
}: {
render.KeyMaxConnCountTCP: 4,
},
// 0 connections.
report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: 0,
}: {
render.KeyMaxConnCountTCP: 0,
},
// Egress
report.EdgeMetadata{
WithBytes: true,
BytesEgress: 24,
BytesIngress: 0,
}: {
render.KeyBytesEgress: 24,
render.KeyBytesIngress: 0,
},
// Ingress
report.EdgeMetadata{
WithBytes: true,
BytesEgress: 0,
BytesIngress: 1200,
}: {
render.KeyBytesEgress: 0,
render.KeyBytesIngress: 1200,
},
// Nothing there.
report.EdgeMetadata{}: {},
} {
if have := render.AggregateMetadataOf(from); !reflect.DeepEqual(have, want) {
t.Errorf("have: %#v, want %#v", have, want)
}
}
}
func TestAggregateMetadataSum(t *testing.T) {
var (
this = render.AggregateMetadata{
"ingress_bytes": 3,
}
other = render.AggregateMetadata{
"ingress_bytes": 333,
"egress_bytes": 3,
}
want = render.AggregateMetadata{
"ingress_bytes": 336,
"egress_bytes": 3,
}
)
this.Merge(other)
if have := this; !reflect.DeepEqual(have, want) {
t.Errorf("have: %#v, want %#v", have, want)
}
}

View File

@@ -9,7 +9,7 @@ import (
// Renderer is something that can render a report to a set of RenderableNodes.
type Renderer interface {
Render(report.Report) RenderableNodes
AggregateMetadata(rpt report.Report, localID, remoteID string) AggregateMetadata
EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata
}
// Reduce renderer is a Renderer which merges together the output of several
@@ -50,11 +50,11 @@ func (r Reduce) Render(rpt report.Report) RenderableNodes {
return result
}
// AggregateMetadata produces an AggregateMetadata for a given edge.
func (r Reduce) AggregateMetadata(rpt report.Report, localID, remoteID string) AggregateMetadata {
metadata := AggregateMetadata{}
// EdgeMetadata produces an EdgeMetadata for a given edge.
func (r Reduce) EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata {
metadata := report.EdgeMetadata{}
for _, renderer := range r {
metadata.Merge(renderer.AggregateMetadata(rpt, localID, remoteID))
metadata.Merge(renderer.EdgeMetadata(rpt, localID, remoteID))
}
return metadata
}
@@ -107,11 +107,11 @@ func (m Map) render(rpt report.Report) (RenderableNodes, map[string]string) {
return output, mapped
}
// AggregateMetadata gives the metadata of an edge from the perspective of the
// EdgeMetadata gives the metadata of an edge from the perspective of the
// srcRenderableID. Since an edgeID can have multiple edges on the address
// level, it uses the supplied mapping function to translate address IDs to
// renderable node (mapped) IDs.
func (m Map) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) AggregateMetadata {
func (m Map) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) report.EdgeMetadata {
// First we need to map the ids in this layer into the ids in the underlying layer
_, mapped := m.render(rpt) // this maps from old -> new
inverted := map[string][]string{} // this maps from new -> old(s)
@@ -130,9 +130,9 @@ func (m Map) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderable
}
// Now recurse for each old edge
output := AggregateMetadata{}
output := report.EdgeMetadata{}
for _, edge := range oldEdges {
metadata := m.Renderer.AggregateMetadata(rpt, edge.src, edge.dst)
metadata := m.Renderer.EdgeMetadata(rpt, edge.src, edge.dst)
output.Merge(metadata)
}
return output
@@ -205,8 +205,9 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes {
srcRenderableNode.Origins = srcRenderableNode.Origins.Add(srcNodeID)
edgeID := report.MakeEdgeID(srcNodeID, dstNodeID)
if md, ok := t.EdgeMetadatas[edgeID]; ok {
srcRenderableNode.AggregateMetadata.Merge(AggregateMetadataOf(md))
srcRenderableNode.EdgeMetadata.Merge(md)
}
}
nodes[srcRenderableID] = srcRenderableNode
@@ -215,11 +216,11 @@ func (m LeafMap) Render(rpt report.Report) RenderableNodes {
return nodes
}
// AggregateMetadata gives the metadata of an edge from the perspective of the
// EdgeMetadata gives the metadata of an edge from the perspective of the
// srcRenderableID. Since an edgeID can have multiple edges on the address
// level, it uses the supplied mapping function to translate address IDs to
// renderable node (mapped) IDs.
func (m LeafMap) AggregateMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) AggregateMetadata {
func (m LeafMap) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID string) report.EdgeMetadata {
t := m.Selector(rpt)
metadata := report.EdgeMetadata{}
for edgeID, edgeMeta := range t.EdgeMetadatas {
@@ -240,7 +241,7 @@ func (m LeafMap) AggregateMetadata(rpt report.Report, srcRenderableID, dstRender
metadata.Flatten(edgeMeta)
}
}
return AggregateMetadataOf(metadata)
return metadata
}
// Render produces a set of RenderableNodes given a Report

View File

@@ -10,14 +10,14 @@ import (
type mockRenderer struct {
render.RenderableNodes
aggregateMetadata render.AggregateMetadata
edgeMetadata report.EdgeMetadata
}
func (m mockRenderer) Render(rpt report.Report) render.RenderableNodes {
return m.RenderableNodes
}
func (m mockRenderer) AggregateMetadata(rpt report.Report, localID, remoteID string) render.AggregateMetadata {
return m.aggregateMetadata
func (m mockRenderer) EdgeMetadata(rpt report.Report, localID, remoteID string) report.EdgeMetadata {
return m.edgeMetadata
}
func TestReduceRender(t *testing.T) {
@@ -36,12 +36,12 @@ func TestReduceRender(t *testing.T) {
func TestReduceEdge(t *testing.T) {
renderer := render.Reduce([]render.Renderer{
mockRenderer{aggregateMetadata: render.AggregateMetadata{"foo": 1}},
mockRenderer{aggregateMetadata: render.AggregateMetadata{"bar": 2}},
mockRenderer{edgeMetadata: report.EdgeMetadata{EgressPacketCount: newu64(1)}},
mockRenderer{edgeMetadata: report.EdgeMetadata{EgressPacketCount: newu64(2)}},
})
want := render.AggregateMetadata{"foo": 1, "bar": 2}
have := renderer.AggregateMetadata(report.MakeReport(), "", "")
want := report.EdgeMetadata{EgressPacketCount: newu64(3)}
have := renderer.EdgeMetadata(report.MakeReport(), "", "")
if !reflect.DeepEqual(want, have) {
t.Errorf("want %+v, have %+v", want, have)
@@ -118,8 +118,8 @@ func TestMapEdge(t *testing.T) {
">bar": report.MakeIDList("foo"),
},
EdgeMetadatas: report.EdgeMetadatas{
"foo|bar": report.EdgeMetadata{WithBytes: true, BytesIngress: 1, BytesEgress: 2},
"bar|foo": report.EdgeMetadata{WithBytes: true, BytesIngress: 3, BytesEgress: 4},
"foo|bar": report.EdgeMetadata{EgressPacketCount: newu64(1), EgressByteCount: newu64(2)},
"bar|foo": report.EdgeMetadata{EgressPacketCount: newu64(3), EgressByteCount: newu64(4)},
},
}
}
@@ -139,12 +139,10 @@ func TestMapEdge(t *testing.T) {
},
}
want := render.AggregateMetadata{
render.KeyBytesIngress: 1,
render.KeyBytesEgress: 2,
}
have := mapper.AggregateMetadata(report.MakeReport(), "_foo", "_bar")
if !reflect.DeepEqual(want, have) {
if want, have := (report.EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(2),
}), mapper.EdgeMetadata(report.MakeReport(), "_foo", "_bar"); !reflect.DeepEqual(want, have) {
t.Errorf("want %+v, have %+v", want, have)
}
}
@@ -166,3 +164,5 @@ func TestFilterRender(t *testing.T) {
t.Errorf("want %+v, have %+v", want, have)
}
}
func newu64(value uint64) *uint64 { return &value }

View File

@@ -16,7 +16,7 @@ type RenderableNode struct {
Adjacency report.IDList `json:"adjacency,omitempty"` // Node IDs (in the same topology domain)
Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information
AggregateMetadata `json:"metadata"` // Numeric sums
report.EdgeMetadata `json:"metadata"` // Numeric sums
report.NodeMetadata `json:"-"` // merged NodeMetadata of the nodes used to build this
}
@@ -56,57 +56,57 @@ func (rn *RenderableNode) Merge(other RenderableNode) {
rn.Adjacency = rn.Adjacency.Merge(other.Adjacency)
rn.Origins = rn.Origins.Merge(other.Origins)
rn.AggregateMetadata.Merge(other.AggregateMetadata)
rn.EdgeMetadata.Merge(other.EdgeMetadata)
rn.NodeMetadata.Merge(other.NodeMetadata)
}
// NewRenderableNode makes a new RenderableNode
func NewRenderableNode(id, major, minor, rank string, nmd report.NodeMetadata) RenderableNode {
return RenderableNode{
ID: id,
LabelMajor: major,
LabelMinor: minor,
Rank: rank,
Pseudo: false,
AggregateMetadata: AggregateMetadata{},
NodeMetadata: nmd.Copy(),
ID: id,
LabelMajor: major,
LabelMinor: minor,
Rank: rank,
Pseudo: false,
EdgeMetadata: report.EdgeMetadata{},
NodeMetadata: nmd.Copy(),
}
}
func newDerivedNode(id string, node RenderableNode) RenderableNode {
return RenderableNode{
ID: id,
LabelMajor: "",
LabelMinor: "",
Rank: "",
Pseudo: node.Pseudo,
AggregateMetadata: node.AggregateMetadata,
Origins: node.Origins,
NodeMetadata: report.MakeNodeMetadata(),
ID: id,
LabelMajor: "",
LabelMinor: "",
Rank: "",
Pseudo: node.Pseudo,
EdgeMetadata: node.EdgeMetadata,
Origins: node.Origins,
NodeMetadata: report.MakeNodeMetadata(),
}
}
func newPseudoNode(id, major, minor string) RenderableNode {
return RenderableNode{
ID: id,
LabelMajor: major,
LabelMinor: minor,
Rank: "",
Pseudo: true,
AggregateMetadata: AggregateMetadata{},
NodeMetadata: report.MakeNodeMetadata(),
ID: id,
LabelMajor: major,
LabelMinor: minor,
Rank: "",
Pseudo: true,
EdgeMetadata: report.EdgeMetadata{},
NodeMetadata: report.MakeNodeMetadata(),
}
}
func newDerivedPseudoNode(id, major string, node RenderableNode) RenderableNode {
return RenderableNode{
ID: id,
LabelMajor: major,
LabelMinor: "",
Rank: "",
Pseudo: true,
AggregateMetadata: node.AggregateMetadata,
Origins: node.Origins,
NodeMetadata: report.MakeNodeMetadata(),
ID: id,
LabelMajor: major,
LabelMinor: "",
Rank: "",
Pseudo: true,
EdgeMetadata: node.EdgeMetadata,
Origins: node.Origins,
NodeMetadata: report.MakeNodeMetadata(),
}
}

View File

@@ -23,7 +23,7 @@ func TestProcessRenderer(t *testing.T) {
have := render.ProcessRenderer.Render(test.Report)
have = trimNodeMetadata(have)
if !reflect.DeepEqual(expected.RenderedProcesses, have) {
t.Error("\n" + test.Diff(expected.RenderedProcesses, have))
t.Error(test.Diff(expected.RenderedProcesses, have))
}
}
@@ -31,7 +31,7 @@ func TestProcessNameRenderer(t *testing.T) {
have := render.ProcessNameRenderer.Render(test.Report)
have = trimNodeMetadata(have)
if !reflect.DeepEqual(expected.RenderedProcessNames, have) {
t.Error("\n" + test.Diff(expected.RenderedProcessNames, have))
t.Error(test.Diff(expected.RenderedProcessNames, have))
}
}
@@ -39,7 +39,7 @@ func TestContainerRenderer(t *testing.T) {
have := render.ContainerRenderer.Render(test.Report)
have = trimNodeMetadata(have)
if !reflect.DeepEqual(expected.RenderedContainers, have) {
t.Error("\n" + test.Diff(expected.RenderedContainers, have))
t.Error(test.Diff(expected.RenderedContainers, have))
}
}
@@ -47,7 +47,7 @@ func TestContainerImageRenderer(t *testing.T) {
have := render.ContainerImageRenderer.Render(test.Report)
have = trimNodeMetadata(have)
if !reflect.DeepEqual(expected.RenderedContainerImages, have) {
t.Error("\n" + test.Diff(expected.RenderedContainerImages, have))
t.Error(test.Diff(expected.RenderedContainerImages, have))
}
}
@@ -55,6 +55,6 @@ func TestHostRenderer(t *testing.T) {
have := render.HostRenderer.Render(test.Report)
have = trimNodeMetadata(have)
if !reflect.DeepEqual(expected.RenderedHosts, have) {
t.Error("\n" + test.Diff(expected.RenderedHosts, have))
t.Error(test.Diff(expected.RenderedHosts, have))
}
}

View File

@@ -3,7 +3,8 @@ package report
// Merge functions for all topology datatypes. The general semantics are that
// the receiver is modified, and what's merged in isn't.
// Merge merges another Report into the receiver.
// Merge merges another Report into the receiver. Pass addWindows true if the
// reports represent distinct (non-overlapping) periods of time.
func (r *Report) Merge(other Report) {
r.Endpoint.Merge(other.Endpoint)
r.Address.Merge(other.Address)
@@ -12,6 +13,8 @@ func (r *Report) Merge(other Report) {
r.ContainerImage.Merge(other.ContainerImage)
r.Host.Merge(other.Host)
r.Overlay.Merge(other.Overlay)
r.Sampling.Merge(other.Sampling)
r.Window += other.Window
}
// Merge merges another Topology into the receiver.
@@ -62,31 +65,49 @@ func (e *EdgeMetadatas) Merge(other EdgeMetadatas) {
// Merge merges another EdgeMetadata into the receiver. The two edge metadatas
// should represent the same edge on different times.
func (m *EdgeMetadata) Merge(other EdgeMetadata) {
if other.WithBytes {
m.WithBytes = true
m.BytesIngress += other.BytesIngress
m.BytesEgress += other.BytesEgress
}
if other.WithConnCountTCP {
m.WithConnCountTCP = true
if other.MaxConnCountTCP > m.MaxConnCountTCP {
m.MaxConnCountTCP = other.MaxConnCountTCP
}
}
m.EgressPacketCount = merge(m.EgressPacketCount, other.EgressPacketCount, sum)
m.IngressPacketCount = merge(m.IngressPacketCount, other.IngressPacketCount, sum)
m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum)
m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum)
m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, max)
}
// Flatten sums two EdgeMetadatas. Their windows should be the same duration;
// they should represent different edges at the same time.
func (m *EdgeMetadata) Flatten(other EdgeMetadata) {
if other.WithBytes {
m.WithBytes = true
m.BytesIngress += other.BytesIngress
m.BytesEgress += other.BytesEgress
}
if other.WithConnCountTCP {
m.WithConnCountTCP = true
// Note: summing of two maximums doesn't always give the true maximum.
// But it's our Best Effort effort.
m.MaxConnCountTCP += other.MaxConnCountTCP
}
m.EgressPacketCount = merge(m.EgressPacketCount, other.EgressPacketCount, sum)
m.IngressPacketCount = merge(m.IngressPacketCount, other.IngressPacketCount, sum)
m.EgressByteCount = merge(m.EgressByteCount, other.EgressByteCount, sum)
m.IngressByteCount = merge(m.IngressByteCount, other.IngressByteCount, sum)
// Note that summing of two maximums doesn't always give us the true
// maximum. But it's a best effort.
m.MaxConnCountTCP = merge(m.MaxConnCountTCP, other.MaxConnCountTCP, sum)
}
// Merge combines two sampling structures via simple addition.
func (s *Sampling) Merge(other Sampling) {
s.Count += other.Count
s.Total += other.Total
}
func merge(dst, src *uint64, op func(uint64, uint64) uint64) *uint64 {
if src == nil {
return dst
}
if dst == nil {
dst = new(uint64)
}
(*dst) = op(*dst, *src)
return dst
}
func sum(dst, src uint64) uint64 {
return dst + src
}
func max(dst, src uint64) uint64 {
if dst > src {
return dst
}
return src
}

View File

@@ -112,102 +112,82 @@ func TestMergeEdgeMetadatas(t *testing.T) {
a: report.EdgeMetadatas{},
b: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
WithConnCountTCP: true,
MaxConnCountTCP: 2,
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
},
},
want: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
WithConnCountTCP: true,
MaxConnCountTCP: 2,
EgressPacketCount: newu64(1),
MaxConnCountTCP: newu64(2),
},
},
},
"Empty b": {
a: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
EgressPacketCount: newu64(12),
EgressByteCount: newu64(999),
},
},
b: report.EdgeMetadatas{},
want: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
EgressPacketCount: newu64(12),
EgressByteCount: newu64(999),
},
},
},
"Host merge": {
a: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
WithConnCountTCP: true,
MaxConnCountTCP: 4,
EgressPacketCount: newu64(12),
EgressByteCount: newu64(500),
MaxConnCountTCP: newu64(4),
},
},
b: report.EdgeMetadatas{
"hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 1,
BytesIngress: 2,
WithConnCountTCP: true,
MaxConnCountTCP: 6,
EgressPacketCount: newu64(1),
EgressByteCount: newu64(2),
MaxConnCountTCP: newu64(6),
},
},
want: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
WithConnCountTCP: true,
MaxConnCountTCP: 4,
EgressPacketCount: newu64(12),
EgressByteCount: newu64(500),
MaxConnCountTCP: newu64(4),
},
"hostQ|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 1,
BytesIngress: 2,
WithConnCountTCP: true,
MaxConnCountTCP: 6,
EgressPacketCount: newu64(1),
EgressByteCount: newu64(2),
MaxConnCountTCP: newu64(6),
},
},
},
"Edge merge": {
a: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 12,
BytesIngress: 0,
WithConnCountTCP: true,
MaxConnCountTCP: 7,
EgressPacketCount: newu64(12),
EgressByteCount: newu64(1000),
MaxConnCountTCP: newu64(7),
},
},
b: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 1,
BytesIngress: 2,
WithConnCountTCP: true,
MaxConnCountTCP: 9,
EgressPacketCount: newu64(1),
IngressByteCount: newu64(123),
EgressByteCount: newu64(2),
MaxConnCountTCP: newu64(9),
},
},
want: report.EdgeMetadatas{
"hostA|:192.168.1.1:12345|:192.168.1.2:80": report.EdgeMetadata{
WithBytes: true,
BytesEgress: 13,
BytesIngress: 2,
WithConnCountTCP: true,
MaxConnCountTCP: 9,
EgressPacketCount: newu64(13),
IngressByteCount: newu64(123),
EgressByteCount: newu64(1002),
MaxConnCountTCP: newu64(9),
},
},
},
@@ -319,3 +299,5 @@ func TestMergeNodeMetadatas(t *testing.T) {
}
}
}
func newu64(value uint64) *uint64 { return &value }

View File

@@ -12,12 +12,11 @@ type Interface interface {
Addrs() ([]net.Addr, error)
}
// Variables exposed for testing
// Variables exposed for testing.
// TODO this design is broken, make it consistent with probe networks.
var (
LocalNetworks = Networks{}
InterfaceByNameStub = func(name string) (Interface, error) {
return net.InterfaceByName(name)
}
InterfaceByNameStub = func(name string) (Interface, error) { return net.InterfaceByName(name) }
)
// Contains returns true if IP is in Networks.

View File

@@ -1,5 +1,11 @@
package report
import (
"fmt"
"strings"
"time"
)
// Report is the core data type. It's produced by probes, and consumed and
// stored by apps. It's composed of multiple topologies, each representing
// a different (related, but not equivalent) view of the network.
@@ -36,6 +42,80 @@ type Report struct {
// overlaid on the infrastructure. The information is scraped by polling
// their status endpoints. Edges could be present, but aren't currently.
Overlay Topology
// Sampling data for this report.
Sampling Sampling
// Window is the amount of time that this report purports to represent.
// Windows must be carefully merged. They should only be added when
// reports cover non-overlapping periods of time. By default, we assume
// that's true, and add windows in merge operations. When that's not true,
// such as in the app, we expect the component to overwrite the window
// before serving it to consumers.
Window time.Duration
}
// MakeReport makes a clean report, ready to Merge() other reports into.
func MakeReport() Report {
return Report{
Endpoint: NewTopology(),
Address: NewTopology(),
Process: NewTopology(),
Container: NewTopology(),
ContainerImage: NewTopology(),
Host: NewTopology(),
Overlay: NewTopology(),
Sampling: Sampling{},
Window: 0,
}
}
// Topologies returns a slice of Topologies in this report
func (r Report) Topologies() []Topology {
return []Topology{
r.Endpoint,
r.Address,
r.Process,
r.Container,
r.ContainerImage,
r.Host,
r.Overlay,
}
}
// Validate checks the report for various inconsistencies.
func (r Report) Validate() error {
var errs []string
for _, topology := range r.Topologies() {
if err := topology.Validate(); err != nil {
errs = append(errs, err.Error())
}
}
if r.Sampling.Count > r.Sampling.Total {
errs = append(errs, fmt.Sprintf("sampling count (%d) bigger than total (%d)", r.Sampling.Count, r.Sampling.Total))
}
if len(errs) > 0 {
return fmt.Errorf("%d error(s): %s", len(errs), strings.Join(errs, "; "))
}
return nil
}
// Sampling describes how the packet data sources for this report were
// sampled. It can be used to calculate effective sample rates. We can't
// just put the rate here, because that can't be accurately merged. Counts
// in e.g. edge metadata structures have already been adjusted to
// compensate for the sample rate.
type Sampling struct {
Count uint64 // observed and processed
Total uint64 // observed overall
}
// Rate returns the effective sampling rate.
func (s Sampling) Rate() float64 {
if s.Total <= 0 {
return 1.0
}
return float64(s.Count) / float64(s.Total)
}
const (
@@ -77,32 +157,3 @@ func SelectAddress(r Report) Topology {
func SelectHost(r Report) Topology {
return r.Host
}
// MakeReport makes a clean report, ready to Merge() other reports into.
func MakeReport() Report {
return Report{
Endpoint: NewTopology(),
Address: NewTopology(),
Process: NewTopology(),
Container: NewTopology(),
ContainerImage: NewTopology(),
Host: NewTopology(),
Overlay: NewTopology(),
}
}
// Topologies returns a slice of Topologies in this report
func (r Report) Topologies() []Topology {
return []Topology{r.Endpoint, r.Address, r.Process, r.Container,
r.ContainerImage, r.Host, r.Overlay}
}
// Validate checks the report for various inconsistencies.
func (r Report) Validate() error {
for _, topology := range r.Topologies() {
if err := topology.Validate(); err != nil {
return err
}
}
return nil
}

27
report/report_test.go Normal file
View File

@@ -0,0 +1,27 @@
package report_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/report"
)
// Make sure we don't add a topology and miss it in the Topologies method.
func TestReportTopologies(t *testing.T) {
var (
reportType = reflect.TypeOf(report.MakeReport())
topologyType = reflect.TypeOf(report.NewTopology())
)
var want int
for i := 0; i < reportType.NumField(); i++ {
if reportType.Field(i).Type == topologyType {
want++
}
}
if have := len(report.MakeReport().Topologies()); want != have {
t.Errorf("want %d, have %d", want, have)
}
}

View File

@@ -28,16 +28,14 @@ type EdgeMetadatas map[string]EdgeMetadata
// IDs.
type NodeMetadatas map[string]NodeMetadata
// EdgeMetadata describes a superset of the metadata that probes can
// conceivably (and usefully) collect about an edge between two nodes in any
// topology.
// EdgeMetadata describes a superset of the metadata that probes can possibly
// collect about a directed edge between two nodes in any topology.
type EdgeMetadata struct {
WithBytes bool `json:"with_bytes,omitempty"`
BytesIngress uint `json:"bytes_ingress,omitempty"` // dst -> src
BytesEgress uint `json:"bytes_egress,omitempty"` // src -> dst
WithConnCountTCP bool `json:"with_conn_count_tcp,omitempty"`
MaxConnCountTCP uint `json:"max_conn_count_tcp,omitempty"`
EgressPacketCount *uint64 `json:"egress_packet_count,omitempty"`
IngressPacketCount *uint64 `json:"ingress_packet_count,omitempty"`
EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` // Transport layer
IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` // Transport layer
MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"`
}
// NodeMetadata describes a superset of the metadata that probes can collect

View File

@@ -1,6 +1,8 @@
package test
import (
"time"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/process"
@@ -108,40 +110,33 @@ var (
},
EdgeMetadatas: report.EdgeMetadatas{
report.MakeEdgeID(Client54001NodeID, Server80NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 100,
BytesEgress: 10,
EgressPacketCount: newu64(10),
EgressByteCount: newu64(100),
},
report.MakeEdgeID(Client54002NodeID, Server80NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 200,
BytesEgress: 20,
EgressPacketCount: newu64(20),
EgressByteCount: newu64(200),
},
report.MakeEdgeID(Server80NodeID, Client54001NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 10,
BytesEgress: 100,
EgressPacketCount: newu64(10),
EgressByteCount: newu64(100),
},
report.MakeEdgeID(Server80NodeID, Client54002NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 20,
BytesEgress: 200,
EgressPacketCount: newu64(20),
EgressByteCount: newu64(200),
},
report.MakeEdgeID(Server80NodeID, UnknownClient1NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 30,
BytesEgress: 300,
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
report.MakeEdgeID(Server80NodeID, UnknownClient2NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 40,
BytesEgress: 400,
EgressPacketCount: newu64(40),
EgressByteCount: newu64(400),
},
report.MakeEdgeID(Server80NodeID, UnknownClient3NodeID): report.EdgeMetadata{
WithBytes: true,
BytesIngress: 50,
BytesEgress: 500,
EgressPacketCount: newu64(50),
EgressByteCount: newu64(500),
},
},
},
@@ -222,12 +217,10 @@ var (
},
EdgeMetadatas: report.EdgeMetadatas{
report.MakeEdgeID(ClientAddressNodeID, ServerAddressNodeID): report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: 3,
MaxConnCountTCP: newu64(3),
},
report.MakeEdgeID(ServerAddressNodeID, ClientAddressNodeID): report.EdgeMetadata{
WithConnCountTCP: true,
MaxConnCountTCP: 3,
MaxConnCountTCP: newu64(3),
},
},
},
@@ -251,6 +244,11 @@ var (
},
EdgeMetadatas: report.EdgeMetadatas{},
},
Sampling: report.Sampling{
Count: 1024,
Total: 4096,
},
Window: 2 * time.Second,
}
)
@@ -259,3 +257,5 @@ func init() {
panic(err)
}
}
func newu64(value uint64) *uint64 { return &value }