Merge pull request #3782 from weaveworks/limit-topologies-on-collection

Upgrade reports before merging in multitenant collector
This commit is contained in:
Bryan Boreham
2020-04-17 11:24:14 +01:00
committed by GitHub
2 changed files with 30 additions and 12 deletions

View File

@@ -84,6 +84,11 @@ var (
Name: "reports_bytes_total",
Help: "Total bytes stored in reports per user.",
}, []string{"user"})
topologiesDropped = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "topologies_dropped_total",
Help: "Total count of topologies dropped for being over limit.",
}, []string{"user", "topology"})
natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
@@ -406,7 +411,7 @@ func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start,
return reportKeys, nil
}
func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]report.Report, error) {
func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys []string) ([]report.Report, error) {
missing := reportKeys
stores := []ReportStore{c.inProcess}
@@ -426,10 +431,7 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
log.Warningf("Error fetching from cache: %v", err)
}
for key, report := range found {
if c.cfg.MaxTopNodes > 0 {
report = report.DropTopologiesOver(c.cfg.MaxTopNodes)
}
report = report.Upgrade()
report = c.massageReport(userid, report)
c.inProcess.StoreReport(key, report)
reports = append(reports, report)
}
@@ -444,6 +446,19 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
return reports, nil
}
// process a report from a probe which may be at an older version or overloaded
func (c *awsCollector) massageReport(userid string, report report.Report) report.Report {
if c.cfg.MaxTopNodes > 0 {
var dropped []string
report, dropped = report.DropTopologiesOver(c.cfg.MaxTopNodes)
for _, name := range dropped {
topologiesDropped.WithLabelValues(userid, name).Inc()
}
}
report = report.Upgrade()
return report
}
/*
S3 stores original reports from one probe at the timestamp they arrived at collector.
Collector also sends every report to memcached.
@@ -481,7 +496,7 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
reports = append(reports, quantumReport)
}
// Fetch individual reports for the period after the last quantum
last, err := c.reportsForKeysInRange(ctx, reportKeys, ts, endTS)
last, err := c.reportsForKeysInRange(ctx, userid, reportKeys, ts, endTS)
if err != nil {
return report.MakeReport(), err
}
@@ -497,7 +512,7 @@ func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, repo
if len(cached) == 1 {
return cached[key], nil
}
reports, err := c.reportsForKeysInRange(ctx, reportKeys, start, start+reportQuantisationInterval.Nanoseconds())
reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start, start+reportQuantisationInterval.Nanoseconds())
if err != nil {
return report.MakeReport(), err
}
@@ -507,7 +522,7 @@ func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, repo
}
// Find the keys relating to this time period then fetch from memcached and/or S3
func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
func (c *awsCollector) reportsForKeysInRange(ctx context.Context, userid string, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
var keys []string
for _, k := range reportKeys {
if k.ts >= start && k.ts < end {
@@ -518,7 +533,7 @@ func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []k
span.LogFields(otlog.Int("fetching", len(keys)), otlog.Int64("start", start), otlog.Int64("end", end))
}
log.Debugf("Fetching %d reports from %v to %v", len(keys), start, end)
return c.getReports(ctx, keys)
return c.getReports(ctx, userid, keys)
}
func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
@@ -550,7 +565,7 @@ func (c *awsCollector) AdminSummary(ctx context.Context, timestamp time.Time) (s
if err != nil {
return "", err
}
reports, err := c.reportsForKeysInRange(ctx, reportKeys, start.UnixNano(), end.UnixNano())
reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start.UnixNano(), end.UnixNano())
if err != nil {
return "", err
}
@@ -689,6 +704,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e
return err
}
} else {
rep = c.massageReport(userid, rep)
entry := &pendingEntry{report: report.MakeReport()}
if e, found := c.pending.LoadOrStore(userid, entry); found {
entry = e.(*pendingEntry)

View File

@@ -469,13 +469,15 @@ func (r Report) Validate() error {
// DropTopologiesOver - as a protection against overloading the app
// server, drop topologies that have really large node counts. In
// practice we only see this with runaway numbers of zombie processes.
func (r Report) DropTopologiesOver(limit int) Report {
func (r Report) DropTopologiesOver(limit int) (Report, []string) {
dropped := []string{}
r.WalkNamedTopologies(func(name string, topology *Topology) {
if topology != nil && len(topology.Nodes) > limit {
topology.Nodes = Nodes{}
dropped = append(dropped, name)
}
})
return r
return r, dropped
}
// Summary returns a human-readable string summarising the contents, for diagnostic purposes