diff --git a/extras/scanner/main.go b/extras/scanner/main.go
index 25421a46d..bf2eee79f 100644
--- a/extras/scanner/main.go
+++ b/extras/scanner/main.go
@@ -29,30 +29,25 @@ import (
 )
 
 type scanner struct {
-	startHour       int
-	stopHour        int
-	segments        int
-	deleters        int
-	deleteBatchSize int
-	tableName       string
-	bucketName      string
-	address         string
+	startHour  int
+	stopHour   int
+	segments   int
+	tableName  string
+	bucketName string
+	address    string
 
 	writeLimiter *rate.Limiter
 	queryLimiter *rate.Limiter
 
 	dynamoDB *dynamodb.DynamoDB
 	s3       *s3.S3
-
-	// Readers send items on this chan to be deleted
-	delete chan map[string]*dynamodb.AttributeValue
-	retry  chan map[string]*dynamodb.AttributeValue
-	// Deleters read batches of items from this chan
-	batched chan []*dynamodb.WriteRequest
 }
 
 const (
 	s3deleteBatchSize = 250
+
+	// See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html.
+	dynamoDBMaxWriteBatchSize = 25
 )
 
 var (
@@ -105,8 +100,6 @@ func main() {
 	flag.IntVar(&scanner.startHour, "start-hour", 406848, "Hour number to start")
 	flag.IntVar(&scanner.stopHour, "stop-hour", 0, "Hour number to stop (0 for current hour)")
 	flag.IntVar(&scanner.segments, "segments", 1, "Number of segments to read in parallel")
-	flag.IntVar(&scanner.deleters, "deleters", 1, "Number of deleters to run in parallel")
-	flag.IntVar(&scanner.deleteBatchSize, "delete-batch-size", 25, "Number of delete requests to batch up")
 	flag.StringVar(&scanner.address, "address", ":6060", "Address to listen on, for profiling, etc.")
 	flag.StringVar(&orgsFile, "delete-orgs-file", "", "File containing IDs of orgs to delete")
 	flag.StringVar(&loglevel, "log-level", "info", "Debug level: debug, info, warning, error")
@@ -163,25 +156,6 @@ func main() {
 	session := session.New(dynamoDBConfig)
 	scanner.dynamoDB = dynamodb.New(session)
 
-	// Unbuffered chan so we can tell when batcher has received all items
-	scanner.delete = make(chan map[string]*dynamodb.AttributeValue)
-	scanner.retry = make(chan map[string]*dynamodb.AttributeValue, 100)
-	scanner.batched = make(chan []*dynamodb.WriteRequest)
-
-	var deleteGroup sync.WaitGroup
-	deleteGroup.Add(1 + scanner.deleters)
-	var pending sync.WaitGroup
-	go func() {
-		scanner.batcher(&pending)
-		deleteGroup.Done()
-	}()
-	for i := 0; i < scanner.deleters; i++ {
-		go func() {
-			scanner.deleteLoop(&pending)
-			deleteGroup.Done()
-		}()
-	}
-
 	totals := newSummary()
 
 	var orgWait sync.WaitGroup
@@ -195,15 +169,6 @@ func main() {
 	}
 	orgWait.Wait()
 
-	// Ensure that batcher has received all items so it won't call Add() any more
-	scanner.delete <- nil
-	// Wait for pending items to be sent to DynamoDB
-	pending.Wait()
-	// Close chans to signal deleter(s) and batcher to terminate
-	close(scanner.batched)
-	close(scanner.retry)
-	deleteGroup.Wait()
-
 	fmt.Printf("\n")
 	totals.print()
 }
@@ -239,7 +204,11 @@ func (sc *scanner) deleteOneOrgHour(ctx context.Context, org string, hour int) i
 		}
 		wait.Add(1)
 		go func(start, end int) {
-			sc.deleteFromS3AndDynamoDB(ctx, keys[start:end])
+			sc.deleteFromS3(ctx, keys[start:end])
+			for _, key := range keys[start:end] { // only our own chunk: sibling goroutines own the rest
+				delete(key, reportField) // not part of key in dynamoDB
+			}
+			sc.deleteFromDynamoDB(keys[start:end])
 			wait.Done()
 		}(start, end)
 	}
@@ -248,7 +217,7 @@ func (sc *scanner) deleteOneOrgHour(ctx context.Context, org string, hour int) i
 	return len(keys)
 }
 
