diff --git a/extras/generate_latest_map b/extras/generate_latest_map index 3ae35b398..d41d07e23 100755 --- a/extras/generate_latest_map +++ b/extras/generate_latest_map @@ -230,6 +230,19 @@ function generate_latest_map() { return true } + // EqualIgnoringTimestamps returns true if all keys and values are the same. + func (m ${latest_map_type}) EqualIgnoringTimestamps(n ${latest_map_type}) bool { + if m.Size() != n.Size() { + return false + } + for i := range m { + if m[i].key != n[i].key || m[i].Value != n[i].Value { + return false + } + } + return true + } + // CodecEncodeSelf implements codec.Selfer. // Duplicates the output for a built-in map without generating an // intermediate copy of the data structure, to save time. Note this diff --git a/probe/probe.go b/probe/probe.go index c609f6319..4205bc9b8 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -27,6 +27,7 @@ type Probe struct { spyInterval, publishInterval time.Duration publisher ReportPublisher rateLimiter *rate.Limiter + ticksPerFullReport int noControls bool tickers []Ticker @@ -77,17 +78,19 @@ type Ticker interface { func New( spyInterval, publishInterval time.Duration, publisher ReportPublisher, + ticksPerFullReport int, noControls bool, ) *Probe { result := &Probe{ - spyInterval: spyInterval, - publishInterval: publishInterval, - publisher: publisher, - rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1), - noControls: noControls, - quit: make(chan struct{}), - spiedReports: make(chan report.Report, spiedReportBufferSize), - shortcutReports: make(chan report.Report, shortcutReportBufferSize), + spyInterval: spyInterval, + publishInterval: publishInterval, + publisher: publisher, + rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1), + ticksPerFullReport: ticksPerFullReport, + noControls: noControls, + quit: make(chan struct{}), + spiedReports: make(chan report.Report, spiedReportBufferSize), + shortcutReports: make(chan report.Report, shortcutReportBufferSize), } return result } @@ -208,7 +211,7 @@ func (p *Probe) tag(r report.Report) report.Report { return r } -func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) { +func (p *Probe) drainAndSanitise(rpt report.Report, rs chan report.Report) report.Report { p.rateLimiter.Wait(context.Background()) ForLoop: for { @@ -225,25 +228,44 @@ ForLoop: t.Controls = report.Controls{} }) } - if err := p.publisher.Publish(rpt); err != nil { - log.Infof("Publish: %v", err) - } + return rpt } func (p *Probe) publishLoop() { defer p.done.Done() pubTick := time.Tick(p.publishInterval) + publishCount := 0 + var lastFullReport report.Report for { + var err error select { case <-pubTick: - p.drainAndPublish(report.MakeReport(), p.spiedReports) + rpt := p.drainAndSanitise(report.MakeReport(), p.spiedReports) + fullReport := (publishCount % p.ticksPerFullReport) == 0 + if !fullReport { + rpt.UnsafeUnMerge(lastFullReport) + } + err = p.publisher.Publish(rpt) + if err == nil { + if fullReport { + lastFullReport = rpt + } + publishCount++ + } else { + // If we failed to send then drop back to full report next time + publishCount = 0 + } case rpt := <-p.shortcutReports: - p.drainAndPublish(rpt, p.shortcutReports) + rpt = p.drainAndSanitise(rpt, p.shortcutReports) + err = p.publisher.Publish(rpt) case <-p.quit: return } + if err != nil { + log.Infof("Publish: %v", err) + } } } diff --git a/probe/probe_internal_test.go b/probe/probe_internal_test.go index 76e9502a4..97af02904 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -16,7 +16,7 @@ func TestApply(t *testing.T) { endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"}) ) - p := New(0, 0, nil, false) + p := New(0, 0, nil, 1, false) p.AddTagger(NewTopologyTagger()) r := report.MakeReport() @@ -72,7 +72,7 @@ func TestProbe(t *testing.T) { pub := mockPublisher{make(chan report.Report, 10)} - p := New(10*time.Millisecond, 100*time.Millisecond, pub, false) + p := New(10*time.Millisecond, 100*time.Millisecond, pub, 1, false) p.AddReporter(mockReporter{want}) p.Start() defer p.Stop() diff --git a/prog/main.go b/prog/main.go index 781790857..bd4e488c0 100644 --- a/prog/main.go +++ b/prog/main.go @@ -100,6 +100,7 @@ type probeFlags struct { token string httpListen string publishInterval time.Duration + ticksPerFullReport int spyInterval time.Duration pluginsRoot string insecure bool @@ -297,6 +298,7 @@ func setupFlags(flags *flags) { flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server") flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval") flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval") + flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between. Make sure N < (app.window / probe.publish.interval)") flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins") flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (e.g. start/stop containers, terminals, logs ...)") flag.BoolVar(&flags.probe.noCommandLineArguments, "probe.omit.cmd-args", false, "Disable collection of command-line arguments") diff --git a/prog/probe.go b/prog/probe.go index 9df88d22e..9814d76c9 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -240,7 +240,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { clients = multiClients } - p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls) + p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.ticksPerFullReport, flags.noControls) p.AddTagger(probe.NewTopologyTagger()) var processCache *process.CachingWalker diff --git a/report/id_list.go b/report/id_list.go index e74901e7a..1b6f10477 100644 --- a/report/id_list.go +++ b/report/id_list.go @@ -27,6 +27,11 @@ func (a IDList) Merge(b IDList) IDList { return IDList(merged) } +// Equal returns true if a and b have the same contents +func (a IDList) Equal(b IDList) bool { + return StringSet(a).Equal(StringSet(b)) +} + // Contains returns true if id is in the list. func (a IDList) Contains(id string) bool { return StringSet(a).Contains(id) diff --git a/report/latest_map_generated.go b/report/latest_map_generated.go index 97ce72382..c084cb92d 100644 --- a/report/latest_map_generated.go +++ b/report/latest_map_generated.go @@ -198,6 +198,19 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool { return true } +// EqualIgnoringTimestamps returns true if all keys and values are the same. +func (m StringLatestMap) EqualIgnoringTimestamps(n StringLatestMap) bool { + if m.Size() != n.Size() { + return false + } + for i := range m { + if m[i].key != n[i].key || m[i].Value != n[i].Value { + return false + } + } + return true +} + // CodecEncodeSelf implements codec.Selfer. // Duplicates the output for a built-in map without generating an // intermediate copy of the data structure, to save time. Note this @@ -450,6 +463,19 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool { return true } +// EqualIgnoringTimestamps returns true if all keys and values are the same. +func (m NodeControlDataLatestMap) EqualIgnoringTimestamps(n NodeControlDataLatestMap) bool { + if m.Size() != n.Size() { + return false + } + for i := range m { + if m[i].key != n[i].key || m[i].Value != n[i].Value { + return false + } + } + return true +} + // CodecEncodeSelf implements codec.Selfer. // Duplicates the output for a built-in map without generating an // intermediate copy of the data structure, to save time. Note this diff --git a/report/node.go b/report/node.go index 3b68f43a4..c64d70687 100644 --- a/report/node.go +++ b/report/node.go @@ -198,3 +198,47 @@ func (n Node) Merge(other Node) Node { Children: n.Children.Merge(other.Children), } } + +// UnsafeUnMerge removes data from n that would be added by merging other, +// modifying the original. +// returns true if n.Merge(other) is the same as n +func (n *Node) UnsafeUnMerge(other Node) bool { + // If it's not the same ID and topology then just bail out + if n.ID != other.ID || n.Topology != other.Topology { + return false + } + n.ID = "" + n.Topology = "" + remove := true + // We either keep a whole section or drop it if anything changed + // - a trade-off of some extra data size in favour of faster simpler code. + // (in practice, very few values reported by Scope probes do change over time) + if n.LatestControls.EqualIgnoringTimestamps(other.LatestControls) { + n.LatestControls = nil + } else { + remove = false + } + if n.Latest.EqualIgnoringTimestamps(other.Latest) { + n.Latest = nil + } else { + remove = false + } + if n.Sets.DeepEqual(other.Sets) { + n.Sets = MakeSets() + } else { + remove = false + } + if n.Parents.DeepEqual(other.Parents) { + n.Parents = MakeSets() + } else { + remove = false + } + if n.Adjacency.Equal(other.Adjacency) { + n.Adjacency = nil + } else { + remove = false + } + // counters and children are not created in the probe so we don't check those + // metrics don't overlap so just check if we have any + return remove && len(n.Metrics) == 0 +} diff --git a/report/report.go b/report/report.go index c7c1f0979..83a2c0a0f 100644 --- a/report/report.go +++ b/report/report.go @@ -188,7 +188,7 @@ type Report struct { // Job represent all Kubernetes Job on hosts running probes. Job Topology - DNS DNSRecords + DNS DNSRecords `json:"nodes,omitempty" deepequal:"nil==empty"` // Sampling data for this report. Sampling Sampling @@ -351,6 +351,16 @@ func (r *Report) UnsafeMerge(other Report) { }) } +// UnsafeUnMerge removes any information from r that would be added by merging other. +// The original is modified. +func (r *Report) UnsafeUnMerge(other Report) { + // TODO: DNS, Sampling, Plugins + r.Window = r.Window - other.Window + r.WalkPairedTopologies(&other, func(ourTopology, theirTopology *Topology) { + ourTopology.UnsafeUnMerge(*theirTopology) + }) +} + // WalkTopologies iterates through the Topologies of the report, // potentially modifying them func (r *Report) WalkTopologies(f func(*Topology)) { diff --git a/report/report_test.go b/report/report_test.go index 6ecbc1c8d..80717e7f6 100644 --- a/report/report_test.go +++ b/report/report_test.go @@ -98,3 +98,41 @@ func TestReportUpgrade(t *testing.T) { t.Error(test.Diff(expected, got)) } } + +func TestReportUnMerge(t *testing.T) { + n1 := report.MakeNodeWith("foo", map[string]string{"foo": "bar"}) + r1 := makeTestReport() + r2 := r1.Copy() + r2.Container.AddNode(n1) + // r2 should be the same as r1 with just the foo-bar node added + r2.UnsafeUnMerge(r1) + // Now r2 should have everything removed except that one node, and its ID + expected := report.Report{ + ID: r2.ID, + Container: report.Topology{ + Nodes: report.Nodes{ + "foo": n1, + }, + }, + } + + // Now test report with two nodes unmerged on report with one + r1.Container.AddNode(n1) + r2 = r1.Copy() + n2 := report.MakeNodeWith("foo2", map[string]string{"ping": "pong"}) + r2.Container.AddNode(n2) + // r2 should be the same as r1 with one extra node + r2.UnsafeUnMerge(r1) + expected = report.Report{ + ID: r2.ID, + Container: report.Topology{ + Nodes: report.Nodes{ + "foo2": n2, + }, + }, + } + + if !s_reflect.DeepEqual(expected, r2) { + t.Error(test.Diff(expected, r2)) + } +} diff --git a/report/topology.go b/report/topology.go index 3fd8b916c..15e91ac1c 100644 --- a/report/topology.go +++ b/report/topology.go @@ -208,6 +208,27 @@ func (t *Topology) UnsafeMerge(other Topology) { t.TableTemplates = t.TableTemplates.Merge(other.TableTemplates) } +// UnsafeUnMerge removes any information from t that would be added by merging other, +// modifying the original. +func (t *Topology) UnsafeUnMerge(other Topology) { + if t.Shape == other.Shape { + t.Shape = "" + } + if t.Label == other.Label && t.LabelPlural == other.LabelPlural { + t.Label, t.LabelPlural = "", "" + } + if t.Tag == other.Tag { + t.Tag = "" + } + t.Nodes.UnsafeUnMerge(other.Nodes) + // TODO Controls + // NOTE: taking a shortcut and assuming templates are static, which they have always been in Scope + // If you break that assumption please change this. + t.MetadataTemplates = nil + t.MetricTemplates = nil + t.TableTemplates = nil +} + // Nodes is a collection of nodes in a topology. Keys are node IDs. // TODO(pb): type Topology map[string]Node type Nodes map[string]Node @@ -249,6 +270,21 @@ func (n *Nodes) UnsafeMerge(other Nodes) { } } +// UnsafeUnMerge removes nodes from n that would be added by merging other, +// modifying the original. +func (n *Nodes) UnsafeUnMerge(other Nodes) { + for k, node := range *n { + if otherNode, ok := (other)[k]; ok { + remove := node.UnsafeUnMerge(otherNode) + if remove { + delete(*n, k) + } else { + (*n)[k] = node + } + } + } +} + // Validate checks the topology for various inconsistencies. func (t Topology) Validate() error { errs := []string{}