Merge pull request #3677 from weaveworks/delta-reports

perf(probe): publish delta reports to reduce data size
This commit is contained in:
Bryan Boreham
2019-09-18 12:10:30 +01:00
committed by GitHub
11 changed files with 214 additions and 18 deletions

View File

@@ -230,6 +230,19 @@ function generate_latest_map() {
return true
}
// EqualIgnoringTimestamps returns true if all keys and values are the same.
// Timestamps attached to each entry are deliberately ignored, so two maps
// holding identical data observed at different times compare equal.
// NOTE(review): this is a code-generation template; ${latest_map_type} is
// substituted by the enclosing shell script when the Go source is emitted.
func (m ${latest_map_type}) EqualIgnoringTimestamps(n ${latest_map_type}) bool {
if m.Size() != n.Size() {
return false
}
// Index-wise comparison: assumes both maps keep entries in the same
// (sorted) order - TODO confirm against the generated map invariants.
for i := range m {
if m[i].key != n[i].key || m[i].Value != n[i].Value {
return false
}
}
return true
}
// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this

View File

@@ -27,6 +27,7 @@ type Probe struct {
spyInterval, publishInterval time.Duration
publisher ReportPublisher
rateLimiter *rate.Limiter
ticksPerFullReport int
noControls bool
tickers []Ticker
@@ -77,17 +78,19 @@ type Ticker interface {
// New makes a new Probe.
//
// ticksPerFullReport controls delta publishing: a full report is published
// every N ticks, with deltas in between (see publishLoop).
// BUG FIX: the diff rendering had left both the old and the new set of
// struct-literal fields in place, i.e. every key appeared twice - a compile
// error. Only the new, ticksPerFullReport-aware set is kept.
func New(
	spyInterval, publishInterval time.Duration,
	publisher ReportPublisher,
	ticksPerFullReport int,
	noControls bool,
) *Probe {
	result := &Probe{
		spyInterval:        spyInterval,
		publishInterval:    publishInterval,
		publisher:          publisher,
		// Allow a burst of 1; refill at 100x the publish rate so draining
		// report channels is throttled but not blocked.
		rateLimiter:        rate.NewLimiter(rate.Every(publishInterval/100), 1),
		ticksPerFullReport: ticksPerFullReport,
		noControls:         noControls,
		quit:               make(chan struct{}),
		spiedReports:       make(chan report.Report, spiedReportBufferSize),
		shortcutReports:    make(chan report.Report, shortcutReportBufferSize),
	}
	return result
}
@@ -208,7 +211,7 @@ func (p *Probe) tag(r report.Report) report.Report {
return r
}
func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) {
func (p *Probe) drainAndSanitise(rpt report.Report, rs chan report.Report) report.Report {
p.rateLimiter.Wait(context.Background())
ForLoop:
for {
@@ -225,25 +228,44 @@ ForLoop:
t.Controls = report.Controls{}
})
}
if err := p.publisher.Publish(rpt); err != nil {
log.Infof("Publish: %v", err)
}
return rpt
}
// publishLoop runs until quit is closed, publishing a report every
// publishInterval tick. Every ticksPerFullReport ticks a full report is sent;
// in between, the last full report is un-merged out of the current one so
// only a delta travels over the wire.
// BUG FIX: stale pre-rename lines calling the removed method
// p.drainAndPublish were left next to their drainAndSanitise replacements by
// the diff rendering; they are deleted here.
func (p *Probe) publishLoop() {
	defer p.done.Done()
	pubTick := time.Tick(p.publishInterval)
	publishCount := 0
	var lastFullReport report.Report
	for {
		var err error
		select {
		case <-pubTick:
			rpt := p.drainAndSanitise(report.MakeReport(), p.spiedReports)
			fullReport := (publishCount % p.ticksPerFullReport) == 0
			if !fullReport {
				rpt.UnsafeUnMerge(lastFullReport)
			}
			err = p.publisher.Publish(rpt)
			if err == nil {
				if fullReport {
					lastFullReport = rpt
				}
				publishCount++
			} else {
				// If we failed to send then drop back to full report next time
				publishCount = 0
			}
		case rpt := <-p.shortcutReports:
			rpt = p.drainAndSanitise(rpt, p.shortcutReports)
			err = p.publisher.Publish(rpt)
		case <-p.quit:
			return
		}
		if err != nil {
			log.Infof("Publish: %v", err)
		}
	}
}

View File

