From 7377945302c4cb334134fa125585001e47dd0e49 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 27 May 2016 08:57:07 +0100 Subject: [PATCH] Use smart merger in the dynamodb collector. (#1543) --- app/multitenant/dynamo_collector.go | 34 +++++++++++++++++++---------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index 3e3cc9149..037b073f2 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -47,6 +47,7 @@ type dynamoDBCollector struct { userIDer UserIDer db *dynamodb.DynamoDB tableName string + merger app.Merger } // NewDynamoDBCollector the reaper of souls @@ -56,6 +57,7 @@ func NewDynamoDBCollector(config *aws.Config, userIDer UserIDer, tableName strin db: dynamodb.New(session.New(config)), userIDer: userIDer, tableName: tableName, + merger: app.NewSmartMerger(), } } @@ -111,7 +113,7 @@ func (c *dynamoDBCollector) CreateTables() error { return err } -func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Time, input report.Report) (report.Report, error) { +func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Time) ([]report.Report, error) { rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) startTime := time.Now() resp, err := c.db.Query(&dynamodb.QueryInput{ @@ -135,10 +137,10 @@ func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Ti duration := time.Now().Sub(startTime) if err != nil { dynamoRequestDuration.WithLabelValues("Query", "500").Observe(float64(duration.Nanoseconds())) - return report.MakeReport(), err + return nil, err } dynamoRequestDuration.WithLabelValues("Query", "200").Observe(float64(duration.Nanoseconds())) - result := input + reports := []report.Report{} for _, item := range resp.Items { b := item[reportField].B if b == nil { @@ -156,9 +158,9 @@ func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Ti log.Errorf("Failed to decode report: %v", err) continue } - result = result.Merge(rep) + reports = append(reports, rep) } - return result, nil + return reports, nil } func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { @@ -166,8 +168,8 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { now = time.Now() start = now.Add(-15 * time.Second) rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds() - result = report.MakeReport() userid, err = c.userIDer(ctx) + reports []report.Report ) if err != nil { return report.MakeReport(), err @@ -175,14 +177,24 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { // Queries will only every span 2 rows max. if rowStart != rowEnd { - if result, err = c.getRows(userid, rowStart, start, now, result); err != nil { + reports1, err := c.getRows(userid, rowStart, start, now) + if err != nil { + return report.MakeReport(), err + } + + reports2, err := c.getRows(userid, rowEnd, start, now) + if err != nil { + return report.MakeReport(), err + } + + reports = append(reports1, reports2...) + } else { + if reports, err = c.getRows(userid, rowEnd, start, now); err != nil { return report.MakeReport(), err } } - if result, err = c.getRows(userid, rowEnd, start, now, result); err != nil { - return report.MakeReport(), err - } - return result, nil + + return c.merger.Merge(reports), nil } func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {