mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Merge pull request #2723 from weaveworks/2345-backoff-on-dynamo-and-s3
Back off when writing to Dynamo and S3
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/bluele/gcache"
|
||||
@@ -355,6 +356,47 @@ func calculateReportKey(rowKey, colKey string) (string, error) {
|
||||
return fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey), nil
|
||||
}
|
||||
|
||||
func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynamodb.PutItemOutput, error) {
|
||||
// Back off on ProvisionedThroughputExceededException
|
||||
const (
|
||||
maxRetries = 5
|
||||
throuputExceededError = "ProvisionedThroughputExceededException"
|
||||
)
|
||||
var (
|
||||
resp *dynamodb.PutItemOutput
|
||||
err error
|
||||
retries = 0
|
||||
backoff = 50 * time.Millisecond
|
||||
)
|
||||
for {
|
||||
resp, err = c.db.PutItem(&dynamodb.PutItemInput{
|
||||
TableName: aws.String(c.tableName),
|
||||
Item: map[string]*dynamodb.AttributeValue{
|
||||
hourField: {
|
||||
S: aws.String(rowKey),
|
||||
},
|
||||
tsField: {
|
||||
N: aws.String(colKey),
|
||||
},
|
||||
reportField: {
|
||||
S: aws.String(reportKey),
|
||||
},
|
||||
},
|
||||
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
|
||||
})
|
||||
if err != nil && retries < maxRetries {
|
||||
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == throuputExceededError {
|
||||
time.Sleep(backoff)
|
||||
retries++
|
||||
backoff *= 2
|
||||
continue
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error {
|
||||
userid, err := c.userIDer(ctx)
|
||||
if err != nil {
|
||||
@@ -392,23 +434,10 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e
|
||||
var resp *dynamodb.PutItemOutput
|
||||
err = instrument.TimeRequestHistogram(ctx, "DynamoDB.PutItem", dynamoRequestDuration, func(_ context.Context) error {
|
||||
var err error
|
||||
resp, err = c.db.PutItem(&dynamodb.PutItemInput{
|
||||
TableName: aws.String(c.tableName),
|
||||
Item: map[string]*dynamodb.AttributeValue{
|
||||
hourField: {
|
||||
S: aws.String(rowKey),
|
||||
},
|
||||
tsField: {
|
||||
N: aws.String(colKey),
|
||||
},
|
||||
reportField: {
|
||||
S: aws.String(reportKey),
|
||||
},
|
||||
},
|
||||
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
|
||||
})
|
||||
resp, err = c.putItemInDynamo(rowKey, colKey, reportKey)
|
||||
return err
|
||||
})
|
||||
|
||||
if resp.ConsumedCapacity != nil {
|
||||
dynamoConsumedCapacity.WithLabelValues("PutItem").
|
||||
Add(float64(*resp.ConsumedCapacity.CapacityUnits))
|
||||
|
||||
Reference in New Issue
Block a user