From b772fa83b37c578b9c8de73ecf556a3ebd9405d6 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 16 Apr 2020 19:24:21 +0000 Subject: [PATCH 1/2] Add a metric for topologies dropped because they are over limit Need to modify DropTopologiesOver() to report what it dropped, and plumb through the userid so the metric can show who has a problem. --- app/multitenant/aws_collector.go | 35 +++++++++++++++++++++++--------- report/report.go | 6 ++++-- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index dc3364e8e..fe52473c6 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -84,6 +84,11 @@ var ( Name: "reports_bytes_total", Help: "Total bytes stored in reports per user.", }, []string{"user"}) + topologiesDropped = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "scope", + Name: "topologies_dropped_total", + Help: "Total count of topologies dropped for being over limit.", + }, []string{"user", "topology"}) natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "scope", @@ -406,7 +411,7 @@ func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, return reportKeys, nil } -func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]report.Report, error) { +func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys []string) ([]report.Report, error) { missing := reportKeys stores := []ReportStore{c.inProcess} @@ -426,10 +431,7 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r log.Warningf("Error fetching from cache: %v", err) } for key, report := range found { - if c.cfg.MaxTopNodes > 0 { - report = report.DropTopologiesOver(c.cfg.MaxTopNodes) - } - report = report.Upgrade() + report = c.massageReport(userid, report) c.inProcess.StoreReport(key, report) reports = append(reports, report) } @@ -444,6 +446,19 @@ func (c 
*awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r return reports, nil } +// process a report from a probe which may be at an older version or overloaded +func (c *awsCollector) massageReport(userid string, report report.Report) report.Report { + if c.cfg.MaxTopNodes > 0 { + var dropped []string + report, dropped = report.DropTopologiesOver(c.cfg.MaxTopNodes) + for _, name := range dropped { + topologiesDropped.WithLabelValues(userid, name).Inc() + } + } + report = report.Upgrade() + return report +} + /* S3 stores original reports from one probe at the timestamp they arrived at collector. Collector also sends every report to memcached. @@ -481,7 +496,7 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report. reports = append(reports, quantumReport) } // Fetch individual reports for the period after the last quantum - last, err := c.reportsForKeysInRange(ctx, reportKeys, ts, endTS) + last, err := c.reportsForKeysInRange(ctx, userid, reportKeys, ts, endTS) if err != nil { return report.MakeReport(), err } @@ -497,7 +512,7 @@ func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, repo if len(cached) == 1 { return cached[key], nil } - reports, err := c.reportsForKeysInRange(ctx, reportKeys, start, start+reportQuantisationInterval.Nanoseconds()) + reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start, start+reportQuantisationInterval.Nanoseconds()) if err != nil { return report.MakeReport(), err } @@ -507,7 +522,7 @@ func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, repo } // Find the keys relating to this time period then fetch from memcached and/or S3 -func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []keyInfo, start, end int64) ([]report.Report, error) { +func (c *awsCollector) reportsForKeysInRange(ctx context.Context, userid string, reportKeys []keyInfo, start, end int64) ([]report.Report, error) { var keys []string for _, k 
:= range reportKeys { if k.ts >= start && k.ts < end { @@ -518,7 +533,7 @@ func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []k span.LogFields(otlog.Int("fetching", len(keys)), otlog.Int64("start", start), otlog.Int64("end", end)) } log.Debugf("Fetching %d reports from %v to %v", len(keys), start, end) - return c.getReports(ctx, keys) + return c.getReports(ctx, userid, keys) } func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) { @@ -550,7 +565,7 @@ func (c *awsCollector) AdminSummary(ctx context.Context, timestamp time.Time) (s if err != nil { return "", err } - reports, err := c.reportsForKeysInRange(ctx, reportKeys, start.UnixNano(), end.UnixNano()) + reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start.UnixNano(), end.UnixNano()) if err != nil { return "", err } diff --git a/report/report.go b/report/report.go index 66fc0cdcb..1534c6803 100644 --- a/report/report.go +++ b/report/report.go @@ -469,13 +469,15 @@ func (r Report) Validate() error { // DropTopologiesOver - as a protection against overloading the app // server, drop topologies that have really large node counts. In // practice we only see this with runaway numbers of zombie processes. -func (r Report) DropTopologiesOver(limit int) Report { +func (r Report) DropTopologiesOver(limit int) (Report, []string) { + dropped := []string{} r.WalkNamedTopologies(func(name string, topology *Topology) { if topology != nil && len(topology.Nodes) > limit { topology.Nodes = Nodes{} + dropped = append(dropped, name) } }) - return r + return r, dropped } // Summary returns a human-readable string summarising the contents, for diagnostic purposes From fa4d1c4c2b01ea4b7c65b5fb6dad584e5a1d2a5f Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 16 Apr 2020 19:27:40 +0000 Subject: [PATCH 2/2] Upgrade reports before merging In case they came from an older or an overloaded probe.
--- app/multitenant/aws_collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index fe52473c6..dd630ec68 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -704,6 +704,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e return err } } else { + rep = c.massageReport(userid, rep) entry := &pendingEntry{report: report.MakeReport()} if e, found := c.pending.LoadOrStore(userid, entry); found { entry = e.(*pendingEntry)