mirror of
https://github.com/weaveworks/scope.git
synced 2026-05-06 01:08:03 +00:00
Merge pull request #1288 from weaveworks/1286-collector-expire-cache
Correctly expire the cache in the collector
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/spaolacci/murmur3"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/common/mtime"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
@@ -80,13 +81,11 @@ func NewCollector(window time.Duration) Collector {
|
||||
}
|
||||
}
|
||||
|
||||
var now = time.Now
|
||||
|
||||
// Add adds a report to the collector's internal state. It implements Adder.
|
||||
func (c *collector) Add(_ context.Context, rpt report.Report) error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
c.reports = append(c.reports, timestampReport{now(), rpt})
|
||||
c.reports = append(c.reports, timestampReport{mtime.Now(), rpt})
|
||||
c.reports = clean(c.reports, c.window)
|
||||
c.cached = nil
|
||||
if rpt.Shortcut {
|
||||
@@ -104,8 +103,8 @@ func (c *collector) Report(_ context.Context) (report.Report, error) {
|
||||
// If the oldest report is still within range,
|
||||
// and there is a cached report, return that.
|
||||
if c.cached != nil && len(c.reports) > 0 {
|
||||
oldest := now().Add(-c.window)
|
||||
if c.reports[0].timestamp.Before(oldest) {
|
||||
oldest := mtime.Now().Add(-c.window)
|
||||
if c.reports[0].timestamp.After(oldest) {
|
||||
return *c.cached, nil
|
||||
}
|
||||
}
|
||||
@@ -130,13 +129,12 @@ type timestampReport struct {
|
||||
func clean(reports []timestampReport, window time.Duration) []timestampReport {
|
||||
var (
|
||||
cleaned = make([]timestampReport, 0, len(reports))
|
||||
oldest = now().Add(-window)
|
||||
oldest = mtime.Now().Add(-window)
|
||||
)
|
||||
for _, tr := range reports {
|
||||
if tr.timestamp.Before(oldest) {
|
||||
continue
|
||||
if tr.timestamp.After(oldest) {
|
||||
cleaned = append(cleaned, tr)
|
||||
}
|
||||
cleaned = append(cleaned, tr)
|
||||
}
|
||||
return cleaned
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/common/mtime"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
"github.com/weaveworks/scope/test/reflect"
|
||||
@@ -53,6 +54,47 @@ func TestCollector(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectorExpire(t *testing.T) {
|
||||
now := time.Now()
|
||||
mtime.NowForce(now)
|
||||
defer mtime.NowReset()
|
||||
|
||||
ctx := context.Background()
|
||||
window := 10 * time.Second
|
||||
c := app.NewCollector(window)
|
||||
|
||||
// 1st check the collector is empty
|
||||
have, err := c.Report(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want := report.MakeReport(); !reflect.DeepEqual(want, have) {
|
||||
t.Error(test.Diff(want, have))
|
||||
}
|
||||
|
||||
// Now check an added report is returned
|
||||
r1 := report.MakeReport()
|
||||
r1.Endpoint.AddNode("foo", report.MakeNode())
|
||||
c.Add(ctx, r1)
|
||||
have, err = c.Report(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want := r1; !reflect.DeepEqual(want, have) {
|
||||
t.Error(test.Diff(want, have))
|
||||
}
|
||||
|
||||
// Finally move time forward to expire the report
|
||||
mtime.NowForce(now.Add(window))
|
||||
have, err = c.Report(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want := report.MakeReport(); !reflect.DeepEqual(want, have) {
|
||||
t.Error(test.Diff(want, have))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectorWait(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
window := time.Millisecond
|
||||
|
||||
Reference in New Issue
Block a user