diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index 33a82d28c..11b4a5ae0 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -215,6 +215,27 @@ func (c *awsCollector) flushLoop() { // Range over all users (instances) that have pending reports and send to store func (c *awsCollector) flushPending(ctx context.Context) { instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error { + type queueEntry struct { + userid string + buf []byte + } + queue := make(chan queueEntry) + const numParallel = 10 + var group sync.WaitGroup + group.Add(numParallel) + // Run n parallel goroutines fetching reports from the queue and flushing them + for i := 0; i < numParallel; i++ { + go func() { + for entry := range queue { + rowKey, colKey, reportKey := calculateReportKeys(entry.userid, time.Now()) + err := c.persistReport(ctx, entry.userid, rowKey, colKey, reportKey, entry.buf) + if err != nil { + log.Errorf("Could not persist combined report: %v", err) + } + } + group.Done() + }() + } c.pending.Range(func(key, value interface{}) bool { userid := key.(string) entry := value.(*pendingEntry) @@ -225,20 +246,18 @@ func (c *awsCollector) flushPending(ctx context.Context) { entry.Unlock() if count > 0 { + // serialise reports on one goroutine to limit CPU usage buf, err := rpt.WriteBinary() if err != nil { log.Errorf("Could not serialise combined report: %v", err) return true } - rowKey, colKey, reportKey := calculateReportKeys(userid, time.Now()) - err = c.persistReport(ctx, userid, rowKey, colKey, reportKey, buf.Bytes()) - if err != nil { - log.Errorf("Could not persist combined report: %v", err) - return true - } + queue <- queueEntry{userid: userid, buf: buf.Bytes()} } return true }) + close(queue) + group.Wait() return nil }) }