From 9a739fda464b7b30b6fc17810b46c031fe67711b Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 13 Apr 2020 15:28:43 +0000 Subject: [PATCH] Parallelise sending merged reports to store Writes to DynamoDB and S3 can be done in parallel, which will reduce the overall flush time. --- app/multitenant/aws_collector.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index 33a82d28c..11b4a5ae0 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -215,6 +215,27 @@ func (c *awsCollector) flushLoop() { // Range over all users (instances) that have pending reports and send to store func (c *awsCollector) flushPending(ctx context.Context) { instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error { + type queueEntry struct { + userid string + buf []byte + } + queue := make(chan queueEntry) + const numParallel = 10 + var group sync.WaitGroup + group.Add(numParallel) + // Run n parallel goroutines fetching reports from the queue and flushing them + for i := 0; i < numParallel; i++ { + go func() { + for entry := range queue { + rowKey, colKey, reportKey := calculateReportKeys(entry.userid, time.Now()) + err := c.persistReport(ctx, entry.userid, rowKey, colKey, reportKey, entry.buf) + if err != nil { + log.Errorf("Could not persist combined report: %v", err) + } + } + group.Done() + }() + } c.pending.Range(func(key, value interface{}) bool { userid := key.(string) entry := value.(*pendingEntry) @@ -225,20 +246,18 @@ func (c *awsCollector) flushPending(ctx context.Context) { entry.Unlock() if count > 0 { + // serialise reports on one goroutine to limit CPU usage buf, err := rpt.WriteBinary() if err != nil { log.Errorf("Could not serialise combined report: %v", err) return true } - rowKey, colKey, reportKey := calculateReportKeys(userid, time.Now()) - err = c.persistReport(ctx, userid, rowKey, colKey, reportKey, buf.Bytes()) - if err != nil { - log.Errorf("Could not persist combined report: %v", err) - return true - } + queue <- queueEntry{userid: userid, buf: buf.Bytes()} } return true }) + close(queue) + group.Wait() return nil }) }