@@ -16,7 +16,7 @@ func TestApply(t *testing.T) {
endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"})
)
p := New(0, 0, nil, false)
p := New(0, 0, nil, 1, false)
p.AddTagger(NewTopologyTagger())
r := report.MakeReport()
@@ -72,7 +72,7 @@ func TestProbe(t *testing.T) {
pub := mockPublisher{make(chan report.Report, 10)}
p := New(10*time.Millisecond, 100*time.Millisecond, pub, false)
p := New(10*time.Millisecond, 100*time.Millisecond, pub, 1, false)
p.AddReporter(mockReporter{want})
p.Start()
defer p.Stop()

View File

@@ -100,6 +100,7 @@ type probeFlags struct {
token string
httpListen string
publishInterval time.Duration
ticksPerFullReport int
spyInterval time.Duration
pluginsRoot string
insecure bool
@@ -297,6 +298,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server")
flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval")
flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval")
flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between. Make sure N < (app.window / probe.publish.interval)")
flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins")
flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (e.g. start/stop containers, terminals, logs ...)")
flag.BoolVar(&flags.probe.noCommandLineArguments, "probe.omit.cmd-args", false, "Disable collection of command-line arguments")

View File

@@ -240,7 +240,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
clients = multiClients
}
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls)
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.ticksPerFullReport, flags.noControls)
p.AddTagger(probe.NewTopologyTagger())
var processCache *process.CachingWalker

View File

@@ -27,6 +27,11 @@ func (a IDList) Merge(b IDList) IDList {
return IDList(merged)
}
// Equal returns true if a and b have the same contents.
func (a IDList) Equal(b IDList) bool {
	as, bs := StringSet(a), StringSet(b)
	return as.Equal(bs)
}
// Contains returns true if id is in the list.
func (a IDList) Contains(id string) bool {
return StringSet(a).Contains(id)

View File

@@ -198,6 +198,19 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool {
return true
}
// EqualIgnoringTimestamps returns true if all keys and values are the same.
// Unlike DeepEqual, the timestamp carried by each entry is not compared.
func (m StringLatestMap) EqualIgnoringTimestamps(n StringLatestMap) bool {
	if m.Size() != n.Size() {
		return false
	}
	for i, entry := range m {
		if entry.key != n[i].key || entry.Value != n[i].Value {
			return false
		}
	}
	return true
}
// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this
@@ -450,6 +463,19 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool {
return true
}
// EqualIgnoringTimestamps returns true if all keys and values are the same.
// Unlike DeepEqual, the timestamp carried by each entry is not compared.
func (m NodeControlDataLatestMap) EqualIgnoringTimestamps(n NodeControlDataLatestMap) bool {
	if m.Size() != n.Size() {
		return false
	}
	for i, entry := range m {
		if entry.key != n[i].key || entry.Value != n[i].Value {
			return false
		}
	}
	return true
}
// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this

View File

@@ -198,3 +198,47 @@ func (n Node) Merge(other Node) Node {
Children: n.Children.Merge(other.Children),
}
}
// UnsafeUnMerge removes data from n that would be added by merging other,
// modifying the original.
// It reports whether nothing of n remains, i.e. whether the caller may drop
// the node from the delta entirely (n.Merge(other) would equal other... er, n).
func (n *Node) UnsafeUnMerge(other Node) bool {
	// Unrelated node: different ID or topology - leave n untouched.
	if n.ID != other.ID || n.Topology != other.Topology {
		return false
	}
	// The receiver can reconstruct these from context, so drop them.
	n.ID, n.Topology = "", ""
	removable := true
	// Each section is kept whole or dropped whole depending on whether
	// anything in it changed - trading a little extra data size for simpler,
	// faster code. (In practice very few values reported by Scope probes
	// change over time.)
	if !n.LatestControls.EqualIgnoringTimestamps(other.LatestControls) {
		removable = false
	} else {
		n.LatestControls = nil
	}
	if !n.Latest.EqualIgnoringTimestamps(other.Latest) {
		removable = false
	} else {
		n.Latest = nil
	}
	if !n.Sets.DeepEqual(other.Sets) {
		removable = false
	} else {
		n.Sets = MakeSets()
	}
	if !n.Parents.DeepEqual(other.Parents) {
		removable = false
	} else {
		n.Parents = MakeSets()
	}
	if !n.Adjacency.Equal(other.Adjacency) {
		removable = false
	} else {
		n.Adjacency = nil
	}
	// Counters and children are not created in the probe, so don't check them.
	// Metrics don't overlap, so the presence of any metric means keep the node.
	return removable && len(n.Metrics) == 0
}

View File

