Refactor: extract functions for hour query and parallel delete

This commit is contained in:
Bryan Boreham
2019-05-09 07:58:13 +00:00
parent b23f7a7b0d
commit ebccc414b1

View File

@@ -198,39 +198,45 @@ func main() {
// processOrg walks every hour from sc.startHour through sc.stopHour
// (inclusive) for the given org, looks up the keys recorded for that hour and
// deletes them.
//
// The per-hour query is delegated to getKeys and the batched, concurrent
// deletion to parallelDelete. parallelDelete also adds to the s3ItemsDeleted
// counter, so that counter is deliberately not touched here; doing both would
// count every key twice.
//
// A debug line is logged for each hour that has something to delete, and an
// info line with the running total is logged once all hours are processed.
func (sc *scanner) processOrg(ctx context.Context, org string) {
	deleted := 0
	for hour := sc.startHour; hour <= sc.stopHour; hour++ {
		keys := sc.getKeys(ctx, org, hour)
		if len(keys) > 0 {
			log.Debugf("deleting org: %s hour: %d num: %d", org, hour, len(keys))
		}
		// Blocks until every batch for this hour has been handled before
		// moving on to the next hour.
		sc.parallelDelete(ctx, keys)
		deleted += len(keys)
	}
	log.Infof("done %s: %d", org, deleted)
}
// getKeys returns the keys queryDynamo reports for one org and hour.
//
// Every attempt first waits on sc.queryLimiter and then issues the query. As
// long as throttled() says the resulting error was a throttle, the attempt is
// repeated; there is no upper bound on the number of retries. Any other error
// value is handed to checkFatal (presumably aborting when it is non-nil —
// confirm against checkFatal) before the keys are returned.
func (sc *scanner) getKeys(ctx context.Context, org string, hour int) []map[string]*dynamodb.AttributeValue {
	for {
		// Whatever Wait yields is not inspected; the query is attempted
		// regardless.
		sc.queryLimiter.Wait(ctx)
		found, err := queryDynamo(ctx, sc.dynamoDB, sc.tableName, org, int64(hour))
		if !throttled(err) {
			checkFatal(err)
			return found
		}
		// Throttled: go round again, waiting on the limiter first.
	}
}
// parallelDelete splits keys into consecutive batches of at most
// s3deleteBatchSize entries and runs deleteFromS3AndDynamoDB on each batch in
// its own goroutine. It returns only after every goroutine has finished, then
// adds len(keys) to the s3ItemsDeleted counter.
//
// With an empty keys slice no goroutine is started and the counter is
// increased by zero.
func (sc *scanner) parallelDelete(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
	var pending sync.WaitGroup
	total := len(keys)
	for lo := 0; lo < total; lo += s3deleteBatchSize {
		// Clamp the final batch so it never reaches past the end of keys.
		hi := lo + s3deleteBatchSize
		if hi > total {
			hi = total
		}
		pending.Add(1)
		go func(batch []map[string]*dynamodb.AttributeValue) {
			sc.deleteFromS3AndDynamoDB(ctx, batch)
			pending.Done()
		}(keys[lo:hi])
	}
	pending.Wait()
	s3ItemsDeleted.Add(float64(total))
}
func (sc *scanner) deleteFromS3AndDynamoDB(ctx context.Context, keys []map[string]*dynamodb.AttributeValue) {
// Build multiple-object delete request for S3
d := &s3.Delete{}