Parallelise sending merged reports to store

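Writes to DynamoDB and S3 can be done in parallel, which will reduce
the overall flush time. Reports are still serialised on one goroutine
to limit CPU usage; only the persist step is handed off, via a channel,
to a fixed pool of 10 worker goroutines that is drained before
flushPending returns.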
This commit is contained in:
Bryan Boreham
2020-04-13 15:28:43 +00:00
parent 2629d13780
commit 9a739fda46

View File

@@ -215,6 +215,27 @@ func (c *awsCollector) flushLoop() {
// Range over all users (instances) that have pending reports and send to store
func (c *awsCollector) flushPending(ctx context.Context) {
instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error {
type queueEntry struct {
userid string
buf []byte
}
queue := make(chan queueEntry)
const numParallel = 10
var group sync.WaitGroup
group.Add(numParallel)
// Run n parallel goroutines fetching reports from the queue and flushing them
for i := 0; i < numParallel; i++ {
go func() {
for entry := range queue {
rowKey, colKey, reportKey := calculateReportKeys(entry.userid, time.Now())
err := c.persistReport(ctx, entry.userid, rowKey, colKey, reportKey, entry.buf)
if err != nil {
log.Errorf("Could not persist combined report: %v", err)
}
}
group.Done()
}()
}
c.pending.Range(func(key, value interface{}) bool {
userid := key.(string)
entry := value.(*pendingEntry)
@@ -225,20 +246,18 @@ func (c *awsCollector) flushPending(ctx context.Context) {
entry.Unlock()
if count > 0 {
// serialise reports on one goroutine to limit CPU usage
buf, err := rpt.WriteBinary()
if err != nil {
log.Errorf("Could not serialise combined report: %v", err)
return true
}
rowKey, colKey, reportKey := calculateReportKeys(userid, time.Now())
err = c.persistReport(ctx, userid, rowKey, colKey, reportKey, buf.Bytes())
if err != nil {
log.Errorf("Could not persist combined report: %v", err)
return true
}
queue <- queueEntry{userid: userid, buf: buf.Bytes()}
}
return true
})
close(queue)
group.Wait()
return nil
})
}