Merge pull request #2983 from weaveworks/2982-cheap-connectedness

cheap probe connectedness api endpoint

Fixes #2982.
This commit is contained in:
Matthias Radestock
2017-12-14 11:04:03 +00:00
committed by GitHub
3 changed files with 75 additions and 32 deletions

View File

@@ -32,6 +32,16 @@ type probeDesc struct {
// Probe handler
func makeProbeHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
if _, sparse := r.Form["sparse"]; sparse {
// if we have reports, we must have connected probes
hasProbes, err := rep.HasReports(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
}
respondWith(w, http.StatusOK, hasProbes)
return
}
rpt, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)

View File

@@ -29,6 +29,7 @@ const reportQuantisationInterval = 3 * time.Second
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report(context.Context, time.Time) (report.Report, error)
HasReports(context.Context, time.Time) (bool, error)
HasHistoricReports() bool
WaitOn(context.Context, chan struct{})
UnWait(context.Context, chan struct{})
@@ -161,6 +162,19 @@ func (c *collector) Report(_ context.Context, timestamp time.Time) (report.Repor
return rpt, nil
}
// HasReports indicates whether the collector contains reports between
// timestamp-app.window and timestamp.
func (c *collector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
if len(c.timestamps) < 1 {
return false, nil
}
return !c.timestamps[0].After(timestamp) && !c.timestamps[len(c.reports)-1].Before(timestamp.Add(-c.window)), nil
}
// HasHistoricReports indicates whether the collector contains reports
// older than now-app.window.
func (c *collector) HasHistoricReports() bool {
@@ -223,6 +237,12 @@ func (c StaticCollector) Report(context.Context, time.Time) (report.Report, erro
return report.Report(c), nil
}
// HasReports indicates whether the collector contains reports between
// timestamp-app.window and timestamp.
func (c StaticCollector) HasReports(context.Context, time.Time) (bool, error) {
return true, nil
}
// HasHistoricReports indicates whether the collector contains reports
// older than now-app.window.
func (c StaticCollector) HasHistoricReports() bool {

View File

@@ -215,8 +215,8 @@ func (c *awsCollector) CreateTables() error {
return err
}
// getReportKeys gets the s3 keys for reports in this range
func (c *awsCollector) getReportKeys(ctx context.Context, userid string, row int64, start, end time.Time) ([]string, error) {
// reportKeysInRange returns the s3 keys for reports in the specified range
func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]string, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
var resp *dynamodb.QueryOutput
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
@@ -264,6 +264,42 @@ func (c *awsCollector) getReportKeys(ctx context.Context, userid string, row int
return result, nil
}
// getReportKeys returns the S3 for reports in the reporting window ending at timestamp.
func (c *awsCollector) getReportKeys(ctx context.Context, timestamp time.Time) ([]string, error) {
var (
end = timestamp
start = end.Add(-c.window)
rowStart = start.UnixNano() / time.Hour.Nanoseconds()
rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
)
userid, err := c.userIDer(ctx)
if err != nil {
return nil, err
}
// Queries will only every span 2 rows max.
var reportKeys []string
if rowStart != rowEnd {
reportKeys1, err := c.reportKeysInRange(ctx, userid, rowStart, start, end)
if err != nil {
return nil, err
}
reportKeys2, err := c.reportKeysInRange(ctx, userid, rowEnd, start, end)
if err != nil {
return nil, err
}
reportKeys = append(reportKeys, reportKeys1...)
reportKeys = append(reportKeys, reportKeys2...)
} else {
if reportKeys, err = c.reportKeysInRange(ctx, userid, rowEnd, start, end); err != nil {
return nil, err
}
}
return reportKeys, nil
}
func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]report.Report, error) {
missing := reportKeys
@@ -300,39 +336,11 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
}
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
var (
end = timestamp
start = end.Add(-c.window)
rowStart = start.UnixNano() / time.Hour.Nanoseconds()
rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
userid, err = c.userIDer(ctx)
)
reportKeys, err := c.getReportKeys(ctx, timestamp)
if err != nil {
return report.MakeReport(), err
}
// Queries will only every span 2 rows max.
var reportKeys []string
if rowStart != rowEnd {
reportKeys1, err := c.getReportKeys(ctx, userid, rowStart, start, end)
if err != nil {
return report.MakeReport(), err
}
reportKeys2, err := c.getReportKeys(ctx, userid, rowEnd, start, end)
if err != nil {
return report.MakeReport(), err
}
reportKeys = append(reportKeys, reportKeys1...)
reportKeys = append(reportKeys, reportKeys2...)
} else {
if reportKeys, err = c.getReportKeys(ctx, userid, rowEnd, start, end); err != nil {
return report.MakeReport(), err
}
}
log.Debugf("Fetching %d reports from %v to %v", len(reportKeys), start, end)
log.Debugf("Fetching %d reports to %v", len(reportKeys), timestamp)
reports, err := c.getReports(ctx, reportKeys)
if err != nil {
return report.MakeReport(), err
@@ -341,6 +349,11 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
return c.merger.Merge(reports), nil
}
func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
reportKeys, err := c.getReportKeys(ctx, timestamp)
return len(reportKeys) > 0, err
}
func (c *awsCollector) HasHistoricReports() bool {
return true
}