mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Merge pull request #1827 from weaveworks/parallel-merger
make smartMerger.Merge merge reports in parallel
This commit is contained in:
@@ -2,10 +2,7 @@ package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/bluele/gcache"
|
||||
"github.com/spaolacci/murmur3"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
@@ -34,91 +31,31 @@ func (dumbMerger) Merge(reports []report.Report) report.Report {
|
||||
return rpt
|
||||
}
|
||||
|
||||
type smartMerger struct {
|
||||
cache gcache.Cache
|
||||
}
|
||||
type smartMerger struct{}
|
||||
|
||||
// NewSmartMerger makes a Merger which merges together reports as
|
||||
// a binary tree of reports. The speed-up comes from the fact that
|
||||
// most merges are between small reports.
|
||||
// NewSmartMerger makes a Merger which merges reports in
|
||||
// parallel. The speed-up comes from the fact that a) most merges are
|
||||
// between small reports, and b) we take advantage of available cores.
|
||||
func NewSmartMerger() Merger {
|
||||
return smartMerger{}
|
||||
}
|
||||
|
||||
type node struct {
|
||||
id uint64
|
||||
rpt report.Report
|
||||
}
|
||||
|
||||
type byID []*node
|
||||
|
||||
func (ns byID) Len() int { return len(ns) }
|
||||
func (ns byID) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] }
|
||||
func (ns byID) Less(i, j int) bool { return ns[i].id < ns[j].id }
|
||||
|
||||
func hash(ids ...string) uint64 {
|
||||
id := murmur3.New64()
|
||||
for _, i := range ids {
|
||||
id.Write([]byte(i))
|
||||
func (smartMerger) Merge(reports []report.Report) report.Report {
|
||||
l := len(reports)
|
||||
switch l {
|
||||
case 0:
|
||||
return report.MakeReport()
|
||||
case 1:
|
||||
return reports[0]
|
||||
}
|
||||
return id.Sum64()
|
||||
}
|
||||
|
||||
func (s smartMerger) Merge(reports []report.Report) report.Report {
|
||||
// Start with a sorted list of leaves.
|
||||
// Note we must dedupe reports with the same ID to ensure the
|
||||
// algorithm below doesn't go into an infinite loop. This is
|
||||
// fine as reports with the same ID are assumed to be the same.
|
||||
nodes := []*node{}
|
||||
seen := map[uint64]struct{}{}
|
||||
c := make(chan report.Report, l)
|
||||
for _, r := range reports {
|
||||
id := hash(r.ID)
|
||||
if _, ok := seen[id]; ok {
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
nodes = append(nodes, &node{
|
||||
id: id,
|
||||
rpt: r,
|
||||
})
|
||||
c <- r
|
||||
}
|
||||
sort.Sort(byID(nodes))
|
||||
|
||||
// Define how to merge two nodes together. The result of merging
|
||||
// two reports is not cached: the closure below builds a fresh node on every call.
|
||||
merge := func(left, right *node) *node {
|
||||
return &node{
|
||||
id: hash(left.rpt.ID, right.rpt.ID),
|
||||
rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt),
|
||||
}
|
||||
for ; l > 1; l-- {
|
||||
go func(left, right report.Report) {
|
||||
c <- left.Merge(right)
|
||||
}(<-c, <-c)
|
||||
}
|
||||
|
||||
// Define how to reduce n nodes to 1.
|
||||
// Min and max are both inclusive!
|
||||
var reduce func(min, max uint64, nodes []*node) *node
|
||||
reduce = func(min, max uint64, nodes []*node) *node {
|
||||
switch len(nodes) {
|
||||
case 0:
|
||||
return &node{rpt: report.MakeReport()}
|
||||
case 1:
|
||||
return nodes[0]
|
||||
case 2:
|
||||
return merge(nodes[0], nodes[1])
|
||||
}
|
||||
|
||||
partition := min + ((max - min) / 2)
|
||||
index := sort.Search(len(nodes), func(i int) bool {
|
||||
return nodes[i].id > partition
|
||||
})
|
||||
if index == len(nodes) {
|
||||
return reduce(min, partition, nodes)
|
||||
} else if index == 0 {
|
||||
return reduce(partition+1, max, nodes)
|
||||
}
|
||||
left := reduce(min, partition, nodes[:index])
|
||||
right := reduce(partition+1, max, nodes[index:])
|
||||
return merge(left, right)
|
||||
}
|
||||
|
||||
return reduce(0, math.MaxUint64, nodes).rpt
|
||||
return <-c
|
||||
}
|
||||
|
||||
@@ -45,29 +45,6 @@ func TestMerger(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSmartMerger(t *testing.T) {
|
||||
// Use 3 reports _WITH SAME ID_
|
||||
report1 := report.MakeReport()
|
||||
report1.Endpoint.AddNode(report.MakeNode("foo"))
|
||||
report1.ID = "foo"
|
||||
report2 := report.MakeReport()
|
||||
report2.Endpoint.AddNode(report.MakeNode("bar"))
|
||||
report2.ID = "foo"
|
||||
report3 := report.MakeReport()
|
||||
report3.Endpoint.AddNode(report.MakeNode("baz"))
|
||||
report3.ID = "foo"
|
||||
reports := []report.Report{
|
||||
report1, report2, report3,
|
||||
}
|
||||
want := report.MakeReport()
|
||||
want.Endpoint.AddNode(report.MakeNode("foo"))
|
||||
|
||||
merger := app.NewSmartMerger()
|
||||
if have := merger.Merge(reports); !reflect.DeepEqual(have, want) {
|
||||
t.Errorf("Bad merge: %s", test.Diff(have, want))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSmartMerger(b *testing.B) {
|
||||
benchmarkMerger(b, app.NewSmartMerger())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user