From ebccc414b1ae595b432a25c506eb3366fd9beef0 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 9 May 2019 07:58:13 +0000 Subject: [PATCH] Refactor: extract functions for hour query and parallel delete --- extras/scanner/main.go | 56 +++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/extras/scanner/main.go b/extras/scanner/main.go index c35fe1400..35e2e8424 100644 --- a/extras/scanner/main.go +++ b/extras/scanner/main.go @@ -198,39 +198,45 @@ func main() { func (sc *scanner) processOrg(ctx context.Context, org string) { deleted := 0 for hour := sc.startHour; hour <= sc.stopHour; hour++ { - var keys []map[string]*dynamodb.AttributeValue - for { - sc.queryLimiter.Wait(ctx) - var err error - keys, err = queryDynamo(ctx, sc.dynamoDB, sc.tableName, org, int64(hour)) - if throttled(err) { - continue - } - checkFatal(err) - break - } - var wait sync.WaitGroup + keys := sc.getKeys(ctx, org, hour) if len(keys) > 0 { log.Debugf("deleting org: %s hour: %d num: %d", org, hour, len(keys)) } - for start := 0; start < len(keys); start += s3deleteBatchSize { - end := start + s3deleteBatchSize - if end > len(keys) { - end = len(keys) - } - wait.Add(1) - go func(start, end int) { - sc.deleteFromS3AndDynamoDB(ctx, keys[start:end]) - wait.Done() - }(start, end) - } - wait.Wait() + sc.parallelDelete(ctx, keys) deleted += len(keys) - s3ItemsDeleted.Add(float64(len(keys))) } log.Infof("done %s: %d", org, deleted) } +func (sc *scanner) getKeys(ctx context.Context, org string, hour int) []map[string]*dynamodb.AttributeValue { + for { + sc.queryLimiter.Wait(ctx) + keys, err := queryDynamo(ctx, sc.dynamoDB, sc.tableName, org, int64(hour)) + if throttled(err) { + continue + } + checkFatal(err) + return keys + } +} + +func (sc *scanner) parallelDelete(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) { + var wait sync.WaitGroup + for start := 0; start < len(keys); start += s3deleteBatchSize { + end := start + s3deleteBatchSize + if end > len(keys) { + end = len(keys) + } + wait.Add(1) + go func(start, end int) { + sc.deleteFromS3AndDynamoDB(ctx, keys[start:end]) + wait.Done() + }(start, end) + } + wait.Wait() + s3ItemsDeleted.Add(float64(len(keys))) +} + func (sc *scanner) deleteFromS3AndDynamoDB(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) { // Build multiple-object delete request for S3 d := &s3.Delete{}