Reviewer-amended patch for app/multitenant/aws_collector.go.
Line breaks and indentation were reconstructed from a whitespace-collapsed copy,
so apply with `git apply --ignore-whitespace` (or `patch -l`). The post-image
hash on the index line is carried over from the original patch, not recomputed.

diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go
index 37698d936..5550ac830 100644
--- a/app/multitenant/aws_collector.go
+++ b/app/multitenant/aws_collector.go
@@ -30,6 +30,11 @@ const (
 	tsField     = "ts"
 	reportField = "report"
 	natsTimeout = 10 * time.Second
+
+	reportQuantisationInterval = 3000000000 // 3 seconds in nanoseconds
+	// Grace period allows for some gap between the timestamp on reports
+	// (assigned when they arrive at collector) and them appearing in DynamoDB query
+	gracePeriod = 500000000 // 1/2 second in nanoseconds
 )
 
 var (
@@ -167,7 +172,7 @@ func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
 		cfg:       config,
 		db:        dynamodb.New(session.New(config.DynamoDBConfig)),
 		merger:    app.NewFastMerger(),
-		inProcess: newInProcessStore(reportCacheSize, config.Window),
+		inProcess: newInProcessStore(reportCacheSize, config.Window+reportQuantisationInterval),
 		nats:      nc,
 		waiters:   map[watchKey]*nats.Subscription{},
 	}, nil
@@ -225,8 +230,14 @@ func (c *awsCollector) CreateTables() error {
 	return err
 }
 
+// keyInfo pairs the S3 key of a stored report with the timestamp read from its DynamoDB row.
+type keyInfo struct {
+	key string // S3 key of the report
+	ts  int64  // value of the row's ts field; compared against UnixNano() values by callers
+}
+
 // reportKeysInRange returns the s3 keys for reports in the specified range
-func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]string, error) {
+func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]keyInfo, error) {
 	rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
 	var resp *dynamodb.QueryOutput
 	err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
@@ -260,22 +271,29 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row
 		return nil, err
 	}
 
-	result := []string{}
+	result := []keyInfo{}
 	for _, item := range resp.Items {
 		reportKey := item[reportField].S
-		if reportKey == nil {
+		tsValue := item[tsField].N
+		if reportKey == nil || tsValue == nil {
 			log.Errorf("Empty row!")
 			continue
 		}
 		dynamoValueSize.WithLabelValues("BatchGetItem").
 			Add(float64(len(*reportKey)))
-		result = append(result, *reportKey)
+		ts, err := strconv.ParseInt(*tsValue, 10, 64)
+		if err != nil {
+			// A row with ts=0 would silently fall outside every requested range, so skip it loudly instead.
+			log.Errorf("Invalid timestamp %q for report %q: %v", *tsValue, *reportKey, err)
+			continue
+		}
+		result = append(result, keyInfo{key: *reportKey, ts: ts})
 	}
 	return result, nil
 }
 
-// getReportKeys returns the S3 for reports in the interval [start, end].
-func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]string, error) {
+// getReportKeys returns the S3 keys (with timestamps) for reports in the interval [start, end].
+func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]keyInfo, error) {
 	var (
 		rowStart = start.UnixNano() / time.Hour.Nanoseconds()
 		rowEnd   = end.UnixNano() / time.Hour.Nanoseconds()
@@ -283,7 +301,7 @@ func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start,
 	)
 
-	// Queries will only every span 2 rows max.
-	var reportKeys []string
+	// Queries will only ever span 2 rows max.
+	var reportKeys []keyInfo
 	if rowStart != rowEnd {
 		reportKeys1, err := c.reportKeysInRange(ctx, userid, rowStart, start, end)
 		if err != nil {
@@ -342,6 +360,15 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
 	return reports, nil
 }
 
+/*
+S3 stores original reports from one probe at the timestamp they arrived at collector.
+Collector also sends every report to memcached.
+The in-memory cache stores:
+  - individual reports deserialised, under S3 key for report
+  - sets of reports in interval [t,t+3s) merged, under key "userid:t" with t in Unix nanoseconds
+  - so to check the cache for reports from 14:31:00 to 14:31:15 you would request 5 keys 3 seconds apart
+*/
+
 func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
 	defer span.Finish()
@@ -356,15 +383,65 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
 		return report.MakeReport(), err
 	}
 	span.LogFields(otlog.Int("keys", len(reportKeys)), otlog.String("timestamp", timestamp.String()))
-	log.Debugf("Fetching %d reports to %v", len(reportKeys), timestamp)
-	reports, err := c.getReports(ctx, reportKeys)
+
+	var reports []report.Report
+	// Fetch a merged report for each time quantum covering the window
+	startTS, endTS := start.UnixNano(), end.UnixNano()
+	ts := startTS - (startTS % reportQuantisationInterval)
+	for ; ts+reportQuantisationInterval+gracePeriod < endTS; ts += reportQuantisationInterval {
+		quantumReport, err := c.reportForQuantum(ctx, userid, reportKeys, ts)
+		if err != nil {
+			return report.MakeReport(), err
+		}
+		reports = append(reports, quantumReport)
+	}
+	// Fetch individual reports for the period after the last quantum
+	last, err := c.reportsForKeysInRange(ctx, reportKeys, ts, endTS)
 	if err != nil {
 		return report.MakeReport(), err
 	}
-
+	reports = append(reports, last...)
+	span.LogFields(otlog.Int("merging", len(reports)))
 	return c.merger.Merge(reports), nil
 }
 
+// reportForQuantum returns the merged report for the quantum starting at start.
+// It is served from the in-process cache when present; otherwise the individual
+// reports in [start, start+reportQuantisationInterval) are merged and cached.
+func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, reportKeys []keyInfo, start int64) (report.Report, error) {
+	key := fmt.Sprintf("%s:%d", userid, start)
+	cached, _, err := c.inProcess.FetchReports(ctx, []string{key})
+	if err != nil {
+		// The cache is best-effort: report the failure and rebuild from the individual reports.
+		log.Errorf("Error fetching cached report %q: %v", key, err)
+	} else if rpt, ok := cached[key]; ok {
+		return rpt, nil
+	}
+	reports, err := c.reportsForKeysInRange(ctx, reportKeys, start, start+reportQuantisationInterval)
+	if err != nil {
+		return report.MakeReport(), err
+	}
+	merged := c.merger.Merge(reports)
+	c.inProcess.StoreReport(key, merged)
+	return merged, nil
+}
+
+// reportsForKeysInRange selects the keys with start <= ts < end and fetches
+// their reports via getReports (memcached and/or S3).
+func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
+	var keys []string
+	for _, k := range reportKeys {
+		if k.ts >= start && k.ts < end {
+			keys = append(keys, k.key)
+		}
+	}
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		span.LogFields(otlog.Int("fetching", len(keys)), otlog.Int64("start", start), otlog.Int64("end", end))
+	}
+	log.Debugf("Fetching %d reports from %v to %v", len(keys), start, end)
+	return c.getReports(ctx, keys)
+}
+
 func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
 	userid, err := c.cfg.UserIDer(ctx)
 	if err != nil {