-func (sc *scanner) deleteFromS3AndDynamoDB(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
+func (sc *scanner) deleteFromS3(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
 	// Build multiple-object delete request for S3
 	d := &s3.Delete{}
 	for _, key := range keys {
@@ -267,11 +236,6 @@ func (sc *scanner) deleteFromS3AndDynamoDB(ctx context.Context, keys []map[strin
 	if err != nil {
 		log.Errorf("S3 delete: err %s", err)
 	}
-	// Now send to be deleted from DynamoDB
-	for _, key := range keys {
-		delete(key, reportField) // not part of key in dynamoDB
-		sc.delete <- key
-	}
 }
 
 func queryDynamo(ctx context.Context, db *dynamodb.DynamoDB, tableName, userid string, row int64) ([]map[string]*dynamodb.AttributeValue, error) {
@@ -347,99 +311,58 @@ func throttled(err error) bool {
 	return ok && (awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException)
 }
 
-func (sc *scanner) deleteLoop(pending *sync.WaitGroup) {
-	for {
-		batch, ok := <-sc.batched
-		if !ok {
-			return
+// deleteFromDynamoDB deletes the items identified by batch from sc.tableName.
+// Requests are sent in chunks of at most dynamoDBMaxWriteBatchSize; items
+// DynamoDB reports as unprocessed are re-queued, a throttled chunk is
+// retried after waiting on sc.writeLimiter, and a chunk that fails with
+// any other error is logged and dropped.
+func (sc *scanner) deleteFromDynamoDB(batch []map[string]*dynamodb.AttributeValue) {
+	var requests []*dynamodb.WriteRequest
+
+	for _, keyMap := range batch {
+		requests = append(requests, &dynamodb.WriteRequest{
+			DeleteRequest: &dynamodb.DeleteRequest{
+				Key: keyMap,
+			},
+		})
+	}
+	log.Debug("about to delete", len(batch))
+	var ret *dynamodb.BatchWriteItemOutput
+	var err error
+	for len(requests) > 0 {
+		numToSend := len(requests)
+		if numToSend > dynamoDBMaxWriteBatchSize {
+			numToSend = dynamoDBMaxWriteBatchSize
 		}
-		log.Debug("about to delete", len(batch))
-		var ret *dynamodb.BatchWriteItemOutput
-		var err error
 		instrument.TimeRequestHistogram(context.Background(), "DynamoDB.Delete", dynamoRequestDuration, func(_ context.Context) error {
 			ret, err = sc.dynamoDB.BatchWriteItem(&dynamodb.BatchWriteItemInput{
 				RequestItems: map[string][]*dynamodb.WriteRequest{
-					sc.tableName: batch,
+					sc.tableName: requests[:numToSend],
 				},
 				ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
 			})
 			return err
 		})
-		if ret.ConsumedCapacity != nil {
-			for _, cc := range ret.ConsumedCapacity {
-				dynamoConsumedCapacity.WithLabelValues("BatchWriteItem").
-					Add(float64(*cc.CapacityUnits))
-			}
+		for _, cc := range ret.ConsumedCapacity {
+			dynamoConsumedCapacity.WithLabelValues("BatchWriteItem").
+				Add(float64(*cc.CapacityUnits))
 		}
 		if err != nil {
 			if throttled(err) {
-				sc.writeLimiter.WaitN(context.Background(), len(batch))
-				// Send the whole request back into the batcher
-				for _, item := range batch {
-					sc.retry <- item.DeleteRequest.Key
-				}
+				// Wait for the chunk just attempted; batch is the whole input here.
+				sc.writeLimiter.WaitN(context.Background(), numToSend)
+				// Back round the loop without taking anything away from the batch
+				continue
 			} else {
 				log.Error("msg", "unable to delete", "err", err)
-				pending.Add(-len(batch))
-			}
-			continue
-		}
-		count := 0
-		if len(ret.UnprocessedItems) > 0 {
-			sc.writeLimiter.WaitN(context.Background(), len(ret.UnprocessedItems))
-		}
-		// Send unprocessed items back into the batcher
-		for _, items := range ret.UnprocessedItems {
-			count += len(items)
-			for _, item := range items {
-				sc.retry <- item.DeleteRequest.Key
+				// drop this batch
 			}
 		}
-		pending.Add(-(len(batch) - count))
-	}
-}
-
-// Receive individual requests, and batch them up into groups to send to DynamoDB
-func (sc *scanner) batcher(pending *sync.WaitGroup) {
-	finished := false
-	var requests []*dynamodb.WriteRequest
-	for {
-		// We will allow in new data if the queue isn't too long
-		var in chan map[string]*dynamodb.AttributeValue
-		if len(requests) < 1000 {
-			in = sc.delete
-		}
-		// We will send out a batch if the queue is big enough, or if we're finishing
-		var out chan []*dynamodb.WriteRequest
-		outlen := len(requests)
-		if len(requests) >= sc.deleteBatchSize {
-			out = sc.batched
-			outlen = sc.deleteBatchSize
-		} else if finished && len(requests) > 0 {
-			out = sc.batched
-		}
-		var keyMap map[string]*dynamodb.AttributeValue
-		var ok bool
-		select {
-		case keyMap = <-in:
-			if keyMap == nil { // Nil used as interlock to know we received all previous values
-				finished = true
-			} else {
-				pending.Add(1)
-			}
-		case keyMap, ok = <-sc.retry:
-			if !ok {
-				return
-			}
-		case out <- requests[:outlen]:
-			requests = requests[outlen:]
-		}
-		if keyMap != nil {
-			requests = append(requests, &dynamodb.WriteRequest{
-				DeleteRequest: &dynamodb.DeleteRequest{
-					Key: keyMap,
-				},
-			})
+		requests = requests[numToSend:]
+		// Add unprocessed items onto the end of requests
+		for _, v := range ret.UnprocessedItems {
+			sc.writeLimiter.WaitN(context.Background(), len(v))
+			requests = append(requests, v...)
 		}
 	}
 }