From f66d79d4273c8c5c4b3797cab13b04f6a39bdf89 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 18 Jul 2017 15:28:08 +0000 Subject: [PATCH 1/3] Backoff when writing to Dynamo and S3 --- app/multitenant/aws_collector.go | 53 ++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index d7288d1bb..e30e56b2d 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -355,6 +355,19 @@ func calculateReportKey(rowKey, colKey string) (string, error) { return fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey), nil } +func withBackoff(f func() error) error { + retries := 0 + backoff := 50 * time.Millisecond + err := f() + for err != nil && retries <= 5 { + time.Sleep(backoff) + err = f() + retries += 1 + backoff *= 2 + } + return err +} + func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error { userid, err := c.userIDer(ctx) if err != nil { @@ -368,7 +381,12 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e return err } - reportSize, err := c.s3.StoreReportBytes(ctx, reportKey, buf) + var reportSize int + err = withBackoff(func() error { + var err error + reportSize, err = c.s3.StoreReportBytes(ctx, reportKey, buf) + return err + }) if err != nil { return err } @@ -391,24 +409,27 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e var resp *dynamodb.PutItemOutput err = instrument.TimeRequestHistogram(ctx, "DynamoDB.PutItem", dynamoRequestDuration, func(_ context.Context) error { - var err error - resp, err = c.db.PutItem(&dynamodb.PutItemInput{ - TableName: aws.String(c.tableName), - Item: map[string]*dynamodb.AttributeValue{ - hourField: { - S: aws.String(rowKey), + return withBackoff(func() error { + var err error + resp, err = c.db.PutItem(&dynamodb.PutItemInput{ + TableName: aws.String(c.tableName), + Item: map[string]*dynamodb.AttributeValue{ + hourField: { + S: aws.String(rowKey), + }, + tsField: { + N: aws.String(colKey), + }, + reportField: { + S: aws.String(reportKey), + }, }, - tsField: { - N: aws.String(colKey), - }, - reportField: { - S: aws.String(reportKey), - }, - }, - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + }) + return err }) - return err }) + if resp.ConsumedCapacity != nil { dynamoConsumedCapacity.WithLabelValues("PutItem"). Add(float64(*resp.ConsumedCapacity.CapacityUnits)) From c6d5a6a6469cdb18fdbb40727dc9b82df8e1fc05 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 20 Jul 2017 10:05:48 +0000 Subject: [PATCH 2/3] Make linter happy --- app/multitenant/aws_collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index e30e56b2d..0fba49ce6 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -362,7 +362,7 @@ func withBackoff(f func() error) error { for err != nil && retries <= 5 { time.Sleep(backoff) err = f() - retries += 1 + retries++ backoff *= 2 } return err From 7b4b410f4a962b5bedb2b4071d5b86c3619ad349 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 20 Jul 2017 11:01:44 +0000 Subject: [PATCH 3/3] Review Feedback --- app/multitenant/aws_collector.go | 78 ++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index 0fba49ce6..9bba88633 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -10,6 +10,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/bluele/gcache" @@ -355,17 +356,45 @@ func calculateReportKey(rowKey, colKey string) (string, error) { return fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey), nil } -func withBackoff(f func() error) error { - retries := 0 - backoff := 50 * time.Millisecond - err := f() - for err != nil && retries <= 5 { - time.Sleep(backoff) - err = f() - retries++ - backoff *= 2 +func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynamodb.PutItemOutput, error) { + // Back off on ProvisionedThroughputExceededException + const ( + maxRetries = 5 + throuputExceededError = "ProvisionedThroughputExceededException" + ) + var ( + resp *dynamodb.PutItemOutput + err error + retries = 0 + backoff = 50 * time.Millisecond + ) + for { + resp, err = c.db.PutItem(&dynamodb.PutItemInput{ + TableName: aws.String(c.tableName), + Item: map[string]*dynamodb.AttributeValue{ + hourField: { + S: aws.String(rowKey), + }, + tsField: { + N: aws.String(colKey), + }, + reportField: { + S: aws.String(reportKey), + }, + }, + ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + }) + if err != nil && retries < maxRetries { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == throuputExceededError { + time.Sleep(backoff) + retries++ + backoff *= 2 + continue + } + } + break } - return err + return resp, err } func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error { @@ -381,12 +410,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e return err } - var reportSize int - err = withBackoff(func() error { - var err error - reportSize, err = c.s3.StoreReportBytes(ctx, reportKey, buf) - return err - }) + reportSize, err := c.s3.StoreReportBytes(ctx, reportKey, buf) if err != nil { return err } @@ -409,25 +433,9 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e var resp *dynamodb.PutItemOutput err = instrument.TimeRequestHistogram(ctx, "DynamoDB.PutItem", dynamoRequestDuration, func(_ context.Context) error { - return withBackoff(func() error { - var err error - resp, err = c.db.PutItem(&dynamodb.PutItemInput{ - TableName: aws.String(c.tableName), - Item: map[string]*dynamodb.AttributeValue{ - hourField: { - S: aws.String(rowKey), - }, - tsField: { - N: aws.String(colKey), - }, - reportField: { - S: aws.String(reportKey), - }, - }, - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - }) - return err - }) + var err error + resp, err = c.putItemInDynamo(rowKey, colKey, reportKey) + return err }) if resp.ConsumedCapacity != nil {