Add 'just scan' option

This commit is contained in:
Bryan Boreham
2019-04-09 09:46:19 +00:00
parent b23f7a7b0d
commit 388d273e5f
2 changed files with 132 additions and 0 deletions

View File

@@ -92,6 +92,10 @@ func main() {
scanner scanner
loglevel string
justBigScan bool
segments int
pagesPerDot int
)
flag.StringVar(&collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)")
@@ -107,6 +111,10 @@ func main() {
flag.StringVar(&orgsFile, "delete-orgs-file", "", "File containing IDs of orgs to delete")
flag.StringVar(&loglevel, "log-level", "info", "Debug level: debug, info, warning, error")
flag.BoolVar(&justBigScan, "big-scan", false, "If true, just scan the whole index and print summaries")
flag.IntVar(&segments, "segments", 1, "Number of segments to run in parallel")
flag.IntVar(&pagesPerDot, "pages-per-dot", 10, "Print a dot per N pages in DynamoDB (0 to disable)")
flag.Parse()
level, err := log.ParseLevel(loglevel)
@@ -135,6 +143,11 @@ func main() {
checkFatal(http.ListenAndServe(scanner.address, nil))
}()
if justBigScan {
bigScan(dynamoDBConfig, segments, pagesPerDot)
return
}
orgs := []string{}
if orgsFile != "" {
content, err := ioutil.ReadFile(orgsFile)

119
extras/scanner/scan.go Normal file
View File

@@ -0,0 +1,119 @@
package main
import (
	"fmt"
	"sort"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)
// bigScanner carries the settings shared by every segment of a parallel
// DynamoDB table scan.
type bigScanner struct {
	// segments is the total number of scan segments; it is sent to DynamoDB
	// as TotalSegments.
	segments int
	// tableName is the name of the table to scan.
	tableName string
	// dynamoDB is the client used to issue the scan requests.
	dynamoDB *dynamodb.DynamoDB
}
// pagesPerDot is how many scanned pages make up one progress dot printed by
// handlePage; zero or less disables the dots. It is package-level state that
// bigScan sets once before starting any scanning goroutine.
var pagesPerDot int
// bigScan scans the whole DynamoDB table in parallel and prints a summary of
// how many items were seen for each value of the hourField attribute.
//
// config is the AWS configuration used to create the session, segments is the
// number of scan segments to run concurrently (values below 1 are treated as
// 1), and ppd is the number of pages per progress dot (0 or less disables the
// dots).
//
// Any error from creating the session or from a segment scan is passed to
// checkFatal.
//
// NOTE(review): nothing in this function sets scanner.tableName, so every
// segment scans with an empty table name — confirm where the table name is
// supposed to come from.
func bigScan(config *aws.Config, segments, ppd int) {
	// DynamoDB needs at least one segment, and a negative count would make
	// WaitGroup.Add panic.
	if segments < 1 {
		segments = 1
	}
	pagesPerDot = ppd // hack! handlePage reads this package-level variable.

	// NewSession returns configuration errors to the caller, unlike the
	// deprecated session.New which it replaces.
	sess, err := session.NewSession(config)
	checkFatal(err)

	scanner := bigScanner{
		segments: segments,
		dynamoDB: dynamodb.New(sess),
	}

	var (
		group       sync.WaitGroup
		totalsMutex sync.Mutex // guards totals
	)
	totals := newBigSummary()

	group.Add(scanner.segments)
	for segment := 0; segment < scanner.segments; segment++ {
		go func(segment int) {
			defer group.Done()

			// Each goroutine tallies into its own handler and only takes the
			// lock to merge the finished tally into the shared totals.
			handler := newHandler()
			checkFatal(scanner.segmentScan(segment, handler))

			totalsMutex.Lock()
			defer totalsMutex.Unlock()
			totals.accumulate(handler.summary)
		}(segment)
	}
	group.Wait()

	// End the line of progress dots before printing the summary.
	fmt.Printf("\n")
	totals.print()
}
// segmentScan scans one segment of the table, handing each page of results to
// handler.handlePage, and returns whatever error ScanPages reports. Only the
// hourField attribute of each item is requested.
func (sc bigScanner) segmentScan(segment int, handler handler) error {
	// The projection goes through the "#h" placeholder because "hour" is a
	// reserved word.
	placeholders := map[string]*string{"#h": aws.String(hourField)}

	var input dynamodb.ScanInput
	input.TableName = aws.String(sc.tableName)
	input.ProjectionExpression = aws.String("#h")
	input.ExpressionAttributeNames = placeholders
	input.Segment = aws.Int64(int64(segment))
	input.TotalSegments = aws.Int64(int64(sc.segments))
	// input.ReturnConsumedCapacity = aws.String(dynamodb.ReturnConsumedCapacityTotal)

	return sc.dynamoDB.ScanPages(&input, handler.handlePage)
}
// bigSummary is a tally of how many times each string key has been seen.
type bigSummary struct {
	// counts maps a key to the number of times it was counted.
	counts map[string]int
}
// newBigSummary returns an empty summary whose counts map is allocated and
// ready to be written to.
func newBigSummary() bigSummary {
	var s bigSummary
	s.counts = make(map[string]int)
	return s
}
// accumulate adds every count in b to the matching key of s, creating keys
// that s does not have yet. b is left unchanged.
//
// A receiver whose counts map is nil (for example the zero value) is given a
// fresh map first, because writing to a nil map panics.
func (s *bigSummary) accumulate(b bigSummary) {
	if s.counts == nil {
		s.counts = make(map[string]int, len(b.counts))
	}
	for key, n := range b.counts {
		s.counts[key] += n
	}
}
// print writes one "key, count" line per entry to standard output. Lines are
// ordered by key so that repeated runs over the same data produce the same
// output; ranging over the map directly would give a random order.
func (s bigSummary) print() {
	keys := make([]string, 0, len(s.counts))
	for key := range s.counts {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	for _, key := range keys {
		fmt.Printf("%s, %d\n", key, s.counts[key])
	}
}
// handler gathers the results of scanning one segment.
type handler struct {
	// pages counts the pages passed to handlePage so far.
	pages int
	// summary tallies the hourField values seen in those pages.
	summary bigSummary
}
// newHandler returns a handler with a zero page count and an empty summary
// obtained from newBigSummary.
func newHandler() handler {
	var h handler
	h.summary = newBigSummary()
	return h
}
// reset discards the counts gathered so far by pointing the summary at a new,
// empty map. The page counter is left as it is.
func (h *handler) reset() {
	fresh := make(map[string]int)
	h.summary.counts = fresh
}
// handlePage is the per-page callback handed to ScanPages. It counts the page,
// prints a progress dot every pagesPerDot pages (when pagesPerDot is
// positive), and tallies the string value of each item's hourField attribute.
// Items that lack the attribute, or whose attribute has no string value, are
// skipped.
//
// lastPage is ignored. The return value is always true, which tells ScanPages
// to continue with the next page.
func (h *handler) handlePage(page *dynamodb.ScanOutput, lastPage bool) bool {
	h.pages++
	if pagesPerDot > 0 && h.pages%pagesPerDot == 0 {
		fmt.Printf(".")
	}
	for _, item := range page.Items {
		// A missing attribute reads from the map as a nil pointer, so it must
		// be checked before its string value is looked at.
		attr := item[hourField]
		if attr == nil || attr.S == nil {
			continue
		}
		h.summary.counts[*attr.S]++
	}
	return true
}