From eff5a1f9f7adae3698b41a67a7a7427f8f393b2e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 13 Sep 2019 07:53:00 +0000 Subject: [PATCH 1/5] Refactor: pull Publish() call up to publishLoop() --- probe/probe.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/probe/probe.go b/probe/probe.go index c609f6319..0af1f19cc 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -208,7 +208,7 @@ func (p *Probe) tag(r report.Report) report.Report { return r } -func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) { +func (p *Probe) drainAndSanitise(rpt report.Report, rs chan report.Report) report.Report { p.rateLimiter.Wait(context.Background()) ForLoop: for { @@ -225,9 +225,7 @@ ForLoop: t.Controls = report.Controls{} }) } - if err := p.publisher.Publish(rpt); err != nil { - log.Infof("Publish: %v", err) - } + return rpt } func (p *Probe) publishLoop() { @@ -235,15 +233,21 @@ func (p *Probe) publishLoop() { pubTick := time.Tick(p.publishInterval) for { + var err error select { case <-pubTick: - p.drainAndPublish(report.MakeReport(), p.spiedReports) + rpt := p.drainAndSanitise(report.MakeReport(), p.spiedReports) + err = p.publisher.Publish(rpt) case rpt := <-p.shortcutReports: - p.drainAndPublish(rpt, p.shortcutReports) + rpt = p.drainAndSanitise(rpt, p.shortcutReports) + err = p.publisher.Publish(rpt) case <-p.quit: return } + if err != nil { + log.Infof("Publish: %v", err) + } } } From b6d5594f9f1562e7413d8116e265cea0ea49aad9 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 13 Sep 2019 15:55:05 +0000 Subject: [PATCH 2/5] perf(probe): publish delta reports to reduce data size Similar to video compression which uses key-frames and differences between them: every N publishes we send a full report, but inbetween we only send what has changed. Fairly simple approach in the probe - hold on to the last full report, and for the deltas remove anything that would be merged in from the full report. 
On the receiving side in the app it already merges a set of reports together to produce the final output for rendering, so provided N is smaller than that set we don't need to do anything different. Deltas don't need to represent nodes that have disappeared - an earlier full node will have that node so it would be merged into the final output anyway. --- extras/generate_latest_map | 13 ++++++++++ probe/probe.go | 34 +++++++++++++++++++------- probe/probe_internal_test.go | 4 ++-- prog/main.go | 2 ++ prog/probe.go | 2 +- report/id_list.go | 5 ++++ report/latest_map_generated.go | 26 ++++++++++++++++++++ report/node.go | 44 ++++++++++++++++++++++++++++++++++ report/report.go | 10 ++++++++ report/topology.go | 36 ++++++++++++++++++++++++++++ 10 files changed, 165 insertions(+), 11 deletions(-) diff --git a/extras/generate_latest_map b/extras/generate_latest_map index 3ae35b398..d41d07e23 100755 --- a/extras/generate_latest_map +++ b/extras/generate_latest_map @@ -230,6 +230,19 @@ function generate_latest_map() { return true } + // EqualIgnoringTimestamps returns true if all keys and values are the same. + func (m ${latest_map_type}) EqualIgnoringTimestamps(n ${latest_map_type}) bool { + if m.Size() != n.Size() { + return false + } + for i := range m { + if m[i].key != n[i].key || m[i].Value != n[i].Value { + return false + } + } + return true + } + // CodecEncodeSelf implements codec.Selfer. // Duplicates the output for a built-in map without generating an // intermediate copy of the data structure, to save time. 
diff --git a/probe/probe.go b/probe/probe.go index 0af1f19cc..4205bc9b8 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -27,6 +27,7 @@ type Probe struct { spyInterval, publishInterval time.Duration publisher ReportPublisher rateLimiter *rate.Limiter + ticksPerFullReport int noControls bool tickers []Ticker @@ -77,17 +78,19 @@ type Ticker interface { func New( spyInterval, publishInterval time.Duration, publisher ReportPublisher, + ticksPerFullReport int, noControls bool, ) *Probe { result := &Probe{ - spyInterval: spyInterval, - publishInterval: publishInterval, - publisher: publisher, - rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1), - noControls: noControls, - quit: make(chan struct{}), - spiedReports: make(chan report.Report, spiedReportBufferSize), - shortcutReports: make(chan report.Report, shortcutReportBufferSize), + spyInterval: spyInterval, + publishInterval: publishInterval, + publisher: publisher, + rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1), + ticksPerFullReport: ticksPerFullReport, + noControls: noControls, + quit: make(chan struct{}), + spiedReports: make(chan report.Report, spiedReportBufferSize), + shortcutReports: make(chan report.Report, shortcutReportBufferSize), } return result } @@ -231,13 +234,28 @@ ForLoop: func (p *Probe) publishLoop() { defer p.done.Done() pubTick := time.Tick(p.publishInterval) + publishCount := 0 + var lastFullReport report.Report for { var err error select { case <-pubTick: rpt := p.drainAndSanitise(report.MakeReport(), p.spiedReports) + fullReport := (publishCount % p.ticksPerFullReport) == 0 + if !fullReport { + rpt.UnsafeUnMerge(lastFullReport) + } err = p.publisher.Publish(rpt) + if err == nil { + if fullReport { + lastFullReport = rpt + } + publishCount++ + } else { + // If we failed to send then drop back to full report next time + publishCount = 0 + } case rpt := <-p.shortcutReports: rpt = p.drainAndSanitise(rpt, p.shortcutReports) diff --git
a/probe/probe_internal_test.go b/probe/probe_internal_test.go index 76e9502a4..97af02904 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -16,7 +16,7 @@ func TestApply(t *testing.T) { endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"}) ) - p := New(0, 0, nil, false) + p := New(0, 0, nil, 1, false) p.AddTagger(NewTopologyTagger()) r := report.MakeReport() @@ -72,7 +72,7 @@ func TestProbe(t *testing.T) { pub := mockPublisher{make(chan report.Report, 10)} - p := New(10*time.Millisecond, 100*time.Millisecond, pub, false) + p := New(10*time.Millisecond, 100*time.Millisecond, pub, 1, false) p.AddReporter(mockReporter{want}) p.Start() defer p.Stop() diff --git a/prog/main.go b/prog/main.go index 781790857..7920f13dd 100644 --- a/prog/main.go +++ b/prog/main.go @@ -100,6 +100,7 @@ type probeFlags struct { token string httpListen string publishInterval time.Duration + ticksPerFullReport int spyInterval time.Duration pluginsRoot string insecure bool @@ -297,6 +298,7 @@ func setupFlags(flags *flags) { flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server") flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval") flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval") + flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between") flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins") flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (e.g. 
start/stop containers, terminals, logs ...)") flag.BoolVar(&flags.probe.noCommandLineArguments, "probe.omit.cmd-args", false, "Disable collection of command-line arguments") diff --git a/prog/probe.go b/prog/probe.go index 9df88d22e..9814d76c9 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -240,7 +240,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { clients = multiClients } - p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls) + p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.ticksPerFullReport, flags.noControls) p.AddTagger(probe.NewTopologyTagger()) var processCache *process.CachingWalker diff --git a/report/id_list.go b/report/id_list.go index e74901e7a..1b6f10477 100644 --- a/report/id_list.go +++ b/report/id_list.go @@ -27,6 +27,11 @@ func (a IDList) Merge(b IDList) IDList { return IDList(merged) } +// Equal returns true if a and b have the same contents +func (a IDList) Equal(b IDList) bool { + return StringSet(a).Equal(StringSet(b)) +} + // Contains returns true if id is in the list. func (a IDList) Contains(id string) bool { return StringSet(a).Contains(id) diff --git a/report/latest_map_generated.go b/report/latest_map_generated.go index 97ce72382..c084cb92d 100644 --- a/report/latest_map_generated.go +++ b/report/latest_map_generated.go @@ -198,6 +198,19 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool { return true } +// EqualIgnoringTimestamps returns true if all keys and values are the same. +func (m StringLatestMap) EqualIgnoringTimestamps(n StringLatestMap) bool { + if m.Size() != n.Size() { + return false + } + for i := range m { + if m[i].key != n[i].key || m[i].Value != n[i].Value { + return false + } + } + return true +} + // CodecEncodeSelf implements codec.Selfer. // Duplicates the output for a built-in map without generating an // intermediate copy of the data structure, to save time. 
@@ -450,6 +463,19 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool { return true } +// EqualIgnoringTimestamps returns true if all keys and values are the same. +func (m NodeControlDataLatestMap) EqualIgnoringTimestamps(n NodeControlDataLatestMap) bool { + if m.Size() != n.Size() { + return false + } + for i := range m { + if m[i].key != n[i].key || m[i].Value != n[i].Value { + return false + } + } + return true +} + // CodecEncodeSelf implements codec.Selfer. // Duplicates the output for a built-in map without generating an // intermediate copy of the data structure, to save time. diff --git a/report/node.go b/report/node.go index 3b68f43a4..c64d70687 100644 --- a/report/node.go +++ b/report/node.go @@ -198,3 +198,47 @@ func (n Node) Merge(other Node) Node { Children: n.Children.Merge(other.Children), } } + +// UnsafeUnMerge removes data from n that would be added by merging other, +// modifying the original. +// returns true if n.Merge(other) is the same as n +func (n *Node) UnsafeUnMerge(other Node) bool { + // If it's not the same ID and topology then just bail out + if n.ID != other.ID || n.Topology != other.Topology { + return false + } + n.ID = "" + n.Topology = "" + remove := true + // We either keep a whole section or drop it if anything changed + // - a trade-off of some extra data size in favour of faster simpler code.
+ // (in practice, very few values reported by Scope probes do change over time) + if n.LatestControls.EqualIgnoringTimestamps(other.LatestControls) { + n.LatestControls = nil + } else { + remove = false + } + if n.Latest.EqualIgnoringTimestamps(other.Latest) { + n.Latest = nil + } else { + remove = false + } + if n.Sets.DeepEqual(other.Sets) { + n.Sets = MakeSets() + } else { + remove = false + } + if n.Parents.DeepEqual(other.Parents) { + n.Parents = MakeSets() + } else { + remove = false + } + if n.Adjacency.Equal(other.Adjacency) { + n.Adjacency = nil + } else { + remove = false + } + // counters and children are not created in the probe so we don't check those + // metrics don't overlap so just check if we have any + return remove && len(n.Metrics) == 0 +} diff --git a/report/report.go b/report/report.go index c7c1f0979..f58839a94 100644 --- a/report/report.go +++ b/report/report.go @@ -351,6 +351,16 @@ func (r *Report) UnsafeMerge(other Report) { }) } +// UnsafeUnMerge removes any information from r that would be added by merging other. +// The original is modified. +func (r *Report) UnsafeUnMerge(other Report) { + // TODO: DNS, Sampling, Plugins + r.Window = r.Window - other.Window + r.WalkPairedTopologies(&other, func(ourTopology, theirTopology *Topology) { + ourTopology.UnsafeUnMerge(*theirTopology) + }) +} + // WalkTopologies iterates through the Topologies of the report, // potentially modifying them func (r *Report) WalkTopologies(f func(*Topology)) { diff --git a/report/topology.go b/report/topology.go index 3fd8b916c..15e91ac1c 100644 --- a/report/topology.go +++ b/report/topology.go @@ -208,6 +208,27 @@ func (t *Topology) UnsafeMerge(other Topology) { t.TableTemplates = t.TableTemplates.Merge(other.TableTemplates) } +// UnsafeUnMerge removes any information from t that would be added by merging other, +// modifying the original. 
+func (t *Topology) UnsafeUnMerge(other Topology) { + if t.Shape == other.Shape { + t.Shape = "" + } + if t.Label == other.Label && t.LabelPlural == other.LabelPlural { + t.Label, t.LabelPlural = "", "" + } + if t.Tag == other.Tag { + t.Tag = "" + } + t.Nodes.UnsafeUnMerge(other.Nodes) + // TODO Controls + // NOTE: taking a shortcut and assuming templates are static, which they have always been in Scope + // If you break that assumption please change this. + t.MetadataTemplates = nil + t.MetricTemplates = nil + t.TableTemplates = nil +} + // Nodes is a collection of nodes in a topology. Keys are node IDs. // TODO(pb): type Topology map[string]Node type Nodes map[string]Node @@ -249,6 +270,21 @@ func (n *Nodes) UnsafeMerge(other Nodes) { } } +// UnsafeUnMerge removes nodes from n that would be added by merging other, +// modifying the original. +func (n *Nodes) UnsafeUnMerge(other Nodes) { + for k, node := range *n { + if otherNode, ok := (other)[k]; ok { + remove := node.UnsafeUnMerge(otherNode) + if remove { + delete(*n, k) + } else { + (*n)[k] = node + } + } + } +} + // Validate checks the topology for various inconsistencies. func (t Topology) Validate() error { errs := []string{} From 951629af292ad2964a73766840d8d2e3ac983e99 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 17 Sep 2019 15:31:58 +0000 Subject: [PATCH 3/5] chore: allow Report.DNS field to be nil Primarily to help when writing tests; may give a tiny performance benefit. --- report/report.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/report/report.go b/report/report.go index f58839a94..83a2c0a0f 100644 --- a/report/report.go +++ b/report/report.go @@ -188,7 +188,7 @@ type Report struct { // Job represent all Kubernetes Job on hosts running probes. Job Topology - DNS DNSRecords + DNS DNSRecords `json:"DNS,omitempty" deepequal:"nil==empty"` // Sampling data for this report.
Sampling Sampling From da030d1618d0cd4e81be91413d02211e2371c0f0 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 17 Sep 2019 15:34:31 +0000 Subject: [PATCH 4/5] test: add TestReportUnMerge() Testing the new delta-report internals --- report/report_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/report/report_test.go b/report/report_test.go index 6ecbc1c8d..80717e7f6 100644 --- a/report/report_test.go +++ b/report/report_test.go @@ -98,3 +98,44 @@ func TestReportUpgrade(t *testing.T) { t.Error(test.Diff(expected, got)) } } + +func TestReportUnMerge(t *testing.T) { + n1 := report.MakeNodeWith("foo", map[string]string{"foo": "bar"}) + r1 := makeTestReport() + r2 := r1.Copy() + r2.Container.AddNode(n1) + // r2 should be the same as r1 with just the foo-bar node added + r2.UnsafeUnMerge(r1) + // Now r2 should have everything removed except that one node, and its ID + expected := report.Report{ + ID: r2.ID, + Container: report.Topology{ + Nodes: report.Nodes{ + "foo": n1, + }, + }, + } + if !s_reflect.DeepEqual(expected, r2) { + t.Error(test.Diff(expected, r2)) + } + + // Now test report with two nodes unmerged on report with one + r1.Container.AddNode(n1) + r2 = r1.Copy() + n2 := report.MakeNodeWith("foo2", map[string]string{"ping": "pong"}) + r2.Container.AddNode(n2) + // r2 should be the same as r1 with one extra node + r2.UnsafeUnMerge(r1) + expected = report.Report{ + ID: r2.ID, + Container: report.Topology{ + Nodes: report.Nodes{ + "foo2": n2, + }, + }, + } + + if !s_reflect.DeepEqual(expected, r2) { + t.Error(test.Diff(expected, r2)) + } +} From 395282b0439687e79f2e376642cb2e1484ebe9ee Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 18 Sep 2019 11:09:49 +0000 Subject: [PATCH 5/5] help: add note on constraint to -full-report-every argument --- prog/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prog/main.go b/prog/main.go index 7920f13dd..bd4e488c0 100644 --- a/prog/main.go +++ b/prog/main.go @@ -298,7 +298,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server") flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval") flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval") - flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between") + flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between. Make sure N < (app.window / probe.publish.interval)") flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins") flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (e.g. start/stop containers, terminals, logs ...)") flag.BoolVar(&flags.probe.noCommandLineArguments, "probe.omit.cmd-args", false, "Disable collection of command-line arguments")