perf(probe): publish delta reports to reduce data size

Similar to video compression, which uses key-frames and the differences
between them: every N publishes we send a full report, but in between
we only send what has changed.

Fairly simple approach in the probe - hold on to the last full report,
and for the deltas remove anything that would be merged in from the
full report.

On the receiving side in the app it already merges a set of reports
together to produce the final output for rendering, so provided N is
smaller than that set we don't need to do anything different.

Deltas don't need to represent nodes that have disappeared - an
earlier full report will still contain that node, so it would be merged
into the final output anyway.
This commit is contained in:
Bryan Boreham
2019-09-13 15:55:05 +00:00
parent eff5a1f9f7
commit b6d5594f9f
10 changed files with 165 additions and 11 deletions

View File

@@ -230,6 +230,19 @@ function generate_latest_map() {
return true
}
// EqualIgnoringTimestamps returns true if all keys and values are the same.
func (m ${latest_map_type}) EqualIgnoringTimestamps(n ${latest_map_type}) bool {
	if m.Size() != n.Size() {
		return false
	}
	// Positional comparison of entries: assumes both maps hold their
	// entries in the same (sorted) key order.
	// NOTE(review): confirm that ordering invariant against the map type.
	for i := range m {
		// Timestamps are deliberately not compared here.
		if m[i].key != n[i].key || m[i].Value != n[i].Value {
			return false
		}
	}
	return true
}
// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this

View File

