Refactor: extract function reportsFromStore()

To help clarify subsequent changes
This commit is contained in:
Bryan Boreham
2021-03-28 14:09:00 +01:00
parent 5d12b7ff65
commit 667daef81b

View File

@@ -459,15 +459,6 @@ func (c *awsCollector) massageReport(userid string, report report.Report) report
return report
}
/*
S3 stores original reports from one probe at the timestamp they arrived at collector.
Collector also sends every report to memcached.
The in-memory cache stores:
- individual reports deserialised, under S3 key for report
- sets of reports in interval [t,t+3) merged, under key "instance:t"
- so to check the cache for reports from 14:31:00 to 14:31:15 you would request 5 keys 3 seconds apart
*/
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
defer span.Finish()
@@ -476,11 +467,32 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
return report.MakeReport(), err
}
span.SetTag("userid", userid)
var reports []report.Report
reports, err = c.reportsFromStore(ctx, userid, timestamp)
if err != nil {
return report.MakeReport(), err
}
span.LogFields(otlog.Int("merging", len(reports)))
return c.merger.Merge(reports), nil
}
/*
Given a timestamp in the past, fetch reports within the window from store or cache
S3 stores original reports from one probe at the timestamp they arrived at collector.
Collector also sends every report to memcached.
The in-memory cache stores:
- individual reports deserialised, under S3 key for report
- sets of reports in interval [t,t+3) merged, under key "instance:t"
- so to check the cache for reports from 14:31:00 to 14:31:15 you would request 5 keys 3 seconds apart
*/
func (c *awsCollector) reportsFromStore(ctx context.Context, userid string, timestamp time.Time) ([]report.Report, error) {
span := opentracing.SpanFromContext(ctx)
end := timestamp
start := end.Add(-c.cfg.Window)
reportKeys, err := c.getReportKeys(ctx, userid, start, end)
if err != nil {
return report.MakeReport(), err
return nil, err
}
span.LogFields(otlog.Int("keys", len(reportKeys)), otlog.String("timestamp", timestamp.String()))
@@ -491,18 +503,17 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
for ; ts+(reportQuantisationInterval+gracePeriod).Nanoseconds() < endTS; ts += reportQuantisationInterval.Nanoseconds() {
quantumReport, err := c.reportForQuantum(ctx, userid, reportKeys, ts)
if err != nil {
return report.MakeReport(), err
return nil, err
}
reports = append(reports, quantumReport)
}
// Fetch individual reports for the period after the last quantum
last, err := c.reportsForKeysInRange(ctx, userid, reportKeys, ts, endTS)
if err != nil {
return report.MakeReport(), err
return nil, err
}
reports = append(reports, last...)
span.LogFields(otlog.Int("merging", len(reports)))
return c.merger.Merge(reports), nil
return reports, nil
}
// Fetch a merged report either from cache or from store which we put in cache