Metrics plumbing for reports.

- base load graph x-axis on data, not a hardcoded window
- format memory dynamically to scale
- add some tests for the immutability of metrics
- use ps.List for Sample storage, so it is immutable. Have to implement custom marshalling
- adding tests for report.Metrics
- check the ordering of the json samples
- Make the nil value for Metrics valid.
- Sort samples from oldest to newest on the wire.
This commit is contained in:
Paul Bellamy
2015-11-09 17:32:16 +00:00
committed by Tom Wilkie
parent 57e2046d1a
commit 8f63d7be7f
8 changed files with 762 additions and 87 deletions

View File

@@ -18,6 +18,7 @@ DOCKER_DISTRIB=docker/docker-$(DOCKER_VERSION).tgz
DOCKER_DISTRIB_URL=https://get.docker.com/builds/Linux/x86_64/docker-$(DOCKER_VERSION).tgz
RUNSVINIT=vendor/runsvinit/runsvinit
RM=--rm
RUN_FLAGS=-ti
BUILD_IN_CONTAINER=true
all: $(SCOPE_EXPORT)
@@ -43,7 +44,7 @@ $(PROBE_EXE): probe/*.go probe/docker/*.go probe/kubernetes/*.go probe/endpoint/
ifeq ($(BUILD_IN_CONTAINER),true)
$(APP_EXE) $(PROBE_EXE) $(RUNSVINIT): $(SCOPE_BACKEND_BUILD_UPTODATE)
$(SUDO) docker run $(RM) -v $(shell pwd):/go/src/github.com/weaveworks/scope -e GOARCH -e GOOS \
$(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd):/go/src/github.com/weaveworks/scope -e GOARCH -e GOOS \
$(SCOPE_BACKEND_BUILD_IMAGE) SCOPE_VERSION=$(SCOPE_VERSION) $@
else
$(APP_EXE) $(PROBE_EXE): $(SCOPE_BACKEND_BUILD_UPTODATE)
@@ -67,22 +68,22 @@ static: client/build/app.js
ifeq ($(BUILD_IN_CONTAINER),true)
client/build/app.js: $(shell find client/app/scripts -type f)
mkdir -p client/build
$(SUDO) docker run $(RM) -v $(shell pwd)/client/app:/home/weave/app \
$(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd)/client/app:/home/weave/app \
-v $(shell pwd)/client/build:/home/weave/build \
$(SCOPE_UI_BUILD_IMAGE) npm run build
client-test: $(shell find client/app/scripts -type f)
$(SUDO) docker run $(RM) -v $(shell pwd)/client/app:/home/weave/app \
$(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd)/client/app:/home/weave/app \
-v $(shell pwd)/client/test:/home/weave/test \
$(SCOPE_UI_BUILD_IMAGE) npm test
client-lint:
$(SUDO) docker run $(RM) -v $(shell pwd)/client/app:/home/weave/app \
$(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd)/client/app:/home/weave/app \
-v $(shell pwd)/client/test:/home/weave/test \
$(SCOPE_UI_BUILD_IMAGE) npm run lint
client-start:
$(SUDO) docker run $(RM) --net=host -v $(shell pwd)/client/app:/home/weave/app \
$(SUDO) docker run $(RM) $(RUN_FLAGS) --net=host -v $(shell pwd)/client/app:/home/weave/app \
-v $(shell pwd)/client/build:/home/weave/build \
$(SCOPE_UI_BUILD_IMAGE) npm start
endif
@@ -105,7 +106,7 @@ clean:
ifeq ($(BUILD_IN_CONTAINER),true)
tests: $(SCOPE_BACKEND_BUILD_UPTODATE)
$(SUDO) docker run $(RM) -v $(shell pwd):/go/src/github.com/weaveworks/scope \
$(SUDO) docker run $(RM) $(RUN_FLAGS) -v $(shell pwd):/go/src/github.com/weaveworks/scope \
-e GOARCH -e GOOS -e CIRCLECI -e CIRCLE_BUILD_NUM -e CIRCLE_NODE_TOTAL -e CIRCLE_NODE_INDEX -e COVERDIR\
$(SCOPE_BACKEND_BUILD_IMAGE) tests
else

View File

@@ -13,7 +13,6 @@ import (
)
const (
mb = 1 << 20
containerImageRank = 4
containerRank = 3
processRank = 2
@@ -43,10 +42,12 @@ type Table struct {
// Row is a single entry in a Table dataset.
type Row struct {
Key string `json:"key"` // e.g. Ingress
ValueMajor string `json:"value_major"` // e.g. 25
ValueMinor string `json:"value_minor,omitempty"` // e.g. KB/s
Expandable bool `json:"expandable,omitempty"` // Whether it can be expanded (hidden by default)
Key string `json:"key"` // e.g. Ingress
ValueMajor string `json:"value_major"` // e.g. 25
ValueMinor string `json:"value_minor,omitempty"` // e.g. KB/s
Expandable bool `json:"expandable,omitempty"` // Whether it can be expanded (hidden by default)
ValueType string `json:"value_type,omitempty"` // e.g. sparkline
Metric *report.Metric `json:"metric,omitempty"` // e.g. sparkline data samples
}
// ControlInstance contains a control description, and all the info
@@ -168,21 +169,21 @@ func connectionsTable(connections []Row, r report.Report, n RenderableNode) (Tab
rows := []Row{}
if n.EdgeMetadata.MaxConnCountTCP != nil {
rows = append(rows, Row{"TCP connections", strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10), "", false})
rows = append(rows, Row{Key: "TCP connections", ValueMajor: strconv.FormatUint(*n.EdgeMetadata.MaxConnCountTCP, 10)})
}
if rate, ok := rate(n.EdgeMetadata.EgressPacketCount); ok {
rows = append(rows, Row{"Egress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec", false})
rows = append(rows, Row{Key: "Egress packet rate", ValueMajor: fmt.Sprintf("%.0f", rate), ValueMinor: "packets/sec"})
}
if rate, ok := rate(n.EdgeMetadata.IngressPacketCount); ok {
rows = append(rows, Row{"Ingress packet rate", fmt.Sprintf("%.0f", rate), "packets/sec", false})
rows = append(rows, Row{Key: "Ingress packet rate", ValueMajor: fmt.Sprintf("%.0f", rate), ValueMinor: "packets/sec"})
}
if rate, ok := rate(n.EdgeMetadata.EgressByteCount); ok {
s, unit := shortenByteRate(rate)
rows = append(rows, Row{"Egress byte rate", s, unit, false})
rows = append(rows, Row{Key: "Egress byte rate", ValueMajor: s, ValueMinor: unit})
}
if rate, ok := rate(n.EdgeMetadata.IngressByteCount); ok {
s, unit := shortenByteRate(rate)
rows = append(rows, Row{"Ingress byte rate", s, unit, false})
rows = append(rows, Row{Key: "Ingress byte rate", ValueMajor: s, ValueMinor: unit})
}
if len(connections) > 0 {
sort.Sort(sortableRows(connections))
@@ -343,6 +344,58 @@ func processOriginTable(nmd report.Node, addHostTag bool, addContainerTag bool)
}, len(rows) > 0 || commFound || pidFound
}
// sparklineRow builds a sparkline-typed Row for the given metric. When format
// is nil, formatDefault is used to render the most recent sample.
func sparklineRow(human string, metric report.Metric, format func(report.Metric) (report.Metric, string)) Row {
	formatter := format
	if formatter == nil {
		formatter = formatDefault
	}
	formatted, major := formatter(metric)
	return Row{
		Key:        human,
		ValueMajor: major,
		Metric:     &formatted,
		ValueType:  "sparkline",
	}
}
// formatDefault renders the metric's newest sample as a plain two-decimal
// number, returning the metric unchanged. An empty metric renders as "".
func formatDefault(m report.Metric) (report.Metric, string) {
	last := m.LastSample()
	if last == nil {
		return m, ""
	}
	return m, fmt.Sprintf("%0.2f", last.Value)
}
// memoryScale picks a human-readable unit for a byte count n, returning the
// unit name and the divisor that converts n into that unit.
//
// The divisor is computed in uint64 before converting to float64: the
// original `1 << shift` used the platform int, which overflows for the
// TB (1<<40) and PB (1<<50) brackets on 32-bit builds.
func memoryScale(n float64) (string, float64) {
	brackets := []struct {
		human string
		shift uint
	}{
		{"bytes", 0},
		{"KB", 10},
		{"MB", 20},
		{"GB", 30},
		{"TB", 40},
		{"PB", 50},
	}
	for _, bracket := range brackets {
		unit := float64(uint64(1) << bracket.shift)
		// Use this bracket while n is still below the next one up.
		if n < unit*1024 {
			return bracket.human, unit
		}
	}
	return "PB", float64(uint64(1) << 50)
}
// formatMemory scales the metric into a human-readable memory unit and
// renders the newest sample, e.g. "123.45 MB". Every sample in the returned
// metric is divided by the chosen unit so the sparkline matches the label.
func formatMemory(m report.Metric) (report.Metric, string) {
	last := m.LastSample()
	if last == nil {
		return m, ""
	}
	human, divisor := memoryScale(last.Value)
	label := fmt.Sprintf("%0.2f %s", last.Value/divisor, human)
	return m.Div(divisor), label
}
// formatPercent renders the metric's newest sample as a percentage, e.g.
// "42.00%", returning the metric unchanged.
func formatPercent(m report.Metric) (report.Metric, string) {
	last := m.LastSample()
	if last == nil {
		return m, ""
	}
	return m, fmt.Sprintf("%0.2f%%", last.Value)
}
func containerOriginTable(nmd report.Node, addHostTag bool) (Table, bool) {
rows := []Row{}
for _, tuple := range []struct{ key, human string }{
@@ -372,17 +425,17 @@ func containerOriginTable(nmd report.Node, addHostTag bool) (Table, bool) {
}
rows = append(rows, getDockerLabelRows(nmd)...)
if val, ok := nmd.Metadata[docker.MemoryUsage]; ok {
memory, err := strconv.ParseFloat(val, 64)
if err == nil {
memoryStr := fmt.Sprintf("%0.2f", memory/float64(mb))
rows = append(rows, Row{Key: "Memory Usage (MB):", ValueMajor: memoryStr, ValueMinor: ""})
}
}
if addHostTag {
rows = append([]Row{{Key: "Host", ValueMajor: report.ExtractHostID(nmd)}}, rows...)
}
if val, ok := nmd.Metrics[docker.MemoryUsage]; ok {
rows = append(rows, sparklineRow("Memory Usage", val, formatMemory))
}
if val, ok := nmd.Metrics[docker.CPUTotalUsage]; ok {
rows = append(rows, sparklineRow("CPU Usage", val, formatPercent))
}
var (
title = "Container"
name, nameFound = GetRenderableContainerName(nmd)
@@ -441,9 +494,31 @@ func getDockerLabelRows(nmd report.Node) []Row {
}
func hostOriginTable(nmd report.Node) (Table, bool) {
// Ensure that all metrics have the same max
maxLoad := 0.0
for _, key := range []string{host.Load1, host.Load5, host.Load15} {
if metric, ok := nmd.Metrics[key]; ok {
if metric.Len() == 0 {
continue
}
if metric.Max > maxLoad {
maxLoad = metric.Max
}
}
}
rows := []Row{}
for _, tuple := range []struct{ key, human string }{
{host.Load, "Load"},
{host.Load1, "Load (1m)"},
{host.Load5, "Load (5m)"},
{host.Load15, "Load (15m)"},
} {
if val, ok := nmd.Metrics[tuple.key]; ok {
val.Max = maxLoad
rows = append(rows, sparklineRow(tuple.human, val, nil))
}
}
for _, tuple := range []struct{ key, human string }{
{host.OS, "Operating system"},
{host.KernelVersion, "Kernel version"},
{host.Uptime, "Uptime"},

View File

@@ -26,8 +26,10 @@ func TestOriginTable(t *testing.T) {
Numeric: false,
Rank: 1,
Rows: []render.Row{
{"Load", "0.01 0.01 0.01", "", false},
{"Operating system", "Linux", "", false},
{Key: "Load (1m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"},
{Key: "Load (5m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"},
{Key: "Load (15m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"},
{Key: "Operating system", ValueMajor: "Linux"},
},
},
} {
@@ -48,8 +50,8 @@ func TestOriginTable(t *testing.T) {
Numeric: false,
Rank: 2,
Rows: []render.Row{
{"Host", fixture.ServerHostID, "", false},
{"Container ID", fixture.ServerContainerID, "", false},
{Key: "Host", ValueMajor: fixture.ServerHostID},
{Key: "Container ID", ValueMajor: fixture.ServerContainerID},
},
},
fixture.ServerContainerNodeID: {
@@ -57,14 +59,14 @@ func TestOriginTable(t *testing.T) {
Numeric: false,
Rank: 3,
Rows: []render.Row{
{"Host", fixture.ServerHostID, "", false},
{"State", "running", "", false},
{"ID", fixture.ServerContainerID, "", false},
{"Image ID", fixture.ServerContainerImageID, "", false},
{fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false},
{`Label "foo1"`, `bar1`, "", false},
{`Label "foo2"`, `bar2`, "", false},
{`Label "io.kubernetes.pod.name"`, "ping/pong-b", "", false},
{Key: "Host", ValueMajor: fixture.ServerHostID},
{Key: "State", ValueMajor: "running"},
{Key: "ID", ValueMajor: fixture.ServerContainerID},
{Key: "Image ID", ValueMajor: fixture.ServerContainerImageID},
{Key: fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), ValueMajor: `server`},
{Key: `Label "foo1"`, ValueMajor: `bar1`},
{Key: `Label "foo2"`, ValueMajor: `bar2`},
{Key: `Label "io.kubernetes.pod.name"`, ValueMajor: "ping/pong-b"},
},
},
} {
@@ -95,14 +97,26 @@ func TestMakeDetailedHostNode(t *testing.T) {
Rank: 1,
Rows: []render.Row{
{
Key: "Load",
ValueMajor: "0.01 0.01 0.01",
ValueMinor: "",
Key: "Load (1m)",
ValueMajor: "0.01",
Metric: &fixture.LoadMetric,
ValueType: "sparkline",
},
{
Key: "Load (5m)",
ValueMajor: "0.01",
Metric: &fixture.LoadMetric,
ValueType: "sparkline",
},
{
Key: "Load (15m)",
ValueMajor: "0.01",
Metric: &fixture.LoadMetric,
ValueType: "sparkline",
},
{
Key: "Operating system",
ValueMajor: "Linux",
ValueMinor: "",
},
},
},
@@ -114,18 +128,15 @@ func TestMakeDetailedHostNode(t *testing.T) {
{
Key: "TCP connections",
ValueMajor: "3",
ValueMinor: "",
},
{
Key: "Client",
ValueMajor: "Server",
ValueMinor: "",
Expandable: true,
},
{
Key: "10.10.10.20",
ValueMajor: "192.168.1.1",
ValueMinor: "",
Expandable: true,
},
},
@@ -152,9 +163,9 @@ func TestMakeDetailedContainerNode(t *testing.T) {
Numeric: false,
Rank: 4,
Rows: []render.Row{
{"Image ID", fixture.ServerContainerImageID, "", false},
{`Label "foo1"`, `bar1`, "", false},
{`Label "foo2"`, `bar2`, "", false},
{Key: "Image ID", ValueMajor: fixture.ServerContainerImageID},
{Key: `Label "foo1"`, ValueMajor: `bar1`},
{Key: `Label "foo2"`, ValueMajor: `bar2`},
},
},
{
@@ -162,13 +173,13 @@ func TestMakeDetailedContainerNode(t *testing.T) {
Numeric: false,
Rank: 3,
Rows: []render.Row{
{"State", "running", "", false},
{"ID", fixture.ServerContainerID, "", false},
{"Image ID", fixture.ServerContainerImageID, "", false},
{fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), `server`, "", false},
{`Label "foo1"`, `bar1`, "", false},
{`Label "foo2"`, `bar2`, "", false},
{`Label "io.kubernetes.pod.name"`, "ping/pong-b", "", false},
{Key: "State", ValueMajor: "running"},
{Key: "ID", ValueMajor: fixture.ServerContainerID},
{Key: "Image ID", ValueMajor: fixture.ServerContainerImageID},
{Key: fmt.Sprintf(`Label %q`, render.AmazonECSContainerNameLabel), ValueMajor: `server`},
{Key: `Label "foo1"`, ValueMajor: `bar1`},
{Key: `Label "foo2"`, ValueMajor: `bar2`},
{Key: `Label "io.kubernetes.pod.name"`, ValueMajor: "ping/pong-b"},
},
},
{
@@ -182,8 +193,10 @@ func TestMakeDetailedContainerNode(t *testing.T) {
Numeric: false,
Rank: 1,
Rows: []render.Row{
{"Load", "0.01 0.01 0.01", "", false},
{"Operating system", "Linux", "", false},
{Key: "Load (1m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"},
{Key: "Load (5m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"},
{Key: "Load (15m)", ValueMajor: "0.01", Metric: &fixture.LoadMetric, ValueType: "sparkline"},
{Key: "Operating system", ValueMajor: "Linux"},
},
},
{
@@ -191,44 +204,38 @@ func TestMakeDetailedContainerNode(t *testing.T) {
Numeric: false,
Rank: 0,
Rows: []render.Row{
{"Ingress packet rate", "105", "packets/sec", false},
{"Ingress byte rate", "1.0", "KBps", false},
{"Client", "Server", "", true},
{Key: "Ingress packet rate", ValueMajor: "105", ValueMinor: "packets/sec"},
{Key: "Ingress byte rate", ValueMajor: "1.0", ValueMinor: "KBps"},
{Key: "Client", ValueMajor: "Server", Expandable: true},
{
fmt.Sprintf("%s:%s", fixture.UnknownClient1IP, fixture.UnknownClient1Port),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
Key: fmt.Sprintf("%s:%s", fixture.UnknownClient1IP, fixture.UnknownClient1Port),
ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
Expandable: true,
},
{
fmt.Sprintf("%s:%s", fixture.UnknownClient2IP, fixture.UnknownClient2Port),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
Key: fmt.Sprintf("%s:%s", fixture.UnknownClient2IP, fixture.UnknownClient2Port),
ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
Expandable: true,
},
{
fmt.Sprintf("%s:%s", fixture.UnknownClient3IP, fixture.UnknownClient3Port),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
Key: fmt.Sprintf("%s:%s", fixture.UnknownClient3IP, fixture.UnknownClient3Port),
ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
Expandable: true,
},
{
fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54001),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
Key: fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54001),
ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
Expandable: true,
},
{
fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54002),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
Key: fmt.Sprintf("%s:%s", fixture.ClientIP, fixture.ClientPort54002),
ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
Expandable: true,
},
{
fmt.Sprintf("%s:%s", fixture.RandomClientIP, fixture.RandomClientPort),
fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
"",
true,
Key: fmt.Sprintf("%s:%s", fixture.RandomClientIP, fixture.RandomClientPort),
ValueMajor: fmt.Sprintf("%s:%s", fixture.ServerIP, fixture.ServerPort),
Expandable: true,
},
},
},

View File

@@ -79,7 +79,7 @@ func (m LatestMap) toIntermediate() map[string]LatestEntry {
return intermediate
}
func fromIntermediate(in map[string]LatestEntry) LatestMap {
func (m LatestMap) fromIntermediate(in map[string]LatestEntry) LatestMap {
out := ps.NewMap()
for k, v := range in {
out = out.Set(k, v)
@@ -105,7 +105,7 @@ func (m *LatestMap) UnmarshalJSON(input []byte) error {
if err := json.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*m = fromIntermediate(in)
*m = LatestMap{}.fromIntermediate(in)
return nil
}
@@ -122,6 +122,6 @@ func (m *LatestMap) GobDecode(input []byte) error {
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*m = fromIntermediate(in)
*m = LatestMap{}.fromIntermediate(in)
return nil
}

278
report/metrics.go Normal file
View File

@@ -0,0 +1,278 @@
package report
import (
"bytes"
"encoding/gob"
"encoding/json"
"math"
"time"
"github.com/mndrix/ps"
)
// Metrics is a string->metric map.
type Metrics map[string]Metric
// Merge merges two metrics maps into a fresh map, merging the metrics stored
// under any key that appears in both.
func (m Metrics) Merge(other Metrics) Metrics {
	merged := m.Copy()
	for key, metric := range other {
		merged[key] = merged[key].Merge(metric)
	}
	return merged
}
// Copy returns a value copy of the metrics map.
func (m Metrics) Copy() Metrics {
	copied := make(Metrics, len(m))
	for key, metric := range m {
		copied[key] = metric
	}
	return copied
}
// Metric is a list of timeseries data with some metadata. Clients must use the
// Add method to add values. Metrics are immutable.
type Metric struct {
	Samples     ps.List // Sample entries, stored newest-first (head is latest).
	Min, Max    float64
	First, Last time.Time
}

// Sample is a single datapoint of a metric.
type Sample struct {
	Timestamp time.Time `json:"date"`
	Value     float64   `json:"value"`
}

// nilMetric is the canonical empty metric; sharing one value keeps
// MakeMetric allocation-free.
var nilMetric = Metric{Samples: ps.NewList()}

// MakeMetric makes a new Metric.
func MakeMetric() Metric {
	return nilMetric
}
// Copy returns a value copy of the Metric. Metric is immutable, so we can skip
// a deep copy and simply return the value; the shared ps.List is never
// mutated in place.
func (m Metric) Copy() Metric {
	return m
}
// WithFirst returns a fresh copy of m, with First set to t.
func (m Metric) WithFirst(t time.Time) Metric {
	// m is a value receiver, so mutating this local copy leaves the
	// caller's metric untouched.
	m.First = t
	return m
}
// Len returns the number of samples in the metric.
func (m Metric) Len() int {
	// A zero-value Metric has a nil sample list; treat it as empty.
	if m.Samples != nil {
		return m.Samples.Size()
	}
	return 0
}
func first(t1, t2 time.Time) time.Time {
if !t1.IsZero() && t1.Before(t2) {
return t1
}
return t2
}
func last(t1, t2 time.Time) time.Time {
if !t1.IsZero() && t1.After(t2) {
return t1
}
return t2
}
// revCons appends acc to the head of curr, where acc is in reverse order.
// acc must never be nil, curr can be.
//
// Popping each head of acc and consing it onto curr re-reverses acc in the
// process, so the result is acc's elements (restored to their original
// order) followed by curr. Used by Add/Merge to reattach the walked prefix.
func revCons(acc, curr ps.List) ps.List {
	if curr == nil {
		return acc.Reverse()
	}
	for !acc.IsNil() {
		acc, curr = acc.Tail(), curr.Cons(acc.Head())
	}
	return curr
}
// Add returns a new Metric with (t, v) added to its Samples. Add is the only
// valid way to grow a Metric.
//
// NB Min/Max fold v into the zero value, so a metric containing only
// positive samples still reports Min == 0 (and vice versa for Max).
func (m Metric) Add(t time.Time, v float64) Metric {
	// Find the first element which is before your element, and insert
	// your new element in the list. NB we want to dedupe entries with
	// equal timestamps.
	// Samples are stored newest-first, so this should be O(1) to insert
	// a latest element, and O(n) in general.
	curr, acc := m.Samples, ps.NewList()
	for {
		if curr == nil || curr.IsNil() {
			// Walked off the end: the new sample is the oldest.
			acc = acc.Cons(Sample{t, v})
			break
		}
		currSample := curr.Head().(Sample)
		if currSample.Timestamp.Equal(t) {
			// Duplicate timestamp: the new value replaces the old sample.
			acc, curr = acc.Cons(Sample{t, v}), curr.Tail()
			break
		}
		if currSample.Timestamp.Before(t) {
			// First strictly-older sample found: insert just before it.
			acc = acc.Cons(Sample{t, v})
			break
		}
		acc, curr = acc.Cons(curr.Head()), curr.Tail()
	}
	// Reattach the (reversed) walked prefix to the untouched tail.
	acc = revCons(acc, curr)
	return Metric{
		Samples: acc,
		Max:     math.Max(m.Max, v),
		Min:     math.Min(m.Min, v),
		First:   first(m.First, t),
		Last:    last(m.Last, t),
	}
}
// Merge combines the two Metrics and returns a new result.
//
// Both sample lists are sorted newest-first, so this is a standard merge of
// two sorted lists. On equal timestamps the receiver's sample wins (s1 is
// kept, s2 dropped).
func (m Metric) Merge(other Metric) Metric {
	// Merge two lists of samples in O(n)
	curr1, curr2, acc := m.Samples, other.Samples, ps.NewList()
	for {
		if curr1 == nil || curr1.IsNil() {
			// m exhausted: append the rest of other's samples.
			acc = revCons(acc, curr2)
			break
		} else if curr2 == nil || curr2.IsNil() {
			// other exhausted: append the rest of m's samples.
			acc = revCons(acc, curr1)
			break
		}
		s1 := curr1.Head().(Sample)
		s2 := curr2.Head().(Sample)
		if s1.Timestamp.Equal(s2.Timestamp) {
			curr1, curr2, acc = curr1.Tail(), curr2.Tail(), acc.Cons(s1)
		} else if s1.Timestamp.After(s2.Timestamp) {
			curr1, acc = curr1.Tail(), acc.Cons(s1)
		} else {
			curr2, acc = curr2.Tail(), acc.Cons(s2)
		}
	}
	return Metric{
		Samples: acc,
		Max:     math.Max(m.Max, other.Max),
		Min:     math.Min(m.Min, other.Min),
		First:   first(m.First, other.First),
		Last:    last(m.Last, other.Last),
	}
}
// Div returns a new copy of the metric, with each value (and Min/Max)
// divided by n. First/Last are carried over unchanged.
func (m Metric) Div(n float64) Metric {
	scaled := ps.NewList()
	for curr := m.Samples; curr != nil && !curr.IsNil(); curr = curr.Tail() {
		s := curr.Head().(Sample)
		scaled = scaled.Cons(Sample{s.Timestamp, s.Value / n})
	}
	return Metric{
		Samples: scaled.Reverse(),
		Max:     m.Max / n,
		Min:     m.Min / n,
		First:   m.First,
		Last:    m.Last,
	}
}
// LastSample returns the last sample in the metric, or nil if there are no
// samples.
func (m Metric) LastSample() *Sample {
	// Samples are stored newest-first, so the head is the latest sample.
	if m.Samples != nil && !m.Samples.IsNil() {
		s := m.Samples.Head().(Sample)
		return &s
	}
	return nil
}
// WireMetrics is the on-the-wire representation of Metrics, shared by the
// JSON and gob codecs below.
type WireMetrics struct {
	Samples []Sample  `json:"samples"` // On the wire, samples are sorted oldest to newest,
	Min     float64   `json:"min"`     // the opposite order to how we store them internally.
	Max     float64   `json:"max"`
	First   time.Time `json:"first"`
	Last    time.Time `json:"last"`
}
// toIntermediate converts the metric to its wire format, with samples
// ordered oldest-first.
func (m Metric) toIntermediate() WireMetrics {
	// NB samples must be a non-nil slice even when empty, so JSON encodes
	// it as [] rather than null.
	samples := []Sample{}
	if m.Samples != nil {
		for curr := m.Samples; !curr.IsNil(); curr = curr.Tail() {
			samples = append(samples, curr.Head().(Sample))
		}
		// Internal order is newest-first; flip to oldest-first for the wire.
		for i, j := 0, len(samples)-1; i < j; i, j = i+1, j-1 {
			samples[i], samples[j] = samples[j], samples[i]
		}
	}
	return WireMetrics{
		Samples: samples,
		Min:     m.Min,
		Max:     m.Max,
		First:   m.First,
		Last:    m.Last,
	}
}
// fromIntermediate converts the wire format back into a Metric. Wire samples
// arrive oldest-first; consing each onto the head leaves the internal list
// newest-first, as the rest of the package expects.
func (m WireMetrics) fromIntermediate() Metric {
	samples := ps.NewList()
	for i := range m.Samples {
		samples = samples.Cons(m.Samples[i])
	}
	return Metric{
		Samples: samples,
		Min:     m.Min,
		Max:     m.Max,
		First:   m.First,
		Last:    m.Last,
	}
}
// MarshalJSON implements json.Marshaller, encoding the wire form of the
// metric (samples oldest-first).
func (m Metric) MarshalJSON() ([]byte, error) {
	var buf bytes.Buffer
	err := json.NewEncoder(&buf).Encode(m.toIntermediate())
	return buf.Bytes(), err
}
// UnmarshalJSON implements json.Unmarshaler, decoding the wire form of the
// metric and replacing *m wholesale.
func (m *Metric) UnmarshalJSON(input []byte) error {
	var in WireMetrics
	err := json.NewDecoder(bytes.NewBuffer(input)).Decode(&in)
	if err != nil {
		return err
	}
	*m = in.fromIntermediate()
	return nil
}
// GobEncode implements gob.Marshaller, encoding the wire form of the metric.
func (m Metric) GobEncode() ([]byte, error) {
	buf := bytes.NewBuffer(nil)
	err := gob.NewEncoder(buf).Encode(m.toIntermediate())
	return buf.Bytes(), err
}
// GobDecode implements gob.Unmarshaller, decoding the wire form of the
// metric and replacing *m wholesale.
func (m *Metric) GobDecode(input []byte) error {
	var in WireMetrics
	if err := gob.NewDecoder(bytes.NewReader(input)).Decode(&in); err != nil {
		return err
	}
	*m = in.fromIntermediate()
	return nil
}

283
report/metrics_test.go Normal file
View File

@@ -0,0 +1,283 @@
package report_test
import (
"bytes"
"encoding/gob"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
// TestMetricsMerge checks that Metrics.Merge unions keys and merges the
// metrics stored under any shared key ("metric2" here).
func TestMetricsMerge(t *testing.T) {
	t1 := time.Now()
	t2 := time.Now().Add(1 * time.Minute)
	t3 := time.Now().Add(2 * time.Minute)
	t4 := time.Now().Add(3 * time.Minute)
	metrics1 := report.Metrics{
		"metric1": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2),
		"metric2": report.MakeMetric().Add(t3, 0.3),
	}
	metrics2 := report.Metrics{
		"metric2": report.MakeMetric().Add(t4, 0.4),
		"metric3": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2),
	}
	want := report.Metrics{
		"metric1": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2),
		"metric2": report.MakeMetric().Add(t3, 0.3).Add(t4, 0.4),
		"metric3": report.MakeMetric().Add(t1, 0.1).Add(t2, 0.2),
	}
	have := metrics1.Merge(metrics2)
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
}
// TestMetricsCopy checks that mutating a copy of a Metrics map does not
// affect the original map.
func TestMetricsCopy(t *testing.T) {
	t1 := time.Now()
	want := report.Metrics{
		"metric1": report.MakeMetric().Add(t1, 0.1),
	}
	delete(want.Copy(), "metric1") // Modify a copy
	have := want.Copy()            // Check the original wasn't affected
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
}
// checkMetric asserts a metric's First/Last/Min/Max metadata in one call;
// shared by the First/Last/Min/Max and Div tests.
func checkMetric(t *testing.T, metric report.Metric, first, last time.Time, min, max float64) {
	if !metric.First.Equal(first) {
		t.Errorf("Expected metric.First == %q, but was: %q", first, metric.First)
	}
	if !metric.Last.Equal(last) {
		t.Errorf("Expected metric.Last == %q, but was: %q", last, metric.Last)
	}
	if metric.Min != min {
		t.Errorf("Expected metric.Min == %f, but was: %f", min, metric.Min)
	}
	if metric.Max != max {
		t.Errorf("Expected metric.Max == %f, but was: %f", max, metric.Max)
	}
}
// TestMetricFirstLastMinMax steps one metric through a series of mutations
// and, after each step, checks both that the new metric's First/Last/Min/Max
// are correct and that the previous value was left unmodified (immutability).
func TestMetricFirstLastMinMax(t *testing.T) {
	metric := report.MakeMetric()
	var zero time.Time
	t1 := time.Now()
	t2 := time.Now().Add(1 * time.Minute)
	t3 := time.Now().Add(2 * time.Minute)
	t4 := time.Now().Add(3 * time.Minute)
	// other carries metadata outside the range built up below, so the
	// final Merge step must widen First/Last/Min/Max.
	other := report.MakeMetric()
	other.Max = 5
	other.Min = -5
	other.First = t1.Add(-1 * time.Minute)
	other.Last = t4.Add(1 * time.Minute)
	tests := []struct {
		f           func(report.Metric) report.Metric
		first, last time.Time
		min, max    float64
	}{
		{nil, zero, zero, 0, 0},
		{func(m report.Metric) report.Metric { return m.Add(t2, 2) }, t2, t2, 0, 2},
		{func(m report.Metric) report.Metric { return m.Add(t1, 1) }, t1, t2, 0, 2},
		{func(m report.Metric) report.Metric { return m.Add(t3, -1) }, t1, t3, -1, 2},
		{func(m report.Metric) report.Metric { return m.Add(t4, 3) }, t1, t4, -1, 3},
		{func(m report.Metric) report.Metric { return m.Merge(other) }, t1.Add(-1 * time.Minute), t4.Add(1 * time.Minute), -5, 5},
	}
	for _, test := range tests {
		oldFirst, oldLast, oldMin, oldMax := metric.First, metric.Last, metric.Min, metric.Max
		oldMetric := metric
		if test.f != nil {
			metric = test.f(metric)
		}
		// Check it didn't modify the old one
		checkMetric(t, oldMetric, oldFirst, oldLast, oldMin, oldMax)
		// Check the new one is as expected
		checkMetric(t, metric, test.first, test.last, test.min, test.max)
	}
}
// TestMetricAdd checks that Add keeps samples sorted regardless of insertion
// order, and that a duplicate timestamp overwrites the earlier value.
//
// The Sample literals use keyed fields: report.Sample is an imported struct,
// and unkeyed cross-package composite literals fail go vet's composites check.
func TestMetricAdd(t *testing.T) {
	s := []report.Sample{
		{Timestamp: time.Now(), Value: 0.1},
		{Timestamp: time.Now().Add(1 * time.Minute), Value: 0.2},
		{Timestamp: time.Now().Add(2 * time.Minute), Value: 0.3},
	}
	have := report.MakeMetric().
		Add(s[0].Timestamp, s[0].Value).
		Add(s[2].Timestamp, s[2].Value). // Keeps sorted
		Add(s[1].Timestamp, s[1].Value).
		Add(s[2].Timestamp, 0.5) // Overwrites duplicate timestamps
	want := report.MakeMetric().
		Add(s[0].Timestamp, s[0].Value).
		Add(s[1].Timestamp, s[1].Value).
		Add(s[2].Timestamp, 0.5)
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
}
// TestMetricMerge checks that Merge interleaves samples from both metrics
// (with the receiver winning on equal timestamps: t3 keeps 0.31, not 0.3),
// and that the receiver itself is left unmodified.
func TestMetricMerge(t *testing.T) {
	t1 := time.Now()
	t2 := time.Now().Add(1 * time.Minute)
	t3 := time.Now().Add(2 * time.Minute)
	t4 := time.Now().Add(3 * time.Minute)
	metric1 := report.MakeMetric().
		Add(t2, 0.2).
		Add(t3, 0.31)
	metric2 := report.MakeMetric().
		Add(t1, -0.1).
		Add(t3, 0.3).
		Add(t4, 0.4)
	want := report.MakeMetric().
		Add(t1, -0.1).
		Add(t2, 0.2).
		Add(t3, 0.31).
		Add(t4, 0.4)
	have := metric1.Merge(metric2)
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
	// Check it didn't modify metric1
	if !metric1.First.Equal(t2) {
		t.Errorf("Expected metric1.First == %q, but was: %q", t2, metric1.First)
	}
	if !metric1.Last.Equal(t3) {
		t.Errorf("Expected metric1.Last == %q, but was: %q", t3, metric1.Last)
	}
	if metric1.Min != 0.0 {
		t.Errorf("Expected metric1.Min == %f, but was: %f", 0.0, metric1.Min)
	}
	if metric1.Max != 0.31 {
		t.Errorf("Expected metric1.Max == %f, but was: %f", 0.31, metric1.Max)
	}
	// Check the result is not the same instance as metric1
	if &metric1 == &have {
		t.Errorf("Expected different pointers for metric1 and have, but both were: %p", &have)
	}
}
// TestMetricCopy checks Copy on both an empty metric and one with a sample;
// since Metric is immutable, a copy must compare deep-equal to its source.
func TestMetricCopy(t *testing.T) {
	want := report.MakeMetric()
	have := want.Copy()
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
	want = report.MakeMetric().Add(time.Now(), 1)
	have = want.Copy()
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
}
// TestMetricDiv checks that Div scales every sample and Min/Max, and that
// the original metric is left unmodified.
func TestMetricDiv(t *testing.T) {
	t1 := time.Now()
	t2 := time.Now().Add(1 * time.Minute)
	want := report.MakeMetric().
		Add(t1, -2).
		Add(t2, 2)
	beforeDiv := report.MakeMetric().
		Add(t1, -2048).
		Add(t2, 2048)
	have := beforeDiv.Div(1024)
	if !reflect.DeepEqual(want, have) {
		t.Errorf("diff: %s", test.Diff(want, have))
	}
	// Check the original was unmodified
	checkMetric(t, beforeDiv, t1, t2, -2048, 2048)
}
// codec bundles an encode/decode pair plus optional format-specific checks,
// letting TestMetricMarshalling exercise JSON and gob through one loop.
type codec struct {
	name        string
	encode      func(interface{}) ([]byte, error)
	decode      func([]byte, interface{}) error
	extraChecks func(*testing.T, codec, []byte)
}
// TestMetricMarshalling round-trips a metric through JSON and gob, checking
// the decoded value matches, and (for JSON) that the wire samples appear
// oldest-to-newest — the opposite of the internal storage order.
func TestMetricMarshalling(t *testing.T) {
	t1 := time.Now().UTC()
	t2 := time.Now().UTC().Add(1 * time.Minute)
	t3 := time.Now().UTC().Add(2 * time.Minute)
	t4 := time.Now().UTC().Add(3 * time.Minute)
	// wantSamples is in wire order: oldest first.
	wantSamples := []report.Sample{
		{Timestamp: t1, Value: 0.1},
		{Timestamp: t2, Value: 0.2},
		{Timestamp: t3, Value: 0.3},
		{Timestamp: t4, Value: 0.4},
	}
	want := report.MakeMetric()
	for _, sample := range wantSamples {
		want = want.Add(sample.Timestamp, sample.Value)
	}
	codecs := []codec{
		{
			"json",
			json.Marshal,
			json.Unmarshal,
			// Decode just the samples array and check its on-the-wire order.
			func(t *testing.T, codec codec, b []byte) {
				var wire struct {
					Samples []report.Sample `json:"samples"`
				}
				if err := codec.decode(b, &wire); err != nil {
					t.Fatalf("[%s] %s", codec.name, err)
				}
				if !reflect.DeepEqual(wantSamples, wire.Samples) {
					t.Errorf("[%s] diff: %sencoded: %s", codec.name, test.Diff(wantSamples, wire.Samples), b)
				}
			},
		},
		{
			"gob",
			func(v interface{}) ([]byte, error) {
				buf := &bytes.Buffer{}
				err := gob.NewEncoder(buf).Encode(v)
				return buf.Bytes(), err
			},
			func(b []byte, v interface{}) error {
				return gob.NewDecoder(bytes.NewReader(b)).Decode(v)
			},
			nil,
		},
	}
	for _, codec := range codecs {
		b, err := codec.encode(want)
		if err != nil {
			t.Fatalf("[%s] %s", codec.name, err)
		}
		var have report.Metric
		err = codec.decode(b, &have)
		if err != nil {
			t.Fatalf("[%s] %s", codec.name, err)
		}
		if !reflect.DeepEqual(want, have) {
			t.Errorf("[%s] diff: %sencoded: %s", codec.name, test.Diff(want, have), b)
		}
		if codec.extraChecks != nil {
			codec.extraChecks(t, codec, b)
		}
	}
}

View File

@@ -91,6 +91,7 @@ type Node struct {
Edges EdgeMetadatas `json:"edges,omitempty"`
Controls NodeControls `json:"controls,omitempty"`
Latest LatestMap `json:"latest,omitempty"`
Metrics Metrics `json:"metrics,omitempty"`
}
// MakeNode creates a new Node with no initial metadata.
@@ -103,6 +104,7 @@ func MakeNode() Node {
Edges: EdgeMetadatas{},
Controls: MakeNodeControls(),
Latest: MakeLatestMap(),
Metrics: Metrics{},
}
}
@@ -140,6 +142,20 @@ func (n Node) WithSets(sets Sets) Node {
return result
}
// WithMetric returns a fresh copy of n, with metric merged in at key.
func (n Node) WithMetric(key string, metric Metric) Node {
	result := n.Copy()
	// BUG FIX: the merge must land in the copy's map. The original wrote
	// to n.Metrics, which mutated the receiver and returned a copy that
	// did not contain the new metric at all.
	result.Metrics[key] = result.Metrics[key].Merge(metric)
	return result
}
// WithMetrics returns a fresh copy of n, with metrics merged in.
func (n Node) WithMetrics(metrics Metrics) Node {
	copied := n.Copy()
	copied.Metrics = copied.Metrics.Merge(metrics)
	return copied
}
// WithAdjacent returns a fresh copy of n, with 'a' added to Adjacency
func (n Node) WithAdjacent(a ...string) Node {
result := n.Copy()
@@ -180,6 +196,7 @@ func (n Node) Copy() Node {
cp.Edges = n.Edges.Copy()
cp.Controls = n.Controls.Copy()
cp.Latest = n.Latest.Copy()
cp.Metrics = n.Metrics.Copy()
return cp
}
@@ -194,6 +211,7 @@ func (n Node) Merge(other Node) Node {
cp.Edges = cp.Edges.Merge(other.Edges)
cp.Controls = cp.Controls.Merge(other.Controls)
cp.Latest = cp.Latest.Merge(other.Latest)
cp.Metrics = cp.Metrics.Merge(other.Metrics)
return cp
}

View File

@@ -98,6 +98,13 @@ var (
ServiceID = "ping/pongservice"
ServiceNodeID = report.MakeServiceNodeID("ping", "pongservice")
LoadMetric = report.MakeMetric().Add(Now, 0.01).WithFirst(Now.Add(-15 * time.Second))
LoadMetrics = report.Metrics{
host.Load1: LoadMetric,
host.Load5: LoadMetric,
host.Load15: LoadMetric,
}
Report = report.Report{
Endpoint: report.Topology{
Nodes: report.Nodes{
@@ -286,18 +293,24 @@ var (
ClientHostNodeID: report.MakeNodeWith(map[string]string{
"host_name": ClientHostName,
"os": "Linux",
"load": "0.01 0.01 0.01",
report.HostNodeID: ClientHostNodeID,
}).WithSets(report.Sets{
host.LocalNetworks: report.MakeStringSet("10.10.10.0/24"),
}).WithMetrics(report.Metrics{
host.Load1: LoadMetric,
host.Load5: LoadMetric,
host.Load15: LoadMetric,
}),
ServerHostNodeID: report.MakeNodeWith(map[string]string{
"host_name": ServerHostName,
"os": "Linux",
"load": "0.01 0.01 0.01",
report.HostNodeID: ServerHostNodeID,
}).WithSets(report.Sets{
host.LocalNetworks: report.MakeStringSet("10.10.10.0/24"),
}).WithMetrics(report.Metrics{
host.Load1: LoadMetric,
host.Load5: LoadMetric,
host.Load15: LoadMetric,
}),
},
},