@@ -27,6 +27,7 @@ type Probe struct {
spyInterval, publishInterval time.Duration
publisher ReportPublisher
rateLimiter *rate.Limiter
ticksPerFullReport int
noControls bool
tickers []Ticker
@@ -77,17 +78,19 @@ type Ticker interface {
func New(
spyInterval, publishInterval time.Duration,
publisher ReportPublisher,
ticksPerFullReport int,
noControls bool,
) *Probe {
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1),
noControls: noControls,
quit: make(chan struct{}),
spiedReports: make(chan report.Report, spiedReportBufferSize),
shortcutReports: make(chan report.Report, shortcutReportBufferSize),
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1),
ticksPerFullReport: ticksPerFullReport,
noControls: noControls,
quit: make(chan struct{}),
spiedReports: make(chan report.Report, spiedReportBufferSize),
shortcutReports: make(chan report.Report, shortcutReportBufferSize),
}
return result
}
@@ -231,13 +234,28 @@ ForLoop:
func (p *Probe) publishLoop() {
defer p.done.Done()
pubTick := time.Tick(p.publishInterval)
publishCount := 0
var lastFullReport report.Report
for {
var err error
select {
case <-pubTick:
rpt := p.drainAndSanitise(report.MakeReport(), p.spiedReports)
fullReport := (publishCount % p.ticksPerFullReport) == 0
if !fullReport {
rpt.UnsafeUnMerge(lastFullReport)
}
err = p.publisher.Publish(rpt)
if err == nil {
if fullReport {
lastFullReport = rpt
}
publishCount++
} else {
// If we failed to send then drop back to full report next time
publishCount = 0
}
case rpt := <-p.shortcutReports:
rpt = p.drainAndSanitise(rpt, p.shortcutReports)

View File

@@ -16,7 +16,7 @@ func TestApply(t *testing.T) {
endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"})
)
p := New(0, 0, nil, false)
p := New(0, 0, nil, 1, false)
p.AddTagger(NewTopologyTagger())
r := report.MakeReport()
@@ -72,7 +72,7 @@ func TestProbe(t *testing.T) {
pub := mockPublisher{make(chan report.Report, 10)}
p := New(10*time.Millisecond, 100*time.Millisecond, pub, false)
p := New(10*time.Millisecond, 100*time.Millisecond, pub, 1, false)
p.AddReporter(mockReporter{want})
p.Start()
defer p.Stop()

View File

@@ -100,6 +100,7 @@ type probeFlags struct {
token string
httpListen string
publishInterval time.Duration
ticksPerFullReport int
spyInterval time.Duration
pluginsRoot string
insecure bool
@@ -297,6 +298,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server")
flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval")
flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval")
flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between")
flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins")
flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (e.g. start/stop containers, terminals, logs ...)")
flag.BoolVar(&flags.probe.noCommandLineArguments, "probe.omit.cmd-args", false, "Disable collection of command-line arguments")

View File

@@ -240,7 +240,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
clients = multiClients
}
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls)
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.ticksPerFullReport, flags.noControls)
p.AddTagger(probe.NewTopologyTagger())
var processCache *process.CachingWalker

View File

@@ -27,6 +27,11 @@ func (a IDList) Merge(b IDList) IDList {
return IDList(merged)
}
// Equal returns true if a and b have the same contents
func (a IDList) Equal(b IDList) bool {
	// Delegate to the underlying StringSet comparison.
	as, bs := StringSet(a), StringSet(b)
	return as.Equal(bs)
}
// Contains returns true if id is in the list.
func (a IDList) Contains(id string) bool {
return StringSet(a).Contains(id)

View File

@@ -198,6 +198,19 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool {
return true
}
// EqualIgnoringTimestamps returns true if all keys and values are the same.
func (m StringLatestMap) EqualIgnoringTimestamps(n StringLatestMap) bool {
	if m.Size() != n.Size() {
		return false
	}
	// Compare entry by entry, skipping timestamps.
	// NOTE(review): positional comparison assumes both maps keep entries
	// in the same (sorted) key order - confirm against the map invariants.
	for i, entry := range m {
		if entry.key != n[i].key || entry.Value != n[i].Value {
			return false
		}
	}
	return true
}
// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this
@@ -450,6 +463,19 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool {
return true
}
// EqualIgnoringTimestamps returns true if all keys and values are the same.
func (m NodeControlDataLatestMap) EqualIgnoringTimestamps(n NodeControlDataLatestMap) bool {
	if m.Size() != n.Size() {
		return false
	}
	// Compare entry by entry, skipping timestamps.
	// NOTE(review): positional comparison assumes both maps keep entries
	// in the same (sorted) key order - confirm against the map invariants.
	for i, entry := range m {
		if entry.key != n[i].key || entry.Value != n[i].Value {
			return false
		}
	}
	return true
}
// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this

View File

@@ -198,3 +198,47 @@ func (n Node) Merge(other Node) Node {
Children: n.Children.Merge(other.Children),
}
}
// UnsafeUnMerge removes data from n that would be added by merging other,
// modifying the original.
// returns true if n.Merge(other) is the same as n
func (n *Node) UnsafeUnMerge(other Node) bool {
	// A delta only makes sense against the same node in the same topology.
	if n.ID != other.ID || n.Topology != other.Topology {
		return false
	}
	n.ID = ""
	n.Topology = ""
	// Each section is either dropped wholesale (the full report already
	// carries it) or kept in full when anything differs - trading a little
	// extra data size for simpler, faster code. In practice, very few
	// values reported by Scope probes change over time.
	fullyCovered := true
	if !n.LatestControls.EqualIgnoringTimestamps(other.LatestControls) {
		fullyCovered = false
	} else {
		n.LatestControls = nil
	}
	if !n.Latest.EqualIgnoringTimestamps(other.Latest) {
		fullyCovered = false
	} else {
		n.Latest = nil
	}
	if !n.Sets.DeepEqual(other.Sets) {
		fullyCovered = false
	} else {
		n.Sets = MakeSets()
	}
	if !n.Parents.DeepEqual(other.Parents) {
		fullyCovered = false
	} else {
		n.Parents = MakeSets()
	}
	if !n.Adjacency.Equal(other.Adjacency) {
		fullyCovered = false
	} else {
		n.Adjacency = nil
	}
	// Counters and children are not created in the probe, so no check needed;
	// metrics don't overlap, so the mere presence of any means we must keep n.
	return fullyCovered && len(n.Metrics) == 0
}

View File

@@ -351,6 +351,16 @@ func (r *Report) UnsafeMerge(other Report) {
})
}
// UnsafeUnMerge removes any information from r that would be added by merging other.
// The original is modified.
func (r *Report) UnsafeUnMerge(other Report) {
	// TODO: DNS, Sampling, Plugins
	r.Window -= other.Window
	// Un-merge each topology against its counterpart in the full report.
	r.WalkPairedTopologies(&other, func(mine, theirs *Topology) {
		mine.UnsafeUnMerge(*theirs)
	})
}
// WalkTopologies iterates through the Topologies of the report,
// potentially modifying them
func (r *Report) WalkTopologies(f func(*Topology)) {

View File

@@ -208,6 +208,27 @@ func (t *Topology) UnsafeMerge(other Topology) {
t.TableTemplates = t.TableTemplates.Merge(other.TableTemplates)
}
// UnsafeUnMerge removes any information from t that would be added by merging other,
// modifying the original.
func (t *Topology) UnsafeUnMerge(other Topology) {
	// Blank out scalar fields wherever the full report carries the same
	// value; merging on the receiving side restores them.
	if t.Shape == other.Shape {
		t.Shape = ""
	}
	if t.Label == other.Label && t.LabelPlural == other.LabelPlural {
		t.Label = ""
		t.LabelPlural = ""
	}
	if t.Tag == other.Tag {
		t.Tag = ""
	}
	t.Nodes.UnsafeUnMerge(other.Nodes)
	// TODO Controls
	// NOTE: taking a shortcut and assuming templates are static, which they
	// have always been in Scope.
	// If you break that assumption please change this.
	t.MetadataTemplates = nil
	t.MetricTemplates = nil
	t.TableTemplates = nil
}
// Nodes is a collection of nodes in a topology. Keys are node IDs.
// TODO(pb): type Topology map[string]Node
type Nodes map[string]Node
@@ -249,6 +270,21 @@ func (n *Nodes) UnsafeMerge(other Nodes) {
}
}
// UnsafeUnMerge removes nodes from n that would be added by merging other,
// modifying the original.
func (n *Nodes) UnsafeUnMerge(other Nodes) {
	for id, node := range *n {
		otherNode, ok := other[id]
		if !ok {
			// Not present in the full report: keep the whole node.
			continue
		}
		// Deleting during range is safe in Go; deleted keys are simply
		// not produced by later iterations.
		if node.UnsafeUnMerge(otherNode) {
			delete(*n, id)
		} else {
			(*n)[id] = node
		}
	}
}
// Validate checks the topology for various inconsistencies.
func (t Topology) Validate() error {
errs := []string{}