Get non-cached reports in parallel

This commit is contained in:
Jonathan Lange
2016-06-07 19:23:45 +01:00
parent 0907cdfa0d
commit 48fc985a3e

View File

@@ -240,21 +240,38 @@ func (c *dynamoDBCollector) getCached(reportKeys []string) ([]report.Report, []s
return foundReports, missingReports
}
// Fetch multiple reports in parallel from S3.
func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) {
reports := []report.Report{}
type result struct {
key string
report *report.Report
err error
}
ch := make(chan result, len(reportKeys))
for _, reportKey := range reportKeys {
rep, err := c.getNonCachedReport(reportKey)
if err != nil {
return nil, err
go func(reportKey string) {
r := result{key: reportKey}
r.report, r.err = c.getNonCachedReport(reportKey)
ch <- r
}(reportKey)
}
reports := []report.Report{}
for range reportKeys {
r := <-ch
if r.err != nil {
return nil, r.err
}
reports = append(reports, rep)
c.cache.Set(reportKey, rep)
reports = append(reports, *r.report)
c.cache.Set(r.key, *r.report)
}
return reports, nil
}
// Fetch a single report from S3.
func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (report.Report, error) {
func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
@@ -265,17 +282,17 @@ func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (report.Report,
return err
})
if err != nil {
return report.Report{}, err
return nil, err
}
reader, err := gzip.NewReader(resp.Body)
if err != nil {
return report.Report{}, err
return nil, err
}
rep := report.MakeReport()
if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil {
return report.Report{}, err
return nil, err
}
return rep, nil
return &rep, nil
}
func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {