Merge pull request #3671 from weaveworks/quantise-aws-read-cache

Quantise report cache in query side of aws-collector
This commit is contained in:
Bryan Boreham
2019-09-13 12:44:34 +01:00
committed by GitHub

View File

@@ -30,6 +30,11 @@ const (
tsField = "ts"
reportField = "report"
natsTimeout = 10 * time.Second
reportQuantisationInterval = 3 * time.Second
// Grace period allows for some gap between the timestamp on reports
// (assigned when they arrive at collector) and them appearing in DynamoDB query
gracePeriod = 500 * time.Millisecond
)
var (
@@ -167,7 +172,7 @@ func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
cfg: config,
db: dynamodb.New(session.New(config.DynamoDBConfig)),
merger: app.NewFastMerger(),
inProcess: newInProcessStore(reportCacheSize, config.Window),
inProcess: newInProcessStore(reportCacheSize, config.Window+reportQuantisationInterval),
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
@@ -225,8 +230,13 @@ func (c *awsCollector) CreateTables() error {
return err
}
// keyInfo pairs a report's storage key with the timestamp recorded next to it
// in the index row, so callers can select keys by time without another query.
type keyInfo struct {
	// key is the value of the row's report field (the S3 key of the report).
	key string
	// ts is the integer parsed from the row's ts field; it is compared
	// against Unix-nanosecond bounds when selecting keys for a time range.
	ts int64
}
// reportKeysInRange returns the s3 keys for reports in the specified range
func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]string, error) {
func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]keyInfo, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
var resp *dynamodb.QueryOutput
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
@@ -260,36 +270,32 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row
return nil, err
}
result := []string{}
result := []keyInfo{}
for _, item := range resp.Items {
reportKey := item[reportField].S
if reportKey == nil {
tsValue := item[tsField].N
if reportKey == nil || tsValue == nil {
log.Errorf("Empty row!")
continue
}
dynamoValueSize.WithLabelValues("BatchGetItem").
Add(float64(len(*reportKey)))
result = append(result, *reportKey)
ts, _ := strconv.ParseInt(*tsValue, 10, 64)
result = append(result, keyInfo{key: *reportKey, ts: ts})
}
return result, nil
}
// getReportKeys returns the S3 for reports in the reporting window ending at timestamp.
func (c *awsCollector) getReportKeys(ctx context.Context, timestamp time.Time) ([]string, error) {
// getReportKeys returns the S3 keys for reports in the interval [start, end].
func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]keyInfo, error) {
var (
end = timestamp
start = end.Add(-c.cfg.Window)
rowStart = start.UnixNano() / time.Hour.Nanoseconds()
rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
err error
)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
return nil, err
}
// Queries will only ever span 2 rows max.
var reportKeys []string
var reportKeys []keyInfo
if rowStart != rowEnd {
reportKeys1, err := c.reportKeysInRange(ctx, userid, rowStart, start, end)
if err != nil {
@@ -348,25 +354,89 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
return reports, nil
}
/*
S3 stores original reports from one probe at the timestamp they arrived at collector.
Collector also sends every report to memcached.
The in-memory cache stores:
- individual reports deserialised, under S3 key for report
- sets of reports in interval [t,t+3) merged, under key "instance:t"
- so to check the cache for reports from 14:31:00 to 14:31:15 you would request 5 keys 3 seconds apart
*/
// Report returns the merge of all reports in the window of length
// c.cfg.Window ending at timestamp. The window is covered by one merged
// report per quantum of reportQuantisationInterval (obtained through
// reportForQuantum) plus the individual reports for the remaining tail.
// On any error it returns report.MakeReport() together with that error.
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
defer span.Finish()
reportKeys, err := c.getReportKeys(ctx, timestamp)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
return report.MakeReport(), err
}
// The query window is [timestamp - Window, timestamp].
end := timestamp
start := end.Add(-c.cfg.Window)
reportKeys, err := c.getReportKeys(ctx, userid, start, end)
if err != nil {
return report.MakeReport(), err
}
span.LogFields(otlog.Int("keys", len(reportKeys)), otlog.String("timestamp", timestamp.String()))
log.Debugf("Fetching %d reports to %v", len(reportKeys), timestamp)
reports, err := c.getReports(ctx, reportKeys)
var reports []report.Report
// Fetch a merged report for each time quantum covering the window
startTS, endTS := start.UnixNano(), end.UnixNano()
// Round the window start down to a multiple of reportQuantisationInterval;
// reportForQuantum uses this value as part of its cache key.
ts := startTS - (startTS % reportQuantisationInterval.Nanoseconds())
// A quantum is only used when its end plus gracePeriod is before the window end.
for ; ts+(reportQuantisationInterval+gracePeriod).Nanoseconds() < endTS; ts += reportQuantisationInterval.Nanoseconds() {
quantumReport, err := c.reportForQuantum(ctx, userid, reportKeys, ts)
if err != nil {
return report.MakeReport(), err
}
reports = append(reports, quantumReport)
}
// Fetch individual reports for the period after the last quantum
// (ts is now the start of the first quantum the loop did not take).
last, err := c.reportsForKeysInRange(ctx, reportKeys, ts, endTS)
if err != nil {
return report.MakeReport(), err
}
reports = append(reports, last...)
span.LogFields(otlog.Int("merging", len(reports)))
return c.merger.Merge(reports), nil
}
// reportForQuantum returns the merged report for the quantum
// [start, start+reportQuantisationInterval), where start is in Unix
// nanoseconds. The result is looked up in the in-process cache under the key
// "<userid>:<start>"; on a miss it is built from the reports whose keys in
// reportKeys fall inside the quantum, stored in the cache and returned.
//
// A failure to read the cache is logged and treated as a miss, because the
// report can always be rebuilt from the underlying reports. An error from
// fetching those reports is returned together with report.MakeReport().
func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, reportKeys []keyInfo, start int64) (report.Report, error) {
	key := fmt.Sprintf("%s:%d", userid, start)
	cached, _, err := c.inProcess.FetchReports(ctx, []string{key})
	if err != nil {
		// Previously this error was silently overwritten by the assignment below.
		log.Errorf("Error fetching %q from in-process cache: %v", key, err)
	}
	// Check for the key itself rather than the size of the result, so an
	// unexpected entry can never be returned as a zero-value report.
	if hit, ok := cached[key]; ok {
		return hit, nil
	}
	reports, err := c.reportsForKeysInRange(ctx, reportKeys, start, start+reportQuantisationInterval.Nanoseconds())
	if err != nil {
		return report.MakeReport(), err
	}
	merged := c.merger.Merge(reports)
	c.inProcess.StoreReport(key, merged)
	return merged, nil
}
// reportsForKeysInRange selects the keys whose timestamp lies in the half-open
// range [start, end) and hands them to getReports, which fetches them from
// memcached and/or S3. The number of selected keys and the bounds are recorded
// on the active tracing span (when the context carries one) and in the debug log.
func (c *awsCollector) reportsForKeysInRange(ctx context.Context, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
	var selected []string
	for i := range reportKeys {
		info := reportKeys[i]
		if info.ts < start || info.ts >= end {
			continue
		}
		selected = append(selected, info.key)
	}
	count := len(selected)
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		sp.LogFields(
			otlog.Int("fetching", count),
			otlog.Int64("start", start),
			otlog.Int64("end", end),
		)
	}
	log.Debugf("Fetching %d reports from %v to %v", count, start, end)
	return c.getReports(ctx, selected)
}
// HasReports reports whether at least one report key exists in the window of
// length c.cfg.Window ending at timestamp. An error from resolving the user
// ID yields (false, err); an error from the key lookup is returned alongside
// the result of the length check.
func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
reportKeys, err := c.getReportKeys(ctx, timestamp)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
return false, err
}
// Same window as Report: [timestamp - Window, timestamp].
start := timestamp.Add(-c.cfg.Window)
reportKeys, err := c.getReportKeys(ctx, userid, start, timestamp)
return len(reportKeys) > 0, err
}