diff --git a/Makefile b/Makefile index 972e76f09..915ff4be0 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ DOCKER_DISTRIB=docker/docker-$(DOCKER_VERSION).tgz DOCKER_DISTRIB_URL=https://get.docker.com/builds/Linux/x86_64/docker-$(DOCKER_VERSION).tgz RUNSVINIT=vendor/runsvinit/runsvinit RM=--rm +RUN_FLAGS=-ti BUILD_IN_CONTAINER=true all: $(SCOPE_EXPORT) @@ -43,7 +44,7 @@ $(PROBE_EXE): probe/*.go probe/docker/*.go probe/kubernetes/*.go probe/endpoint/ ifeq ($(BUILD_IN_CONTAINER),true) $(APP_EXE) $(PROBE_EXE) $(RUNSVINIT): $(SCOPE_BACKEND_BUILD_UPTODATE) - $(SUDO) docker run $(RM) -v $(shell pwd):/go/src/github.com/weaveworks/scope -e GOARCH -e GOOS \ + $(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd):/go/src/github.com/weaveworks/scope -e GOARCH -e GOOS \ $(SCOPE_BACKEND_BUILD_IMAGE) SCOPE_VERSION=$(SCOPE_VERSION) $@ else $(APP_EXE) $(PROBE_EXE): $(SCOPE_BACKEND_BUILD_UPTODATE) @@ -67,22 +68,22 @@ static: client/build/app.js ifeq ($(BUILD_IN_CONTAINER),true) client/build/app.js: $(shell find client/app/scripts -type f) mkdir -p client/build - $(SUDO) docker run $(RM) -v $(shell pwd)/client/app:/home/weave/app \ + $(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd)/client/app:/home/weave/app \ -v $(shell pwd)/client/build:/home/weave/build \ $(SCOPE_UI_BUILD_IMAGE) npm run build client-test: $(shell find client/app/scripts -type f) - $(SUDO) docker run $(RM) -v $(shell pwd)/client/app:/home/weave/app \ + $(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd)/client/app:/home/weave/app \ -v $(shell pwd)/client/test:/home/weave/test \ $(SCOPE_UI_BUILD_IMAGE) npm test client-lint: - $(SUDO) docker run $(RM) -v $(shell pwd)/client/app:/home/weave/app \ + $(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd)/client/app:/home/weave/app \ -v $(shell pwd)/client/test:/home/weave/test \ $(SCOPE_UI_BUILD_IMAGE) npm run lint client-start: - $(SUDO) docker run $(RM) --net=host -v $(shell pwd)/client/app:/home/weave/app \ + $(SUDO) docker run $(RM) $(RUN_FLAGS) --net=host -v $(shell pwd)/client/app:/home/weave/app \ -v $(shell pwd)/client/build:/home/weave/build \ $(SCOPE_UI_BUILD_IMAGE) npm start endif @@ -105,7 +106,7 @@ clean: ifeq ($(BUILD_IN_CONTAINER),true) tests: $(SCOPE_BACKEND_BUILD_UPTODATE) - $(SUDO) docker run $(RM) -v $(shell pwd):/go/src/github.com/weaveworks/scope \ + $(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd):/go/src/github.com/weaveworks/scope \ -e GOARCH -e GOOS -e CIRCLECI -e CIRCLE_BUILD_NUM -e CIRCLE_NODE_TOTAL -e CIRCLE_NODE_INDEX -e COVERDIR\ $(SCOPE_BACKEND_BUILD_IMAGE) tests else diff --git a/render/detailed_node.go b/render/detailed_node.go index 50a9aeb41..18367e28c 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -13,7 +13,6 @@ import ( ) const ( - mb = 1 << 20 containerImageRank = 4 containerRank = 3 processRank = 2 @@ -43,10 +42,12 @@ type Table struct { // Row is a single entry in a Table dataset. type Row struct { - Key string `json:"key"` // e.g. Ingress - ValueMajor string `json:"value_major"` // e.g. 25 - ValueMinor string `json:"value_minor,omitempty"` // e.g. KB/s - Expandable bool `json:"expandable,omitempty"` // Whether it can be expanded (hidden by default) + Key string `json:"key"` // e.g. Ingress + ValueMajor string `json:"value_major"` // e.g. 25 + ValueMinor string `json:"value_minor,omitempty"` // e.g. KB/s + Expandable bool `json:"expandable,omitempty"` // Whether it can be expanded (hidden by default) + ValueType string `json:"value_type,omitempty"` // e.g. sparkline + Metric *report.Metric `json:"metric,omitempty"` // e.g. sparkline data samples } // ControlInstance contains a control description, and all the info @@ -168,21 +169,21 @@ func connectionsTable(connections []Row, r report.Report, n RenderableNode) (Tab rows := []Row{} if n.EdgeMetadata.MaxConnCountTCP != nil { - rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), "", false}) + rows = append(rows, Row{Key: "TCP connections", ValueMajor: strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10)}) } if rate, ok := rate(n.EdgeMetadata.EgressPacketCount); ok { - rows = append(rows, Row{"Egress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec", false}) + rows = append(rows, Row{Key: "Egress packet rate", ValueMajor: fmt.Sprintf("%.0f", rate), ValueMinor: "packets/sec"}) } if rate, ok := rate(n.EdgeMetadata.IngressPacketCount); ok { - rows = append(rows, Row{"Ingress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec", false}) + rows = append(rows, Row{Key: "Ingress packet rate", ValueMajor: fmt.Sprintf("%.0f", rate), ValueMinor: "packets/sec"}) } if rate, ok := rate(n.EdgeMetadata.EgressByteCount); ok { s, unit := shortenByteRate(rate) - rows = append(rows, Row{"Egress byte rate", s, unit, false}) + rows = append(rows, Row{Key: "Egress byte rate", ValueMajor: s, ValueMinor: unit}) } if rate, ok := rate(n.EdgeMetadata.IngressByteCount); ok { s, unit := shortenByteRate(rate) - rows = append(rows, Row{"Ingress byte rate", s, unit, false}) + rows = append(rows, Row{Key: "Ingress byte rate", ValueMajor: s, ValueMinor: unit}) } if len(connections) > 0 { sort.Sort(sortableRows(connections)) @@ -343,6 +344,58 @@ func processOriginTable(nmd report.Node, addHostTag bool, addContainerTag bool) }, len(rows) > 0 || commFound || pidFound } +func sparklineRow(human string, metric report.Metric, format func(report.Metric) (report.Metric, string)) Row { + if format == nil { + format = formatDefault + } + metric, lastStr := format(metric) + return Row{Key: human, ValueMajor: lastStr, Metric: &metric, ValueType: "sparkline"} +} + +func formatDefault(m report.Metric) (report.Metric, string) { + if s := m.LastSample(); s != nil { + return m, fmt.Sprintf("%0.2f", s.Value) + } + return m, "" +} + +func memoryScale(n float64) (string, float64) { + brackets := []struct { + human string + shift uint + }{ + {"bytes", 0}, + {"KB", 10}, + {"MB", 20}, + {"GB", 30}, + {"TB", 40}, + {"PB", 50}, + } + for _, bracket := range brackets { + unit := (1 << bracket.shift) + if n < float64(unit<<10) { + return bracket.human, float64(unit) + } + } + return "PB", float64(1 << 50) +} + +func formatMemory(m report.Metric) (report.Metric, string) { + s := m.LastSample() + if s == nil { + return m, "" + } + human, divisor := memoryScale(s.Value) + return m.Div(divisor), fmt.Sprintf("%0.2f %s", s.Value/divisor, human) +} + +func formatPercent(m report.Metric) (report.Metric, string) { + if s := m.LastSample(); s != nil { + return m, fmt.Sprintf("%0.2f%%", s.Value) + } + return m, "" +} + func containerOriginTable(nmd report.Node, addHostTag bool) (Table, bool) { rows := []Row{} for _, tuple := range []struct{ key, human string }{ @@ -372,17 +425,17 @@ func containerOriginTable(nmd report.Node, addHostTag bool) (Table, bool) { } rows = append(rows, getDockerLabelRows(nmd)...) - if val, ok := nmd.Metadata[docker.MemoryUsage]; ok { - memory, err := strconv.ParseFloat(val, 64) - if err == nil { - memoryStr := fmt.Sprintf("%0.2f", memory/float64(mb)) - rows = append(rows, Row{Key: "Memory Usage (MB):", ValueMajor: memoryStr, ValueMinor: ""}) - } - } if addHostTag { rows = append([]Row{{Key: "Host", ValueMajor: report.ExtractHostID(nmd)}}, rows...) } + if val, ok := nmd.Metrics[docker.MemoryUsage]; ok { + rows = append(rows, sparklineRow("Memory Usage", val, formatMemory)) + } + if val, ok := nmd.Metrics[docker.CPUTotalUsage]; ok { + rows = append(rows, sparklineRow("CPU Usage", val, formatPercent)) + } + var ( title = "Container" name, nameFound = GetRenderableContainerName(nmd) @@ -441,9 +494,31 @@ func getDockerLabelRows(nmd report.Node) []Row { } func hostOriginTable(nmd report.Node) (Table, bool) { + // Ensure that all metrics have the same max + maxLoad := 0.0 + for _, key := range []string{host.Load1, host.Load5, host.Load15} { + if metric, ok := nmd.Metrics[key]; ok { + if metric.Len() == 0 { + continue + } + if metric.Max > maxLoad { + maxLoad = metric.Max + } + } + } + rows := []Row{} for _, tuple := range []struct{ key, human string }{ - {host.Load, "Load"}, + {host.Load1, "Load (1m)"}, + {host.Load5, "Load (5m)"}, + {host.Load15, "Load (15m)"}, + } { + if val, ok := nmd.Metrics[tuple.key]; ok { + val.Max = maxLoad + rows = append(rows, sparklineRow(tuple.human, val, nil)) + } + } + for _, tuple := range []struct{ key, human string }{ {host.OS, "Operating system"}, {host.KernelVersion, "Kernel version"}, {host.Uptime, "Uptime"}, diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index 4b992c410..0d8f48625 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -26,8 +26,10 @@ func TestOriginTable(t *testing.T) { Numeric: false, Rank: 1, Rows: []render.Row{ - {"Load", "0.01 0.01 0.01", "", false}, - {"Operating system", "Linux", "", false}, + {Key: "Load (1m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"}, + {Key: "Load (5m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"}, + {Key: "Load (15m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"}, + {Key: "Operating system", ValueMajor: "Linux"}, }, }, } { @@ -48,8 +50,8 @@ func TestOriginTable(t *testing.T) { Numeric: false, Rank: 2, Rows: []render.Row{ - {"Host", fixture.ServerHostID, "", false}, - {"Container ID", fixture.ServerContainerID, "", false}, + {Key: "Host", ValueMajor: fixture.ServerHostID}, + {Key: "Container ID", ValueMajor: fixture.ServerContainerID}, }, }, fixture.ServerContainerNodeID: { @@ -57,14 +59,14 @@ func TestOriginTable(t *testing.T) { Numeric: false, Rank: 3, Rows: []render.Row{ - {"Host", fixture.ServerHostID, "", false}, - {"State", "running", "", false}, - {"ID", fixture.ServerContainerID, "", false}, - {"Image ID", fixture.ServerContainerImageID, "", false}, - {fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false}, - {`Label "foo1"`, `bar1`, "", false}, - {`Label "foo2"`, `bar2`, "", false}, - {`Label "io.kubernetes.pod.name"`, "ping/pong-b", "", false}, + {Key: "Host", ValueMajor: fixture.ServerHostID}, + {Key: "State", ValueMajor: "running"}, + {Key: "ID", ValueMajor: fixture.ServerContainerID}, + {Key: "Image ID", ValueMajor: fixture.ServerContainerImageID}, + {Key: fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), ValueMajor: `server`}, + {Key: `Label "foo1"`, ValueMajor: `bar1`}, + {Key: `Label "foo2"`, ValueMajor: `bar2`}, + {Key: `Label "io.kubernetes.pod.name"`, ValueMajor: "ping/pong-b"}, }, }, } { @@ -95,14 +97,26 @@ func TestMakeDetailedHostNode(t *testing.T) { Rank: 1, Rows: []render.Row{ { - Key: "Load", - ValueMajor: "0.01 0.01 0.01", - ValueMinor: "", + Key: "Load (1m)", + ValueMajor: "0.01", + Metric: &fixture.LoadMetric, + ValueType: "sparkline", + }, + { + Key: "Load (5m)", + ValueMajor: "0.01", + Metric: &fixture.LoadMetric, + ValueType: "sparkline", + }, + { + Key: "Load (15m)", + ValueMajor: "0.01", + Metric: &fixture.LoadMetric, + ValueType: "sparkline", }, { Key: "Operating system", ValueMajor: "Linux", - ValueMinor: "", }, }, }, @@ -114,18 +128,15 @@ func TestMakeDetailedHostNode(t *testing.T) { { Key: "TCP connections", ValueMajor: "3", - ValueMinor: "", }, { Key: "Client", ValueMajor: "Server", - ValueMinor: "", Expandable: true, }, { Key: "10.10.10.20", ValueMajor: "192.168.1.1", - ValueMinor: "", Expandable: true, }, }, @@ -152,9 +163,9 @@ func TestMakeDetailedContainerNode(t *testing.T) { Numeric: false, Rank: 4, Rows: []render.Row{ - {"Image ID", fixture.ServerContainerImageID, "", false}, - {`Label "foo1"`, `bar1`, "", false}, - {`Label "foo2"`, `bar2`, "", false}, + {Key: "Image ID", ValueMajor: fixture.ServerContainerImageID}, + {Key: `Label "foo1"`, ValueMajor: `bar1`}, + {Key: `Label "foo2"`, ValueMajor: `bar2`}, }, }, { @@ -162,13 +173,13 @@ func TestMakeDetailedContainerNode(t *testing.T) { Numeric: false, Rank: 3, Rows: []render.Row{ - {"State", "running", "", false}, - {"ID", fixture.ServerContainerID, "", false}, - {"Image ID", fixture.ServerContainerImageID, "", false}, - {fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false}, - {`Label "foo1"`, `bar1`, "", false}, - {`Label "foo2"`, `bar2`, "", false}, - {`Label "io.kubernetes.pod.name"`, "ping/pong-b", "", false}, + {Key: "State", ValueMajor: "running"}, + {Key: "ID", ValueMajor: fixture.ServerContainerID}, + {Key: "Image ID", ValueMajor: fixture.ServerContainerImageID}, + {Key: fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), ValueMajor: `server`}, + {Key: `Label "foo1"`, ValueMajor: `bar1`}, + {Key: `Label "foo2"`, ValueMajor: `bar2`}, + {Key: `Label "io.kubernetes.pod.name"`, ValueMajor: "ping/pong-b"}, }, }, { @@ -182,8 +193,10 @@ func TestMakeDetailedContainerNode(t *testing.T) { Numeric: false, Rank: 1, Rows: []render.Row{ - {"Load", "0.01 0.01 0.01", "", false}, - {"Operating system", "Linux", "", false}, + {Key: "Load (1m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"}, + {Key: "Load (5m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"}, + {Key: "Load (15m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"}, + {Key: "Operating system", ValueMajor: "Linux"}, }, }, { @@ -191,44 +204,38 @@ func TestMakeDetailedContainerNode(t *testing.T) { Numeric: false, Rank: 0, Rows: []render.Row{ - {"Ingress packet rate", "105", "packets/sec", false}, - {"Ingress byte rate", "1.0", "KBps", false}, - {"Client", "Server", "", true}, + {Key: "Ingress packet rate", ValueMajor: "105", ValueMinor: "packets/sec"}, + {Key: "Ingress byte rate", ValueMajor: "1.0", ValueMinor: "KBps"}, + {Key: "Client", ValueMajor: "Server", Expandable: true}, { - fmt.Sprintf("%s:%s", fixture.UnknownClient1IP, fixture.UnknownClient1Port), - fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), - "", - true, + Key: fmt.Sprintf("%s:%s", fixture.UnknownClient1IP, fixture.UnknownClient1Port), + ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), + Expandable: true, }, { - fmt.Sprintf("%s:%s", fixture.UnknownClient2IP, fixture.UnknownClient2Port), - fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), - "", - true, + Key: fmt.Sprintf("%s:%s", fixture.UnknownClient2IP, fixture.UnknownClient2Port), + ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), + Expandable: true, }, { - fmt.Sprintf("%s:%s", fixture.UnknownClient3IP, fixture.UnknownClient3Port), - fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), - "", - true, + Key: fmt.Sprintf("%s:%s", fixture.UnknownClient3IP, fixture.UnknownClient3Port), + ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), + Expandable: true, }, { - fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54001), - fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), - "", - true, + Key: fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54001), + ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), + Expandable: true, }, { - fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54002), - fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), - "", - true, + Key: fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54002), + ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), + Expandable: true, }, { - fmt.Sprintf("%s:%s", fixture.RandomClientIP, fixture.RandomClientPort), - fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), - "", - true, + Key: fmt.Sprintf("%s:%s", fixture.RandomClientIP, fixture.RandomClientPort), + ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort), + Expandable: true, }, }, }, diff --git a/report/latest_map.go b/report/latest_map.go index 97748db8c..f04f035a9 100644 --- a/report/latest_map.go +++ b/report/latest_map.go @@ -79,7 +79,7 @@ func (m LatestMap) toIntermediate() map[string]LatestEntry { return intermediate } -func fromIntermediate(in map[string]LatestEntry) LatestMap { +func (m LatestMap) fromIntermediate(in map[string]LatestEntry) LatestMap { out := ps.NewMap() for k, v := range in { out = out.Set(k, v) @@ -105,7 +105,7 @@ func (m *LatestMap) UnmarshalJSON(input []byte) error { if err := json.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil { return err } - *m = fromIntermediate(in) + *m = LatestMap{}.fromIntermediate(in) return nil } @@ -122,6 +122,6 @@ func (m *LatestMap) GobDecode(input []byte) error { if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil { return err } - *m = fromIntermediate(in) + *m = LatestMap{}.fromIntermediate(in) return nil } diff --git a/report/metrics.go b/report/metrics.go new file mode 100644 index 000000000..b8ae93c4f --- /dev/null +++ b/report/metrics.go @@ -0,0 +1,278 @@ +package report + +import ( + "bytes" + "encoding/gob" + "encoding/json" + "math" + "time" + + "github.com/mndrix/ps" +) + +// Metrics is a string->metric map. +type Metrics map[string]Metric + +// Merge merges two sets maps into a fresh set, performing set-union merges as +// appropriate. +func (m Metrics) Merge(other Metrics) Metrics { + result := m.Copy() + for k, v := range other { + result[k] = result[k].Merge(v) + } + return result +} + +// Copy returns a value copy of the sets map. +func (m Metrics) Copy() Metrics { + result := Metrics{} + for k, v := range m { + result[k] = v + } + return result +} + +// Metric is a list of timeseries data with some metadata. Clients must use the +// Add method to add values. Metrics are immutable. +type Metric struct { + Samples ps.List + Min, Max float64 + First, Last time.Time +} + +// Sample is a single datapoint of a metric. +type Sample struct { + Timestamp time.Time `json:"date"` + Value float64 `json:"value"` +} + +var nilMetric = Metric{Samples: ps.NewList()} + +// MakeMetric makes a new Metric. +func MakeMetric() Metric { + return nilMetric +} + +// Copy returns a value copy of the Metric. Metric is immutable, so we can skip +// this. +func (m Metric) Copy() Metric { + return m +} + +// WithFirst returns a fresh copy of m, with First set to t. +func (m Metric) WithFirst(t time.Time) Metric { + return Metric{ + Samples: m.Samples, + Max: m.Max, + Min: m.Min, + First: t, + Last: m.Last, + } +} + +// Len returns the number of samples in the metric. +func (m Metric) Len() int { + if m.Samples == nil { + return 0 + } + return m.Samples.Size() +} + +func first(t1, t2 time.Time) time.Time { + if !t1.IsZero() && t1.Before(t2) { + return t1 + } + return t2 +} + +func last(t1, t2 time.Time) time.Time { + if !t1.IsZero() && t1.After(t2) { + return t1 + } + return t2 +} + +// revCons appends acc to the head of curr, where acc is in reverse order. +// acc must never be nil, curr can be. +func revCons(acc, curr ps.List) ps.List { + if curr == nil { + return acc.Reverse() + } + for !acc.IsNil() { + acc, curr = acc.Tail(), curr.Cons(acc.Head()) + } + return curr +} + +// Add returns a new Metric with (t, v) added to its Samples. Add is the only +// valid way to grow a Metric. +func (m Metric) Add(t time.Time, v float64) Metric { + // Find the first element which is before you element, and insert + // your new element in the list. NB we want to dedupe entries with + // equal timestamps. + // This should be O(1) to insert a latest element, and O(n) in general. + curr, acc := m.Samples, ps.NewList() + for { + if curr == nil || curr.IsNil() { + acc = acc.Cons(Sample{t, v}) + break + } + + currSample := curr.Head().(Sample) + if currSample.Timestamp.Equal(t) { + acc, curr = acc.Cons(Sample{t, v}), curr.Tail() + break + } + if currSample.Timestamp.Before(t) { + acc = acc.Cons(Sample{t, v}) + break + } + + acc, curr = acc.Cons(curr.Head()), curr.Tail() + } + acc = revCons(acc, curr) + + return Metric{ + Samples: acc, + Max: math.Max(m.Max, v), + Min: math.Min(m.Min, v), + First: first(m.First, t), + Last: last(m.Last, t), + } +} + +// Merge combines the two Metrics and returns a new result. +func (m Metric) Merge(other Metric) Metric { + // Merge two lists of samples in O(n) + curr1, curr2, acc := m.Samples, other.Samples, ps.NewList() + + for { + if curr1 == nil || curr1.IsNil() { + acc = revCons(acc, curr2) + break + } else if curr2 == nil || curr2.IsNil() { + acc = revCons(acc, curr1) + break + } + + s1 := curr1.Head().(Sample) + s2 := curr2.Head().(Sample) + + if s1.Timestamp.Equal(s2.Timestamp) { + curr1, curr2, acc = curr1.Tail(), curr2.Tail(), acc.Cons(s1) + } else if s1.Timestamp.After(s2.Timestamp) { + curr1, acc = curr1.Tail(), acc.Cons(s1) + } else { + curr2, acc = curr2.Tail(), acc.Cons(s2) + } + } + + return Metric{ + Samples: acc, + Max: math.Max(m.Max, other.Max), + Min: math.Min(m.Min, other.Min), + First: first(m.First, other.First), + Last: last(m.Last, other.Last), + } +} + +// Div returns a new copy of the metric, with each value divided by n. +func (m Metric) Div(n float64) Metric { + curr, acc := m.Samples, ps.NewList() + for curr != nil && !curr.IsNil() { + s := curr.Head().(Sample) + curr, acc = curr.Tail(), acc.Cons(Sample{s.Timestamp, s.Value / n}) + } + acc = acc.Reverse() + return Metric{ + Samples: acc, + Max: m.Max / n, + Min: m.Min / n, + First: m.First, + Last: m.Last, + } +} + +// LastSample returns the last sample in the metric, or nil if there are no +// samples. +func (m Metric) LastSample() *Sample { + if m.Samples == nil || m.Samples.IsNil() { + return nil + } + s := m.Samples.Head().(Sample) + return &s +} + +// WireMetrics is the on-the-wire representation of Metrics. +type WireMetrics struct { + Samples []Sample `json:"samples"` // On the wire, samples are sorted oldest to newest, + Min float64 `json:"min"` // the opposite order to how we store them internally. + Max float64 `json:"max"` + First time.Time `json:"first"` + Last time.Time `json:"last"` +} + +func (m Metric) toIntermediate() WireMetrics { + samples := []Sample{} + if m.Samples != nil { + m.Samples.Reverse().ForEach(func(s interface{}) { + samples = append(samples, s.(Sample)) + }) + } + return WireMetrics{ + Samples: samples, + Max: m.Max, + Min: m.Min, + First: m.First, + Last: m.Last, + } +} + +func (m WireMetrics) fromIntermediate() Metric { + samples := ps.NewList() + for _, s := range m.Samples { + samples = samples.Cons(s) + } + return Metric{ + Samples: samples, + Max: m.Max, + Min: m.Min, + First: m.First, + Last: m.Last, + } +} + +// MarshalJSON implements json.Marshaller +func (m Metric) MarshalJSON() ([]byte, error) { + buf := bytes.Buffer{} + in := m.toIntermediate() + err := json.NewEncoder(&buf).Encode(in) + return buf.Bytes(), err +} + +// UnmarshalJSON implements json.Unmarshaler +func (m *Metric) UnmarshalJSON(input []byte) error { + in := WireMetrics{} + if err := json.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil { + return err + } + *m = in.fromIntermediate() + return nil +} + +// GobEncode implements gob.Marshaller +func (m Metric) GobEncode() ([]byte, error) { + buf := bytes.Buffer{} + err := gob.NewEncoder(&buf).Encode(m.toIntermediate()) + return buf.Bytes(), err +} + +// GobDecode implements gob.Unmarshaller +func (m *Metric) GobDecode(input []byte) error { + in := WireMetrics{} + if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil { + return err + } + *m = in.fromIntermediate() + return nil +} diff --git a/report/metrics_test.go b/report/metrics_test.go new file mode 100644 index 000000000..bfa243efb --- /dev/null +++ b/report/metrics_test.go @@ -0,0 +1,283 @@ +package report_test + +import ( + "bytes" + "encoding/gob" + "encoding/json" + "reflect" + "testing" + "time" + + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +func TestMetricsMerge(t *testing.T) { + t1 := time.Now() + t2 := time.Now().Add(1 * time.Minute) + t3 := time.Now().Add(2 * time.Minute) + t4 := time.Now().Add(3 * time.Minute) + + metrics1 := report.Metrics{ + "metric1": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2), + "metric2": report.MakeMetric().Add(t3, 0.3), + } + metrics2 := report.Metrics{ + "metric2": report.MakeMetric().Add(t4, 0.4), + "metric3": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2), + } + want := report.Metrics{ + "metric1": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2), + "metric2": report.MakeMetric().Add(t3, 0.3).Add(t4, 0.4), + "metric3": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2), + } + have := metrics1.Merge(metrics2) + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } +} + +func TestMetricsCopy(t *testing.T) { + t1 := time.Now() + want := report.Metrics{ + "metric1": report.MakeMetric().Add(t1, 0.1), + } + delete(want.Copy(), "metric1") // Modify a copy + have := want.Copy() // Check the original wasn't affected + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } +} + +func checkMetric(t *testing.T, metric report.Metric, first, last time.Time, min, max float64) { + if !metric.First.Equal(first) { + t.Errorf("Expected metric.First == %q, but was: %q", first, metric.First) + } + if !metric.Last.Equal(last) { + t.Errorf("Expected metric.Last == %q, but was: %q", last, metric.Last) + } + if metric.Min != min { + t.Errorf("Expected metric.Min == %f, but was: %f", min, metric.Min) + } + if metric.Max != max { + t.Errorf("Expected metric.Max == %f, but was: %f", max, metric.Max) + } +} + +func TestMetricFirstLastMinMax(t *testing.T) { + metric := report.MakeMetric() + var zero time.Time + t1 := time.Now() + t2 := time.Now().Add(1 * time.Minute) + t3 := time.Now().Add(2 * time.Minute) + t4 := time.Now().Add(3 * time.Minute) + other := report.MakeMetric() + other.Max = 5 + other.Min = -5 + other.First = t1.Add(-1 * time.Minute) + other.Last = t4.Add(1 * time.Minute) + + tests := []struct { + f func(report.Metric) report.Metric + first, last time.Time + min, max float64 + }{ + {nil, zero, zero, 0, 0}, + {func(m report.Metric) report.Metric { return m.Add(t2, 2) }, t2, t2, 0, 2}, + {func(m report.Metric) report.Metric { return m.Add(t1, 1) }, t1, t2, 0, 2}, + {func(m report.Metric) report.Metric { return m.Add(t3, -1) }, t1, t3, -1, 2}, + {func(m report.Metric) report.Metric { return m.Add(t4, 3) }, t1, t4, -1, 3}, + {func(m report.Metric) report.Metric { return m.Merge(other) }, t1.Add(-1 * time.Minute), t4.Add(1 * time.Minute), -5, 5}, + } + for _, test := range tests { + oldFirst, oldLast, oldMin, oldMax := metric.First, metric.Last, metric.Min, metric.Max + oldMetric := metric + if test.f != nil { + metric = test.f(metric) + } + + // Check it didn't modify the old one + checkMetric(t, oldMetric, oldFirst, oldLast, oldMin, oldMax) + + // Check the new one is as expected + checkMetric(t, metric, test.first, test.last, test.min, test.max) + } +} + +func TestMetricAdd(t *testing.T) { + s := []report.Sample{ + {time.Now(), 0.1}, + {time.Now().Add(1 * time.Minute), 0.2}, + {time.Now().Add(2 * time.Minute), 0.3}, + } + + have := report.MakeMetric(). + Add(s[0].Timestamp, s[0].Value). + Add(s[2].Timestamp, s[2].Value). // Keeps sorted + Add(s[1].Timestamp, s[1].Value). + Add(s[2].Timestamp, 0.5) // Overwrites duplicate timestamps + + want := report.MakeMetric(). + Add(s[0].Timestamp, s[0].Value). + Add(s[1].Timestamp, s[1].Value). + Add(s[2].Timestamp, 0.5) + + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } +} + +func TestMetricMerge(t *testing.T) { + t1 := time.Now() + t2 := time.Now().Add(1 * time.Minute) + t3 := time.Now().Add(2 * time.Minute) + t4 := time.Now().Add(3 * time.Minute) + + metric1 := report.MakeMetric(). + Add(t2, 0.2). + Add(t3, 0.31) + + metric2 := report.MakeMetric(). + Add(t1, -0.1). + Add(t3, 0.3). + Add(t4, 0.4) + + want := report.MakeMetric(). + Add(t1, -0.1). + Add(t2, 0.2). + Add(t3, 0.31). + Add(t4, 0.4) + have := metric1.Merge(metric2) + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } + + // Check it didn't modify metric1 + if !metric1.First.Equal(t2) { + t.Errorf("Expected metric1.First == %q, but was: %q", t2, metric1.First) + } + if !metric1.Last.Equal(t3) { + t.Errorf("Expected metric1.Last == %q, but was: %q", t3, metric1.Last) + } + if metric1.Min != 0.0 { + t.Errorf("Expected metric1.Min == %f, but was: %f", 0.0, metric1.Min) + } + if metric1.Max != 0.31 { + t.Errorf("Expected metric1.Max == %f, but was: %f", 0.31, metric1.Max) + } + + // Check the result is not the same instance as metric1 + if &metric1 == &have { + t.Errorf("Expected different pointers for metric1 and have, but both were: %p", &have) + } +} + +func TestMetricCopy(t *testing.T) { + want := report.MakeMetric() + have := want.Copy() + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } + + want = report.MakeMetric().Add(time.Now(), 1) + have = want.Copy() + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } +} + +func TestMetricDiv(t *testing.T) { + t1 := time.Now() + t2 := time.Now().Add(1 * time.Minute) + + want := report.MakeMetric(). + Add(t1, -2). + Add(t2, 2) + beforeDiv := report.MakeMetric(). + Add(t1, -2048). + Add(t2, 2048) + have := beforeDiv.Div(1024) + if !reflect.DeepEqual(want, have) { + t.Errorf("diff: %s", test.Diff(want, have)) + } + + // Check the original was unmodified + checkMetric(t, beforeDiv, t1, t2, -2048, 2048) +} + +type codec struct { + name string + encode func(interface{}) ([]byte, error) + decode func([]byte, interface{}) error + extraChecks func(*testing.T, codec, []byte) +} + +func TestMetricMarshalling(t *testing.T) { + t1 := time.Now().UTC() + t2 := time.Now().UTC().Add(1 * time.Minute) + t3 := time.Now().UTC().Add(2 * time.Minute) + t4 := time.Now().UTC().Add(3 * time.Minute) + + wantSamples := []report.Sample{ + {Timestamp: t1, Value: 0.1}, + {Timestamp: t2, Value: 0.2}, + {Timestamp: t3, Value: 0.3}, + {Timestamp: t4, Value: 0.4}, + } + + want := report.MakeMetric() + for _, sample := range wantSamples { + want = want.Add(sample.Timestamp, sample.Value) + } + + codecs := []codec{ + { + "json", + json.Marshal, + json.Unmarshal, + func(t *testing.T, codec codec, b []byte) { + var wire struct { + Samples []report.Sample `json:"samples"` + } + if err := codec.decode(b, &wire); err != nil { + t.Fatalf("[%s] %s", codec.name, err) + } + if !reflect.DeepEqual(wantSamples, wire.Samples) { + t.Errorf("[%s] diff: %sencoded: %s", codec.name, test.Diff(wantSamples, wire.Samples), b) + } + }, + }, + { + "gob", + func(v interface{}) ([]byte, error) { + buf := &bytes.Buffer{} + err := gob.NewEncoder(buf).Encode(v) + return buf.Bytes(), err + }, + func(b []byte, v interface{}) error { + return gob.NewDecoder(bytes.NewReader(b)).Decode(v) + }, + nil, + }, + } + for _, codec := range codecs { + b, err := codec.encode(want) + if err != nil { + t.Fatalf("[%s] %s", codec.name, err) + } + + var have report.Metric + err = codec.decode(b, &have) + if err != nil { + t.Fatalf("[%s] %s", codec.name, err) + } + + if !reflect.DeepEqual(want, have) { + t.Errorf("[%s] diff: %sencoded: %s", codec.name, test.Diff(want, have), b) + } + + if codec.extraChecks != nil { + codec.extraChecks(t, codec, b) + } + } +} diff --git a/report/topology.go b/report/topology.go index 3700b1496..76cf3b97f 100644 --- a/report/topology.go +++ b/report/topology.go @@ -91,6 +91,7 @@ type Node struct { Edges EdgeMetadatas `json:"edges,omitempty"` Controls NodeControls `json:"controls,omitempty"` Latest LatestMap `json:"latest,omitempty"` + Metrics Metrics `json:"metrics,omitempty"` } // MakeNode creates a new Node with no initial metadata. @@ -103,6 +104,7 @@ func MakeNode() Node { Edges: EdgeMetadatas{}, Controls: MakeNodeControls(), Latest: MakeLatestMap(), + Metrics: Metrics{}, } } @@ -140,6 +142,20 @@ func (n Node) WithSets(sets Sets) Node { return result } +// WithMetric returns a fresh copy of n, with metric merged in at key. +func (n Node) WithMetric(key string, metric Metric) Node { + result := n.Copy() + n.Metrics[key] = n.Metrics[key].Merge(metric) + return result +} + +// WithMetrics returns a fresh copy of n, with metrics merged in. +func (n Node) WithMetrics(metrics Metrics) Node { + result := n.Copy() + result.Metrics = result.Metrics.Merge(metrics) + return result +} + // WithAdjacent returns a fresh copy of n, with 'a' added to Adjacency func (n Node) WithAdjacent(a ...string) Node { result := n.Copy() @@ -180,6 +196,7 @@ func (n Node) Copy() Node { cp.Edges = n.Edges.Copy() cp.Controls = n.Controls.Copy() cp.Latest = n.Latest.Copy() + cp.Metrics = n.Metrics.Copy() return cp } @@ -194,6 +211,7 @@ func (n Node) Merge(other Node) Node { cp.Edges = cp.Edges.Merge(other.Edges) cp.Controls = cp.Controls.Merge(other.Controls) cp.Latest = cp.Latest.Merge(other.Latest) + cp.Metrics = cp.Metrics.Merge(other.Metrics) return cp } diff --git a/test/fixture/report_fixture.go b/test/fixture/report_fixture.go index e0567760e..d4d6f6050 100644 --- a/test/fixture/report_fixture.go +++ b/test/fixture/report_fixture.go @@ -98,6 +98,13 @@ var ( ServiceID = "ping/pongservice" ServiceNodeID = report.MakeServiceNodeID("ping", "pongservice") + LoadMetric = report.MakeMetric().Add(Now, 0.01).WithFirst(Now.Add(-15 * time.Second)) + LoadMetrics = report.Metrics{ + host.Load1: LoadMetric, + host.Load5: LoadMetric, + host.Load15: LoadMetric, + } + Report = report.Report{ Endpoint: report.Topology{ Nodes: report.Nodes{ @@ -286,18 +293,24 @@ var ( ClientHostNodeID: report.MakeNodeWith(map[string]string{ "host_name": ClientHostName, "os": "Linux", - "load": "0.01 0.01 0.01", report.HostNodeID: ClientHostNodeID, }).WithSets(report.Sets{ host.LocalNetworks: report.MakeStringSet("10.10.10.0/24"), + }).WithMetrics(report.Metrics{ + host.Load1: LoadMetric, + host.Load5: LoadMetric, + host.Load15: LoadMetric, }), ServerHostNodeID: report.MakeNodeWith(map[string]string{ "host_name": ServerHostName, "os": "Linux", - "load": "0.01 0.01 0.01", report.HostNodeID: ServerHostNodeID, }).WithSets(report.Sets{ host.LocalNetworks: report.MakeStringSet("10.10.10.0/24"), + }).WithMetrics(report.Metrics{ + host.Load1: LoadMetric, + host.Load5: LoadMetric, + host.Load15: LoadMetric, }), }, },