Refactor: extract function to run through all orgs

This commit is contained in:
Bryan Boreham
2019-05-09 08:06:13 +00:00
parent a3638c58a5
commit 8d0c7169d5

View File

@@ -29,8 +29,6 @@ import (
)
type scanner struct {
startHour int
stopHour int
segments int
deleters int
deleteBatchSize int
@@ -88,7 +86,9 @@ func main() {
queryRateLimit float64
writeRateLimit float64
orgsFile string
orgsFile string
startHour int
stopHour int
scanner scanner
loglevel string
@@ -98,8 +98,8 @@ func main() {
flag.StringVar(&s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)")
flag.Float64Var(&queryRateLimit, "query-rate-limit", 100, "Max rate to query DynamoDB")
flag.Float64Var(&writeRateLimit, "write-rate-limit", 100, "Rate-limit on throttling from DynamoDB")
flag.IntVar(&scanner.startHour, "start-hour", 406848, "Hour number to start")
flag.IntVar(&scanner.stopHour, "stop-hour", 0, "Hour number to stop (0 for current hour)")
flag.IntVar(&startHour, "start-hour", 406848, "Hour number to start")
flag.IntVar(&stopHour, "stop-hour", 0, "Hour number to stop (0 for current hour)")
flag.IntVar(&scanner.segments, "segments", 1, "Number of segments to read in parallel")
flag.IntVar(&scanner.deleters, "deleters", 1, "Number of deleters to run in parallel")
flag.IntVar(&scanner.deleteBatchSize, "delete-batch-size", 25, "Number of delete requests to batch up")
@@ -135,17 +135,6 @@ func main() {
checkFatal(http.ListenAndServe(scanner.address, nil))
}()
orgs := []string{}
if orgsFile != "" {
content, err := ioutil.ReadFile(orgsFile)
checkFatal(err)
orgs = strings.Fields(string(content))
}
if scanner.stopHour == 0 {
scanner.stopHour = int(time.Now().Unix() / int64(time.Hour/time.Second))
}
dynamoDBConfig = dynamoDBConfig.WithMaxRetries(0) // We do our own retries, with a rate-limiter
session := session.New(dynamoDBConfig)
scanner.dynamoDB = dynamodb.New(session)
@@ -171,16 +160,9 @@ func main() {
totals := newSummary()
var orgWait sync.WaitGroup
orgWait.Add(len(orgs))
for _, org := range orgs {
go func(org string) {
scanner.processOrg(context.Background(), org)
orgWait.Done()
}(org)
if orgsFile != "" {
scanner.processOrgsFile(context.Background(), orgsFile, startHour, stopHour)
}
orgWait.Wait()
// Ensure that batcher has received all items so it won't call Add() any more
scanner.delete <- nil
@@ -195,9 +177,30 @@ func main() {
totals.print()
}
func (sc *scanner) processOrg(ctx context.Context, org string) {
func (sc *scanner) processOrgsFile(ctx context.Context, orgsFile string, startHour, stopHour int) {
if stopHour == 0 {
stopHour = int(time.Now().Unix() / int64(time.Hour/time.Second))
}
content, err := ioutil.ReadFile(orgsFile)
checkFatal(err)
orgs := strings.Fields(string(content))
var orgWait sync.WaitGroup
orgWait.Add(len(orgs))
for _, org := range orgs {
go func(org string) {
sc.processOrg(ctx, org, startHour, stopHour)
orgWait.Done()
}(org)
}
orgWait.Wait()
}
func (sc *scanner) processOrg(ctx context.Context, org string, startHour, stopHour int) {
deleted := 0
for hour := sc.startHour; hour <= sc.stopHour; hour++ {
for hour := startHour; hour <= stopHour; hour++ {
keys := sc.getKeys(ctx, org, hour)
if len(keys) > 0 {
log.Debugf("deleting org: %s hour: %d num: %d", org, hour, len(keys))