Refactor: delete from DynamoDB in same goroutine as S3
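
Previously, readers sent individual keys over a channel to a batcher goroutine, which fed batches to a pool of deleter goroutines. Now each per-org-hour goroutine deletes its keys from S3 and then calls deleteFromDynamoDB directly, which chunks requests at DynamoDB's 25-item BatchWriteItem limit and re-queues unprocessed items. This removes the batcher and deleteLoop goroutines, the delete/retry/batched channels and their shutdown interlock, and the -deleters and -delete-batch-size flags.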

Bryan Boreham
2019-06-04 16:07:22 +00:00
parent bc59b03f67
commit 18cd43d2bf


@@ -29,30 +29,25 @@ import (
 )
 
 type scanner struct {
-    startHour       int
-    stopHour        int
-    segments        int
-    deleters        int
-    deleteBatchSize int
-    tableName       string
-    bucketName      string
-    address         string
+    startHour  int
+    stopHour   int
+    segments   int
+    tableName  string
+    bucketName string
+    address    string
 
     writeLimiter *rate.Limiter
     queryLimiter *rate.Limiter
     dynamoDB     *dynamodb.DynamoDB
     s3           *s3.S3
-
-    // Readers send items on this chan to be deleted
-    delete  chan map[string]*dynamodb.AttributeValue
-    retry   chan map[string]*dynamodb.AttributeValue
-    // Deleters read batches of items from this chan
-    batched chan []*dynamodb.WriteRequest
 }
 
 const (
     s3deleteBatchSize = 250
     // See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html.
     dynamoDBMaxWriteBatchSize = 25
 )
 
 var (
@@ -105,8 +100,6 @@ func main() {
     flag.IntVar(&scanner.startHour, "start-hour", 406848, "Hour number to start")
     flag.IntVar(&scanner.stopHour, "stop-hour", 0, "Hour number to stop (0 for current hour)")
     flag.IntVar(&scanner.segments, "segments", 1, "Number of segments to read in parallel")
-    flag.IntVar(&scanner.deleters, "deleters", 1, "Number of deleters to run in parallel")
-    flag.IntVar(&scanner.deleteBatchSize, "delete-batch-size", 25, "Number of delete requests to batch up")
     flag.StringVar(&scanner.address, "address", ":6060", "Address to listen on, for profiling, etc.")
     flag.StringVar(&orgsFile, "delete-orgs-file", "", "File containing IDs of orgs to delete")
     flag.StringVar(&loglevel, "log-level", "info", "Debug level: debug, info, warning, error")
@@ -163,25 +156,6 @@ func main() {
     session := session.New(dynamoDBConfig)
     scanner.dynamoDB = dynamodb.New(session)
 
-    // Unbuffered chan so we can tell when batcher has received all items
-    scanner.delete = make(chan map[string]*dynamodb.AttributeValue)
-    scanner.retry = make(chan map[string]*dynamodb.AttributeValue, 100)
-    scanner.batched = make(chan []*dynamodb.WriteRequest)
-
-    var deleteGroup sync.WaitGroup
-    deleteGroup.Add(1 + scanner.deleters)
-    var pending sync.WaitGroup
-    go func() {
-        scanner.batcher(&pending)
-        deleteGroup.Done()
-    }()
-    for i := 0; i < scanner.deleters; i++ {
-        go func() {
-            scanner.deleteLoop(&pending)
-            deleteGroup.Done()
-        }()
-    }
-
     totals := newSummary()
 
     var orgWait sync.WaitGroup
@@ -195,15 +169,6 @@ func main() {
     }
     orgWait.Wait()
 
-    // Ensure that batcher has received all items so it won't call Add() any more
-    scanner.delete <- nil
-    // Wait for pending items to be sent to DynamoDB
-    pending.Wait()
-    // Close chans to signal deleter(s) and batcher to terminate
-    close(scanner.batched)
-    close(scanner.retry)
-    deleteGroup.Wait()
-
     fmt.Printf("\n")
     totals.print()
 }
@@ -239,7 +204,11 @@ func (sc *scanner) deleteOneOrgHour(ctx context.Context, org string, hour int) int {
         }
         wait.Add(1)
         go func(start, end int) {
-            sc.deleteFromS3AndDynamoDB(ctx, keys[start:end])
+            sc.deleteFromS3(ctx, keys[start:end])
+            for _, key := range keys {
+                delete(key, reportField) // not part of key in dynamoDB
+            }
+            sc.deleteFromDynamoDB(keys)
             wait.Done()
         }(start, end)
     }
@@ -248,7 +217,7 @@ func (sc *scanner) deleteOneOrgHour(ctx context.Context, org string, hour int) int {
     return len(keys)
 }
 
-func (sc *scanner) deleteFromS3AndDynamoDB(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
+func (sc *scanner) deleteFromS3(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
     // Build multiple-object delete request for S3
     d := &s3.Delete{}
     for _, key := range keys {
@@ -267,11 +236,6 @@ func (sc *scanner) deleteFromS3AndDynamoDB(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
     if err != nil {
         log.Errorf("S3 delete: err %s", err)
     }
-    // Now send to be deleted from DynamoDB
-    for _, key := range keys {
-        delete(key, reportField) // not part of key in dynamoDB
-        sc.delete <- key
-    }
 }
 
 func queryDynamo(ctx context.Context, db *dynamodb.DynamoDB, tableName, userid string, row int64) ([]map[string]*dynamodb.AttributeValue, error) {
@@ -347,99 +311,53 @@ func throttled(err error) bool {
     return ok && (awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException)
 }
 
-func (sc *scanner) deleteLoop(pending *sync.WaitGroup) {
-    for {
-        batch, ok := <-sc.batched
-        if !ok {
-            return
-        }
-        log.Debug("about to delete", len(batch))
-        var ret *dynamodb.BatchWriteItemOutput
-        var err error
+// input is map from table to attribute-value
+func (sc *scanner) deleteFromDynamoDB(batch []map[string]*dynamodb.AttributeValue) {
+    var requests []*dynamodb.WriteRequest
+    for _, keyMap := range batch {
+        requests = append(requests, &dynamodb.WriteRequest{
+            DeleteRequest: &dynamodb.DeleteRequest{
+                Key: keyMap,
+            },
+        })
+    }
+    for len(requests) > 0 {
+        numToSend := len(requests)
+        if numToSend > dynamoDBMaxWriteBatchSize {
+            numToSend = dynamoDBMaxWriteBatchSize
+        }
+        log.Debug("about to delete", len(batch))
+        var ret *dynamodb.BatchWriteItemOutput
+        var err error
         instrument.TimeRequestHistogram(context.Background(), "DynamoDB.Delete", dynamoRequestDuration, func(_ context.Context) error {
             ret, err = sc.dynamoDB.BatchWriteItem(&dynamodb.BatchWriteItemInput{
                 RequestItems: map[string][]*dynamodb.WriteRequest{
-                    sc.tableName: batch,
+                    sc.tableName: requests[:numToSend],
                 },
                 ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
             })
             return err
         })
-        if ret.ConsumedCapacity != nil {
-            for _, cc := range ret.ConsumedCapacity {
-                dynamoConsumedCapacity.WithLabelValues("BatchWriteItem").
-                    Add(float64(*cc.CapacityUnits))
-            }
-        }
+        for _, cc := range ret.ConsumedCapacity {
+            dynamoConsumedCapacity.WithLabelValues("BatchWriteItem").
+                Add(float64(*cc.CapacityUnits))
+        }
         if err != nil {
             if throttled(err) {
                 sc.writeLimiter.WaitN(context.Background(), len(batch))
-                // Send the whole request back into the batcher
-                for _, item := range batch {
-                    sc.retry <- item.DeleteRequest.Key
-                }
                 // Back round the loop without taking anything away from the batch
                 continue
-            } else {
-                log.Error("msg", "unable to delete", "err", err)
-                pending.Add(-len(batch))
             }
-            continue
+            log.Error("msg", "unable to delete", "err", err)
+            // drop this batch
         }
-        count := 0
-        if len(ret.UnprocessedItems) > 0 {
-            sc.writeLimiter.WaitN(context.Background(), len(ret.UnprocessedItems))
-        }
-        // Send unprocessed items back into the batcher
-        for _, items := range ret.UnprocessedItems {
-            count += len(items)
-            for _, item := range items {
-                sc.retry <- item.DeleteRequest.Key
-            }
-        }
-        pending.Add(-(len(batch) - count))
-    }
-}
-
-// Receive individual requests, and batch them up into groups to send to DynamoDB
-func (sc *scanner) batcher(pending *sync.WaitGroup) {
-    finished := false
-    var requests []*dynamodb.WriteRequest
-    for {
-        // We will allow in new data if the queue isn't too long
-        var in chan map[string]*dynamodb.AttributeValue
-        if len(requests) < 1000 {
-            in = sc.delete
-        }
-        // We will send out a batch if the queue is big enough, or if we're finishing
-        var out chan []*dynamodb.WriteRequest
-        outlen := len(requests)
-        if len(requests) >= sc.deleteBatchSize {
-            out = sc.batched
-            outlen = sc.deleteBatchSize
-        } else if finished && len(requests) > 0 {
-            out = sc.batched
-        }
-        var keyMap map[string]*dynamodb.AttributeValue
-        var ok bool
-        select {
-        case keyMap = <-in:
-            if keyMap == nil { // Nil used as interlock to know we received all previous values
-                finished = true
-            } else {
-                pending.Add(1)
-            }
-        case keyMap, ok = <-sc.retry:
-            if !ok {
-                return
-            }
-        case out <- requests[:outlen]:
-            requests = requests[outlen:]
-        }
-        if keyMap != nil {
-            requests = append(requests, &dynamodb.WriteRequest{
-                DeleteRequest: &dynamodb.DeleteRequest{
-                    Key: keyMap,
-                },
-            })
-        }
-    }
-}
+        requests = requests[numToSend:]
+        // Add unprocessed items onto the end of requests
+        for _, v := range ret.UnprocessedItems {
+            sc.writeLimiter.WaitN(context.Background(), len(v))
+            requests = append(requests, v...)
+        }
+    }
+}
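
For orientation, here is a minimal, self-contained sketch of the chunk-and-retry pattern the new deleteFromDynamoDB implements, with the scanner type, rate limiting, and instrumentation stripped out. The names deleteKeys and maxBatch, the table name, and the key shape are illustrative assumptions, not part of this commit.

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

const maxBatch = 25 // DynamoDB rejects BatchWriteItem calls with more than 25 requests

// deleteKeys deletes the given primary keys from one table, sending at most
// maxBatch delete requests per call and re-queueing whatever DynamoDB
// reports back as unprocessed.
func deleteKeys(db *dynamodb.DynamoDB, table string, keys []map[string]*dynamodb.AttributeValue) error {
    var requests []*dynamodb.WriteRequest
    for _, key := range keys {
        requests = append(requests, &dynamodb.WriteRequest{
            DeleteRequest: &dynamodb.DeleteRequest{Key: key},
        })
    }
    for len(requests) > 0 {
        n := len(requests)
        if n > maxBatch {
            n = maxBatch
        }
        ret, err := db.BatchWriteItem(&dynamodb.BatchWriteItemInput{
            RequestItems: map[string][]*dynamodb.WriteRequest{table: requests[:n]},
        })
        if err != nil {
            return err // the committed code instead rate-limits and retries on throttling
        }
        requests = requests[n:]
        // Partial success is not an error: anything DynamoDB did not process
        // comes back in UnprocessedItems and goes to the back of the queue.
        for _, unprocessed := range ret.UnprocessedItems {
            requests = append(requests, unprocessed...)
        }
    }
    return nil
}

func main() {
    db := dynamodb.New(session.Must(session.NewSession()))
    key := map[string]*dynamodb.AttributeValue{
        "hash-key": {S: aws.String("example")}, // hypothetical key shape
    }
    fmt.Println(deleteKeys(db, "example-table", []map[string]*dynamodb.AttributeValue{key}))
}

The committed version differs in that it waits on sc.writeLimiter before retrying throttled or unprocessed items, records consumed-capacity metrics, and drops a batch outright on a non-throttling error.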