diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index d7288d1bb..9bba88633 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -10,6 +10,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/bluele/gcache" @@ -355,6 +356,47 @@ func calculateReportKey(rowKey, colKey string) (string, error) { return fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey), nil } +func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynamodb.PutItemOutput, error) { + // Back off on ProvisionedThroughputExceededException + const ( + maxRetries = 5 + throuputExceededError = "ProvisionedThroughputExceededException" + ) + var ( + resp *dynamodb.PutItemOutput + err error + retries = 0 + backoff = 50 * time.Millisecond + ) + for { + resp, err = c.db.PutItem(&dynamodb.PutItemInput{ + TableName: aws.String(c.tableName), + Item: map[string]*dynamodb.AttributeValue{ + hourField: { + S: aws.String(rowKey), + }, + tsField: { + N: aws.String(colKey), + }, + reportField: { + S: aws.String(reportKey), + }, + }, + ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + }) + if err != nil && retries < maxRetries { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == throuputExceededError { + time.Sleep(backoff) + retries++ + backoff *= 2 + continue + } + } + break + } + return resp, err +} + func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error { userid, err := c.userIDer(ctx) if err != nil { @@ -392,23 +434,10 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e var resp *dynamodb.PutItemOutput err = instrument.TimeRequestHistogram(ctx, "DynamoDB.PutItem", dynamoRequestDuration, func(_ context.Context) error { var err error - resp, err = c.db.PutItem(&dynamodb.PutItemInput{ - TableName: aws.String(c.tableName), - Item: map[string]*dynamodb.AttributeValue{ - hourField: { - S: aws.String(rowKey), - }, - tsField: { - N: aws.String(colKey), - }, - reportField: { - S: aws.String(reportKey), - }, - }, - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - }) + resp, err = c.putItemInDynamo(rowKey, colKey, reportKey) return err }) + if resp.ConsumedCapacity != nil { dynamoConsumedCapacity.WithLabelValues("PutItem"). Add(float64(*resp.ConsumedCapacity.CapacityUnits))