@@ -188,7 +188,7 @@ type Report struct {
// Job represent all Kubernetes Job on hosts running probes.
Job Topology
DNS DNSRecords
DNS DNSRecords `json:"nodes,omitempty" deepequal:"nil==empty"`
// Sampling data for this report.
Sampling Sampling
@@ -351,6 +351,16 @@ func (r *Report) UnsafeMerge(other Report) {
})
}
// UnsafeUnMerge removes any information from r that would be added by merging other.
// The original is modified.
func (r *Report) UnsafeUnMerge(other Report) {
	// TODO: DNS, Sampling, Plugins
	r.Window -= other.Window
	// Un-merge each topology pairwise against its counterpart in other.
	r.WalkPairedTopologies(&other, func(ours, theirs *Topology) {
		ours.UnsafeUnMerge(*theirs)
	})
}
// WalkTopologies iterates through the Topologies of the report,
// potentially modifying them
func (r *Report) WalkTopologies(f func(*Topology)) {

View File

@@ -98,3 +98,41 @@ func TestReportUpgrade(t *testing.T) {
t.Error(test.Diff(expected, got))
}
}
// TestReportUnMerge checks that UnsafeUnMerge strips everything from a report
// that a baseline report already contains, leaving only the added nodes.
// BUG FIX: the first `expected` report was constructed but never compared
// against r2, so the single-node case was silently untested.
func TestReportUnMerge(t *testing.T) {
	n1 := report.MakeNodeWith("foo", map[string]string{"foo": "bar"})
	r1 := makeTestReport()
	r2 := r1.Copy()
	r2.Container.AddNode(n1)
	// r2 should be the same as r1 with just the foo-bar node added
	r2.UnsafeUnMerge(r1)
	// Now r2 should have everything removed except that one node, and its ID
	expected := report.Report{
		ID: r2.ID,
		Container: report.Topology{
			Nodes: report.Nodes{
				"foo": n1,
			},
		},
	}
	if !s_reflect.DeepEqual(expected, r2) {
		t.Error(test.Diff(expected, r2))
	}
	// Now test report with two nodes unmerged on report with one
	r1.Container.AddNode(n1)
	r2 = r1.Copy()
	n2 := report.MakeNodeWith("foo2", map[string]string{"ping": "pong"})
	r2.Container.AddNode(n2)
	// r2 should be the same as r1 with one extra node
	r2.UnsafeUnMerge(r1)
	expected = report.Report{
		ID: r2.ID,
		Container: report.Topology{
			Nodes: report.Nodes{
				"foo2": n2,
			},
		},
	}
	if !s_reflect.DeepEqual(expected, r2) {
		t.Error(test.Diff(expected, r2))
	}
}

View File

@@ -208,6 +208,27 @@ func (t *Topology) UnsafeMerge(other Topology) {
t.TableTemplates = t.TableTemplates.Merge(other.TableTemplates)
}
// UnsafeUnMerge removes any information from t that would be added by merging other,
// modifying the original.
func (t *Topology) UnsafeUnMerge(other Topology) {
	// NOTE: taking a shortcut and assuming templates are static, which they
	// have always been in Scope, so deltas never need to carry them.
	// If you break that assumption please change this.
	t.MetadataTemplates = nil
	t.MetricTemplates = nil
	t.TableTemplates = nil
	// Blank out presentation metadata the receiver already has.
	if t.Shape == other.Shape {
		t.Shape = ""
	}
	if t.Label == other.Label && t.LabelPlural == other.LabelPlural {
		t.Label, t.LabelPlural = "", ""
	}
	if t.Tag == other.Tag {
		t.Tag = ""
	}
	// TODO Controls
	t.Nodes.UnsafeUnMerge(other.Nodes)
}
// Nodes is a collection of nodes in a topology. Keys are node IDs.
// TODO(pb): type Topology map[string]Node
type Nodes map[string]Node
@@ -249,6 +270,21 @@ func (n *Nodes) UnsafeMerge(other Nodes) {
}
}
// UnsafeUnMerge removes nodes from n that would be added by merging other,
// modifying the original.
func (n *Nodes) UnsafeUnMerge(other Nodes) {
	nodes := *n
	for id, node := range nodes {
		otherNode, ok := other[id]
		if !ok {
			continue
		}
		// Drop the node entirely when nothing of it remains after
		// un-merging; otherwise store the stripped-down copy back.
		if node.UnsafeUnMerge(otherNode) {
			delete(nodes, id)
		} else {
			nodes[id] = node
		}
	}
}
// Validate checks the topology for various inconsistencies.
func (t Topology) Validate() error {
errs := []string{}