Cache merged groups of reports, to reduce the number we handle in parallel

Previously we would merge all reports in a 15-second window.
Now we use a 'quantum' of 3 seconds, similar to the single-user app.

E.g. a 30-node cluster, with each probe sending a report roughly every
3 seconds, will have 150 individual reports over 15 seconds; the new
code will instead merge 5 pre-merged quantum reports plus 20-ish very
recent individual ones.

This limits the max heap size used for deserialising, since we only do
3 seconds at once per instance.
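
A minimal standalone sketch of the quantisation arithmetic (illustrative
only: quantumStarts is a hypothetical helper, and the real code also
allows for a grace period near the window end):

package main

import (
	"fmt"
	"time"
)

const quantum = 3 * time.Second // mirrors reportQuantisationInterval

// quantumStarts returns the UnixNano start of each 3-second quantum used
// to cover [start, end); the first may begin before start, because the
// window start is rounded down to a quantum boundary.
func quantumStarts(start, end time.Time) []int64 {
	startTS, endTS := start.UnixNano(), end.UnixNano()
	ts := startTS - (startTS % int64(quantum))
	var starts []int64
	for ; ts+int64(quantum) < endTS; ts += int64(quantum) {
		starts = append(starts, ts)
	}
	return starts
}

func main() {
	end := time.Now()
	fmt.Println(quantumStarts(end.Add(-15*time.Second), end)) // 4 or 5 starts
}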

Individual reports are still put into the cache, but should get
displaced by the pre-merged ones under LRU.
commit b5376facf2
parent 70550ca34a
Author: Bryan Boreham
Date:   2019-09-06 07:10:10 +00:00


@@ -30,6 +30,11 @@ const (
tsField = "ts"
reportField = "report"
natsTimeout = 10 * time.Second
reportQuantisationInterval = 3000000000 // 3 seconds in nanoseconds
// Grace period allows for some gap between the timestamp on reports
// (assigned when they arrive at the collector) and their appearance in
// DynamoDB query results
gracePeriod = 500000000 // 1/2 second in nanoseconds
)
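
For readability, an equivalent way to write these raw nanosecond counts
(just a sketch; the commit keeps the literals above) is via time.Duration
conversions:

const (
	reportQuantisationInterval = int64(3 * time.Second)        // 3,000,000,000 ns
	gracePeriod                = int64(500 * time.Millisecond) //   500,000,000 ns
)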
var (
@@ -167,7 +172,7 @@ func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
cfg: config,
db: dynamodb.New(session.New(config.DynamoDBConfig)),
merger: app.NewFastMerger(),
inProcess: newInProcessStore(reportCacheSize, config.Window),
inProcess: newInProcessStore(reportCacheSize, config.Window+reportQuantisationInterval),
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
@@ -225,8 +230,13 @@ func (c *awsCollector) CreateTables() error {
return err
}
type keyInfo struct {
key string
ts int64
}
// reportKeysInRange returns the S3 keys, with timestamps, for reports in the specified range
func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]string, error) {
func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]keyInfo, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
var resp *dynamodb.QueryOutput
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
@@ -260,22 +270,24 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row
return nil, err
}
result := []string{}
result := []keyInfo{}
for _, item := range resp.Items {
reportKey := item[reportField].S
if reportKey == nil {
tsValue := item[tsField].N
if reportKey == nil || tsValue == nil {
log.Errorf("Empty row!")
continue
}
dynamoValueSize.WithLabelValues("BatchGetItem").
Add(float64(len(*reportKey)))
result = append(result, *reportKey)
ts, _ := strconv.ParseInt(*tsValue, 10, 64)
result = append(result, keyInfo{key: *reportKey, ts: ts})
}
return result, nil
}
// getReportKeys returns the S3 keys for reports in the interval [start, end].
func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]string, error) {
func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]keyInfo, error) {
var (
rowStart = start.UnixNano() / time.Hour.Nanoseconds()
rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
@@ -283,7 +295,7 @@ func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start,
)
// Queries will only ever span 2 rows.
var reportKeys []string
var reportKeys []keyInfo
if rowStart != rowEnd {
reportKeys1, err := c.reportKeysInRange(ctx, userid, rowStart, start, end)
if err != nil {
@@ -342,6 +354,15 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
return reports, nil
}
/*
S3 stores the original reports from each probe, keyed by the timestamp at
which they arrived at the collector.
The collector also sends every report to memcached.
The in-memory cache stores:
- individual reports, deserialised, under the S3 key of the report
- sets of reports in the interval [t,t+3) merged, under the key "instance:t"
So, to check the cache for reports from 14:31:00 to 14:31:15, you would
request 5 keys 3 seconds apart.
*/
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
defer span.Finish()
@@ -356,15 +377,59 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
return report.MakeReport(), err
}
span.LogFields(otlog.Int("keys", len(reportKeys)), otlog.String("timestamp", timestamp.String()))
log.Debugf("Fetching %d reports to %v", len(reportKeys), timestamp)
reports, err := c.getReports(ctx, reportKeys)
var reports []report.Report
// Fetch a merged report for each time quantum covering the window
startTS, endTS := start.UnixNano(), end.UnixNano()
ts := startTS - (startTS % reportQuantisationInterval)
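// Only fetch a quantum pre-merged once it ended more than gracePeriod before
// the window end; the newest reports may not yet be visible in the DynamoDB
// query, so the tail of the window is fetched report-by-report below.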
for ; ts+reportQuantisationInterval+gracePeriod < endTS; ts += reportQuantisationInterval {
quantumReport, err := c.reportForQuantum(ctx, userid, reportKeys, ts)
if err != nil {
return report.MakeReport(), err
}
reports = append(reports, quantumReport)
}
// Fetch individual reports for the period after the last quantum
last, err := c.reportsForKeysInRange(ctx, reportKeys, ts, endTS)
if err != nil {
return report.MakeReport(), err
}
reports = append(reports, last...)
span.LogFields(otlog.Int("merging", len(reports)))
return c.merger.Merge(reports), nil
}
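
To make the cache-layout comment above concrete, a tiny sketch that prints
the 5 quantum cache keys for the 14:31:00-to-14:31:15 example ("org-1" is a
hypothetical user ID; reportForQuantum below builds keys as
"<userid>:<quantum start in UnixNano>"):

package main

import (
	"fmt"
	"time"
)

func main() {
	// 14:31:00 UTC falls on a 3-second quantum boundary.
	start := time.Date(2019, 9, 6, 14, 31, 0, 0, time.UTC).UnixNano()
	for i := int64(0); i < 5; i++ {
		fmt.Printf("org-1:%d\n", start+i*(3*time.Second).Nanoseconds())
	}
}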
// reportForQuantum fetches a merged report for one quantum, either from the
// cache or from the store, in which case the merged result is added to the cache
func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, reportKeys []keyInfo, start int64) (report.Report, error) {
key := fmt.Sprintf("%s:%d", userid, start)
cached, _, err := c.inProcess.FetchReports(ctx, []string{key})
if len(cached) == 1 {
return cached[key], nil
}
reports, err := c.reportsForKeysInRange(ctx, reportKeys, start, start+reportQuantisationInterval)
if err != nil {
return report.MakeReport(), err
}
merged := c.merger.Merge(reports)
c.inProcess.StoreReport(key, merged)
return merged, nil
}
// reportsForKeysInRange finds the keys that fall in this time range, then
// fetches those reports from memcached and/or S3
func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
var keys []string
for _, k := range reportKeys {
if k.ts >= start && k.ts < end {
keys = append(keys, k.key)
}
}
if span := opentracing.SpanFromContext(ctx); span != nil {
span.LogFields(otlog.Int("fetching", len(keys)), otlog.Int64("start", start), otlog.Int64("end", end))
}
log.Debugf("Fetching %d reports from %v to %v", len(keys), start, end)
return c.getReports(ctx, keys)
}
func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
userid, err := c.cfg.UserIDer(ctx)
if err != nil {