diff --git a/app/report_lifo.go b/app/report_lifo.go index c1c580585..31824ef11 100644 --- a/app/report_lifo.go +++ b/app/report_lifo.go @@ -51,9 +51,14 @@ func NewReportLIFO(r reporter, maxAge time.Duration) *ReportLIFO { case req := <-l.requests: // Request for the current report. report := report.MakeReport() + oldest := time.Now() for _, r := range l.reports { + if r.Timestamp.Before(oldest) { + oldest = r.Timestamp + } report.Merge(r.Report) } + report.Window = time.Now().Sub(oldest) req <- report case q := <-l.quit: diff --git a/probe/main.go b/probe/main.go index 29f7626d1..6b4e4f979 100644 --- a/probe/main.go +++ b/probe/main.go @@ -153,6 +153,7 @@ func main() { select { case <-pubTick: publishTicks.WithLabelValues().Add(1) + r.Window = *publishInterval publisher.Publish(r) r = report.MakeReport() @@ -160,7 +161,6 @@ func main() { if err := processCache.Update(); err != nil { log.Printf("error reading processes: %v", err) } - for _, reporter := range reporters { newReport, err := reporter.Report() if err != nil { @@ -168,7 +168,6 @@ func main() { } r.Merge(newReport) } - r = Apply(r, taggers) case <-quit: diff --git a/render/detailed_node.go b/render/detailed_node.go index 29e872bc4..b739eb104 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -58,23 +58,46 @@ func (t tables) Less(i, j int) bool { return t[i].Rank > t[j].Rank } // MakeDetailedNode transforms a renderable node to a detailed node. It uses // aggregate metadata, plus the set of origin node IDs, to produce tables. func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { + sec := r.Window.Seconds() + rate := func(u *uint64) (float64, bool) { + if u == nil { + return 0.0, false + } + if sec <= 0 { + return 0.0, true + } + return float64(*u) / sec, true + } + shortenByteRate := func(rate float64) (major, minor string) { + switch { + case rate > 1024*1024: + return fmt.Sprintf("%.2f", rate/1024/1024), "MBps" + case rate > 1024: + return fmt.Sprintf("%.1f", rate/1024), "KBps" + default: + return fmt.Sprintf("%.0f", rate), "Bps" + } + } + tables := tables{} { rows := []Row{} if n.EdgeMetadata.MaxConnCountTCP != nil { rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), ""}) } - if n.EdgeMetadata.EgressPacketCount != nil { - rows = append(rows, Row{"Egress packets", strconv.FormatUint(*n.EdgeMetadata.EgressPacketCount, 10), ""}) + if rate, ok := rate(n.EdgeMetadata.EgressPacketCount); ok { + rows = append(rows, Row{"Egress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"}) } - if n.EdgeMetadata.IngressPacketCount != nil { - rows = append(rows, Row{"Ingress packets", strconv.FormatUint(*n.EdgeMetadata.IngressPacketCount, 10), ""}) + if rate, ok := rate(n.EdgeMetadata.IngressPacketCount); ok { + rows = append(rows, Row{"Ingress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec"}) } - if n.EdgeMetadata.EgressByteCount != nil { - rows = append(rows, Row{"Egress bytes", strconv.FormatUint(*n.EdgeMetadata.EgressByteCount, 10), ""}) // TODO rate + if rate, ok := rate(n.EdgeMetadata.EgressByteCount); ok { + s, unit := shortenByteRate(rate) + rows = append(rows, Row{"Egress byte rate", s, unit}) } - if n.EdgeMetadata.IngressByteCount != nil { - rows = append(rows, Row{"Ingress bytes", strconv.FormatUint(*n.EdgeMetadata.IngressByteCount, 10), ""}) // TODO rate + if rate, ok := rate(n.EdgeMetadata.IngressByteCount); ok { + s, unit := shortenByteRate(rate) + rows = append(rows, Row{"Ingress byte rate", s, unit}) } if len(rows) > 0 { tables = append(tables, Table{"Connections", true, connectionsRank, rows}) diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index a0b84a85e..7e561efb5 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -66,8 +66,8 @@ func TestMakeDetailedNode(t *testing.T) { Numeric: true, Rank: 100, Rows: []render.Row{ - {"Egress packets", "150", ""}, - {"Egress bytes", "1500", ""}, + {"Egress packet rate", "75", "packets/sec"}, + {"Egress byte rate", "750", "Bps"}, }, }, { diff --git a/report/merge.go b/report/merge.go index 9ccf99518..406abe6ce 100644 --- a/report/merge.go +++ b/report/merge.go @@ -14,6 +14,7 @@ func (r *Report) Merge(other Report) { r.Host.Merge(other.Host) r.Overlay.Merge(other.Overlay) r.Sampling.Merge(other.Sampling) + r.Window += other.Window } // Merge merges another Topology into the receiver. diff --git a/report/report.go b/report/report.go index 152760bc2..990d5bab0 100644 --- a/report/report.go +++ b/report/report.go @@ -3,6 +3,7 @@ package report import ( "fmt" "strings" + "time" ) // Report is the core data type. It's produced by probes, and consumed and @@ -44,6 +45,14 @@ type Report struct { // Sampling data for this report. Sampling Sampling + + // Window is the amount of time that this report purports to represent. + // Windows must be carefully merged. They should only be added when + // reports cover non-overlapping periods of time. By default, we assume + // that's true, and add windows in merge operations. When that's not true, + // such as in the app, we expect the component to overwrite the window + // before serving it to consumers. + Window time.Duration } // MakeReport makes a clean report, ready to Merge() other reports into. @@ -57,6 +66,7 @@ func MakeReport() Report { Host: NewTopology(), Overlay: NewTopology(), Sampling: Sampling{}, + Window: 0, } } diff --git a/test/report_fixture.go b/test/report_fixture.go index a8a7d0f51..fe3d1db67 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -1,6 +1,8 @@ package test import ( + "time" + "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/probe/process" @@ -246,6 +248,7 @@ var ( Count: 1024, Total: 4096, }, + Window: 2 * time.Second, } )