mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Use smart merger in the dynamodb collector. (#1543)
This commit is contained in:
@@ -47,6 +47,7 @@ type dynamoDBCollector struct {
|
||||
userIDer UserIDer
|
||||
db *dynamodb.DynamoDB
|
||||
tableName string
|
||||
merger app.Merger
|
||||
}
|
||||
|
||||
// NewDynamoDBCollector the reaper of souls
|
||||
@@ -56,6 +57,7 @@ func NewDynamoDBCollector(config *aws.Config, userIDer UserIDer, tableName strin
|
||||
db: dynamodb.New(session.New(config)),
|
||||
userIDer: userIDer,
|
||||
tableName: tableName,
|
||||
merger: app.NewSmartMerger(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +113,7 @@ func (c *dynamoDBCollector) CreateTables() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Time, input report.Report) (report.Report, error) {
|
||||
func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Time) ([]report.Report, error) {
|
||||
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
|
||||
startTime := time.Now()
|
||||
resp, err := c.db.Query(&dynamodb.QueryInput{
|
||||
@@ -135,10 +137,10 @@ func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Ti
|
||||
duration := time.Now().Sub(startTime)
|
||||
if err != nil {
|
||||
dynamoRequestDuration.WithLabelValues("Query", "500").Observe(float64(duration.Nanoseconds()))
|
||||
return report.MakeReport(), err
|
||||
return nil, err
|
||||
}
|
||||
dynamoRequestDuration.WithLabelValues("Query", "200").Observe(float64(duration.Nanoseconds()))
|
||||
result := input
|
||||
reports := []report.Report{}
|
||||
for _, item := range resp.Items {
|
||||
b := item[reportField].B
|
||||
if b == nil {
|
||||
@@ -156,9 +158,9 @@ func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Ti
|
||||
log.Errorf("Failed to decode report: %v", err)
|
||||
continue
|
||||
}
|
||||
result = result.Merge(rep)
|
||||
reports = append(reports, rep)
|
||||
}
|
||||
return result, nil
|
||||
return reports, nil
|
||||
}
|
||||
|
||||
func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
|
||||
@@ -166,8 +168,8 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
|
||||
now = time.Now()
|
||||
start = now.Add(-15 * time.Second)
|
||||
rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds()
|
||||
result = report.MakeReport()
|
||||
userid, err = c.userIDer(ctx)
|
||||
reports []report.Report
|
||||
)
|
||||
if err != nil {
|
||||
return report.MakeReport(), err
|
||||
@@ -175,14 +177,24 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
|
||||
|
||||
// Queries will only every span 2 rows max.
|
||||
if rowStart != rowEnd {
|
||||
if result, err = c.getRows(userid, rowStart, start, now, result); err != nil {
|
||||
reports1, err := c.getRows(userid, rowStart, start, now)
|
||||
if err != nil {
|
||||
return report.MakeReport(), err
|
||||
}
|
||||
|
||||
reports2, err := c.getRows(userid, rowEnd, start, now)
|
||||
if err != nil {
|
||||
return report.MakeReport(), err
|
||||
}
|
||||
|
||||
reports = append(reports1, reports2...)
|
||||
} else {
|
||||
if reports, err = c.getRows(userid, rowEnd, start, now); err != nil {
|
||||
return report.MakeReport(), err
|
||||
}
|
||||
}
|
||||
if result, err = c.getRows(userid, rowEnd, start, now, result); err != nil {
|
||||
return report.MakeReport(), err
|
||||
}
|
||||
return result, nil
|
||||
|
||||
return c.merger.Merge(reports), nil
|
||||
}
|
||||
|
||||
func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
|
||||
|
||||
Reference in New Issue
Block a user