mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Merge pull request #1418 from weaveworks/log-n-report-merger
A caching, log(n) complexity report merger
This commit is contained in:
@@ -1,11 +1,9 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spaolacci/murmur3"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/common/mtime"
|
||||
@@ -35,10 +33,12 @@ type Collector interface {
|
||||
// Collector receives published reports from multiple producers. It yields a
|
||||
// single merged report, representing all collected reports.
|
||||
type collector struct {
|
||||
mtx sync.Mutex
|
||||
reports []timestampReport
|
||||
window time.Duration
|
||||
cached *report.Report
|
||||
mtx sync.Mutex
|
||||
reports []report.Report
|
||||
timestamps []time.Time
|
||||
window time.Duration
|
||||
cached *report.Report
|
||||
merger Merger
|
||||
waitableCondition
|
||||
}
|
||||
|
||||
@@ -78,6 +78,7 @@ func NewCollector(window time.Duration) Collector {
|
||||
waitableCondition: waitableCondition{
|
||||
waiters: map[chan struct{}]struct{}{},
|
||||
},
|
||||
merger: NewSmartMerger(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,8 +86,10 @@ func NewCollector(window time.Duration) Collector {
|
||||
func (c *collector) Add(_ context.Context, rpt report.Report) error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
c.reports = append(c.reports, timestampReport{mtime.Now(), rpt})
|
||||
c.reports = clean(c.reports, c.window)
|
||||
c.reports = append(c.reports, rpt)
|
||||
c.timestamps = append(c.timestamps, mtime.Now())
|
||||
|
||||
c.clean()
|
||||
c.cached = nil
|
||||
if rpt.Shortcut {
|
||||
c.Broadcast()
|
||||
@@ -104,37 +107,27 @@ func (c *collector) Report(_ context.Context) (report.Report, error) {
|
||||
// and there is a cached report, return that.
|
||||
if c.cached != nil && len(c.reports) > 0 {
|
||||
oldest := mtime.Now().Add(-c.window)
|
||||
if c.reports[0].timestamp.After(oldest) {
|
||||
if c.timestamps[0].After(oldest) {
|
||||
return *c.cached, nil
|
||||
}
|
||||
}
|
||||
c.reports = clean(c.reports, c.window)
|
||||
|
||||
rpt := report.MakeReport()
|
||||
id := murmur3.New64()
|
||||
for _, tr := range c.reports {
|
||||
rpt = rpt.Merge(tr.report)
|
||||
id.Write([]byte(tr.report.ID))
|
||||
}
|
||||
rpt.ID = fmt.Sprintf("%x", id.Sum64())
|
||||
c.cached = &rpt
|
||||
return rpt, nil
|
||||
c.clean()
|
||||
return c.merger.Merge(c.reports), nil
|
||||
}
|
||||
|
||||
type timestampReport struct {
|
||||
timestamp time.Time
|
||||
report report.Report
|
||||
}
|
||||
|
||||
func clean(reports []timestampReport, window time.Duration) []timestampReport {
|
||||
func (c *collector) clean() {
|
||||
var (
|
||||
cleaned = make([]timestampReport, 0, len(reports))
|
||||
oldest = mtime.Now().Add(-window)
|
||||
cleanedReports = make([]report.Report, 0, len(c.reports))
|
||||
cleanedTimestamps = make([]time.Time, 0, len(c.timestamps))
|
||||
oldest = mtime.Now().Add(-c.window)
|
||||
)
|
||||
for _, tr := range reports {
|
||||
if tr.timestamp.After(oldest) {
|
||||
cleaned = append(cleaned, tr)
|
||||
for i, r := range c.reports {
|
||||
if c.timestamps[i].After(oldest) {
|
||||
cleanedReports = append(cleanedReports, r)
|
||||
cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i])
|
||||
}
|
||||
}
|
||||
return cleaned
|
||||
c.reports = cleanedReports
|
||||
c.timestamps = cleanedTimestamps
|
||||
}
|
||||
|
||||
138
app/merger.go
Normal file
138
app/merger.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/bluele/gcache"
|
||||
"github.com/spaolacci/murmur3"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// Merger is the type for a thing that can merge reports.
|
||||
type Merger interface {
|
||||
Merge([]report.Report) report.Report
|
||||
}
|
||||
|
||||
type dumbMerger struct{}
|
||||
|
||||
// MakeDumbMerger makes a Merger which merges together reports in the simplest possible way.
|
||||
func MakeDumbMerger() Merger {
|
||||
return dumbMerger{}
|
||||
}
|
||||
|
||||
func (dumbMerger) Merge(reports []report.Report) report.Report {
|
||||
rpt := report.MakeReport()
|
||||
id := murmur3.New64()
|
||||
for _, r := range reports {
|
||||
rpt = rpt.Merge(r)
|
||||
id.Write([]byte(r.ID))
|
||||
}
|
||||
rpt.ID = fmt.Sprintf("%x", id.Sum64())
|
||||
return rpt
|
||||
}
|
||||
|
||||
type smartMerger struct {
|
||||
cache gcache.Cache
|
||||
}
|
||||
|
||||
// NewSmartMerger makes a Merger which merges together reports, caching intermediate merges
|
||||
// to accelerate future merges. Idea is to cache pair-wise merged reports, forming a merge
|
||||
// tree. Merging a new report into this tree should be log(N).
|
||||
func NewSmartMerger() Merger {
|
||||
return &smartMerger{
|
||||
cache: gcache.New(1000).LRU().Build(),
|
||||
}
|
||||
}
|
||||
|
||||
type node struct {
|
||||
id uint64
|
||||
rpt report.Report
|
||||
}
|
||||
|
||||
type byID []*node
|
||||
|
||||
func (ns byID) Len() int { return len(ns) }
|
||||
func (ns byID) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] }
|
||||
func (ns byID) Less(i, j int) bool { return ns[i].id < ns[j].id }
|
||||
|
||||
func hash(ids ...string) uint64 {
|
||||
id := murmur3.New64()
|
||||
for _, i := range ids {
|
||||
id.Write([]byte(i))
|
||||
}
|
||||
return id.Sum64()
|
||||
}
|
||||
|
||||
func (s *smartMerger) ClearCache() {
|
||||
s.cache.Purge()
|
||||
}
|
||||
|
||||
func (s *smartMerger) Merge(reports []report.Report) report.Report {
|
||||
// Start with a sorted list of leaves.
|
||||
// Note we must dedupe reports with the same ID to ensure the
|
||||
// algorithm below doesn't go into an infinite loop. This is
|
||||
// fine as reports with the same ID are assumed to be the same.
|
||||
nodes := []*node{}
|
||||
seen := map[uint64]struct{}{}
|
||||
for _, r := range reports {
|
||||
id := hash(r.ID)
|
||||
if _, ok := seen[id]; ok {
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
nodes = append(nodes, &node{
|
||||
id: id,
|
||||
rpt: r,
|
||||
})
|
||||
}
|
||||
sort.Sort(byID(nodes))
|
||||
|
||||
// Define how to merge two nodes together. The result of merging
|
||||
// two reports is cached.
|
||||
merge := func(left, right *node) *node {
|
||||
id := hash(left.rpt.ID, right.rpt.ID)
|
||||
|
||||
if result, err := s.cache.Get(id); err == nil {
|
||||
return result.(*node)
|
||||
}
|
||||
|
||||
n := &node{
|
||||
id: id,
|
||||
rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt),
|
||||
}
|
||||
s.cache.Set(id, n)
|
||||
return n
|
||||
}
|
||||
|
||||
// Define how to reduce n nodes to 1.
|
||||
// Min and max are both inclusive!
|
||||
var reduce func(min, max uint64, nodes []*node) *node
|
||||
reduce = func(min, max uint64, nodes []*node) *node {
|
||||
switch len(nodes) {
|
||||
case 0:
|
||||
return &node{rpt: report.MakeReport()}
|
||||
case 1:
|
||||
return nodes[0]
|
||||
case 2:
|
||||
return merge(nodes[0], nodes[1])
|
||||
}
|
||||
|
||||
partition := min + ((max - min) / 2)
|
||||
index := sort.Search(len(nodes), func(i int) bool {
|
||||
return nodes[i].id > partition
|
||||
})
|
||||
if index == len(nodes) {
|
||||
return reduce(min, partition, nodes)
|
||||
} else if index == 0 {
|
||||
return reduce(partition+1, max, nodes)
|
||||
}
|
||||
left := reduce(min, partition, nodes[:index])
|
||||
right := reduce(partition+1, max, nodes[index:])
|
||||
return merge(left, right)
|
||||
}
|
||||
|
||||
return reduce(0, math.MaxUint64, nodes).rpt
|
||||
}
|
||||
122
app/merger_test.go
Normal file
122
app/merger_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package app_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
"github.com/weaveworks/scope/test/reflect"
|
||||
)
|
||||
|
||||
func TestMerger(t *testing.T) {
|
||||
// Use 3 reports to check the pair-wise merging in SmartMerger
|
||||
report1 := report.MakeReport()
|
||||
report1.Endpoint.AddNode(report.MakeNode("foo"))
|
||||
report2 := report.MakeReport()
|
||||
report2.Endpoint.AddNode(report.MakeNode("bar"))
|
||||
report3 := report.MakeReport()
|
||||
report3.Endpoint.AddNode(report.MakeNode("baz"))
|
||||
reports := []report.Report{
|
||||
report1, report2, report3,
|
||||
}
|
||||
want := report.MakeReport()
|
||||
want.Endpoint.
|
||||
AddNode(report.MakeNode("foo")).
|
||||
AddNode(report.MakeNode("bar")).
|
||||
AddNode(report.MakeNode("baz"))
|
||||
|
||||
for _, merger := range []app.Merger{app.MakeDumbMerger(), app.NewSmartMerger()} {
|
||||
// Test the empty list case
|
||||
if have := merger.Merge([]report.Report{}); !reflect.DeepEqual(have, report.MakeReport()) {
|
||||
t.Errorf("Bad merge: %s", test.Diff(have, want))
|
||||
}
|
||||
|
||||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) {
|
||||
t.Errorf("Bad merge: %s", test.Diff(have, want))
|
||||
}
|
||||
|
||||
// Repeat the above test to ensure caching works
|
||||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) {
|
||||
t.Errorf("Bad merge: %s", test.Diff(have, want))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSmartMerger(t *testing.T) {
|
||||
// Use 3 reports _WITH SAME ID_
|
||||
report1 := report.MakeReport()
|
||||
report1.Endpoint.AddNode(report.MakeNode("foo"))
|
||||
report1.ID = "foo"
|
||||
report2 := report.MakeReport()
|
||||
report2.Endpoint.AddNode(report.MakeNode("bar"))
|
||||
report2.ID = "foo"
|
||||
report3 := report.MakeReport()
|
||||
report3.Endpoint.AddNode(report.MakeNode("baz"))
|
||||
report3.ID = "foo"
|
||||
reports := []report.Report{
|
||||
report1, report2, report3,
|
||||
}
|
||||
want := report.MakeReport()
|
||||
want.Endpoint.AddNode(report.MakeNode("foo"))
|
||||
|
||||
merger := app.NewSmartMerger()
|
||||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) {
|
||||
t.Errorf("Bad merge: %s", test.Diff(have, want))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSmartMerger(b *testing.B) {
|
||||
benchmarkMerger(b, app.NewSmartMerger(), false)
|
||||
}
|
||||
|
||||
func BenchmarkSmartMergerWithoutCaching(b *testing.B) {
|
||||
benchmarkMerger(b, app.NewSmartMerger(), true)
|
||||
}
|
||||
|
||||
func BenchmarkDumbMerger(b *testing.B) {
|
||||
benchmarkMerger(b, app.MakeDumbMerger(), false)
|
||||
}
|
||||
|
||||
const numHosts = 15
|
||||
|
||||
func benchmarkMerger(b *testing.B, merger app.Merger, clearCache bool) {
|
||||
makeReport := func() report.Report {
|
||||
rpt := report.MakeReport()
|
||||
for i := 0; i < 100; i++ {
|
||||
rpt.Endpoint.AddNode(report.MakeNode(fmt.Sprintf("%x", rand.Int63())))
|
||||
}
|
||||
return rpt
|
||||
}
|
||||
|
||||
reports := []report.Report{}
|
||||
for i := 0; i < numHosts*5; i++ {
|
||||
reports = append(reports, makeReport())
|
||||
}
|
||||
merger.Merge(reports) // prime the cache
|
||||
if clearable, ok := merger.(interface {
|
||||
ClearCache()
|
||||
}); ok && clearCache {
|
||||
clearable.ClearCache()
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
// replace 1/3 of hosts work of reports & merge them all
|
||||
for i := 0; i < numHosts/3; i++ {
|
||||
reports[rand.Intn(len(reports))] = makeReport()
|
||||
}
|
||||
|
||||
merger.Merge(reports)
|
||||
|
||||
if clearable, ok := merger.(interface {
|
||||
ClearCache()
|
||||
}); ok && clearCache {
|
||||
clearable.ClearCache()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user