From 902ba88479a34dae21a3a9d1b6c0f25782228051 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Aug 2016 22:58:07 +0100 Subject: [PATCH] make smartMerger.Merge merge reports in parallel for reduced latency --- app/merger.go | 99 +++++++++------------------------------------- app/merger_test.go | 23 ----------- 2 files changed, 18 insertions(+), 104 deletions(-) diff --git a/app/merger.go b/app/merger.go index f901d2e41..be4d149b2 100644 --- a/app/merger.go +++ b/app/merger.go @@ -2,10 +2,7 @@ package app import ( "fmt" - "math" - "sort" - "github.com/bluele/gcache" "github.com/spaolacci/murmur3" "github.com/weaveworks/scope/report" @@ -34,91 +31,31 @@ func (dumbMerger) Merge(reports []report.Report) report.Report { return rpt } -type smartMerger struct { - cache gcache.Cache -} +type smartMerger struct{} -// NewSmartMerger makes a Merger which merges together reports as -// a binary tree of reports. Speed up comes from the fact that -// most merges are between small reports. +// NewSmartMerger makes a Merger which merges reports in +// parallel. Speed up comes from the fact that a) most merges are +// between small reports, and b) we take advantage of available cores. func NewSmartMerger() Merger { return smartMerger{} } -type node struct { - id uint64 - rpt report.Report -} - -type byID []*node - -func (ns byID) Len() int { return len(ns) } -func (ns byID) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] } -func (ns byID) Less(i, j int) bool { return ns[i].id < ns[j].id } - -func hash(ids ...string) uint64 { - id := murmur3.New64() - for _, i := range ids { - id.Write([]byte(i)) +func (smartMerger) Merge(reports []report.Report) report.Report { + l := len(reports) + switch l { + case 0: + return report.MakeReport() + case 1: + return reports[0] } - return id.Sum64() -} - -func (s smartMerger) Merge(reports []report.Report) report.Report { - // Start with a sorted list of leaves. - // Note we must dedupe reports with the same ID to ensure the - // algorithm below doesn't go into an infinite loop. This is - // fine as reports with the same ID are assumed to be the same. - nodes := []*node{} - seen := map[uint64]struct{}{} + c := make(chan report.Report, l) for _, r := range reports { - id := hash(r.ID) - if _, ok := seen[id]; ok { - continue - } - seen[id] = struct{}{} - nodes = append(nodes, &node{ - id: id, - rpt: r, - }) + c <- r } - sort.Sort(byID(nodes)) - - // Define how to merge two nodes together. The result of merging - // two reports is cached. - merge := func(left, right *node) *node { - return &node{ - id: hash(left.rpt.ID, right.rpt.ID), - rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt), - } + for ; l > 1; l-- { + go func(left, right report.Report) { + c <- left.Merge(right) + }(<-c, <-c) } - - // Define how to reduce n nodes to 1. - // Min and max are both inclusive! - var reduce func(min, max uint64, nodes []*node) *node - reduce = func(min, max uint64, nodes []*node) *node { - switch len(nodes) { - case 0: - return &node{rpt: report.MakeReport()} - case 1: - return nodes[0] - case 2: - return merge(nodes[0], nodes[1]) - } - - partition := min + ((max - min) / 2) - index := sort.Search(len(nodes), func(i int) bool { - return nodes[i].id > partition - }) - if index == len(nodes) { - return reduce(min, partition, nodes) - } else if index == 0 { - return reduce(partition+1, max, nodes) - } - left := reduce(min, partition, nodes[:index]) - right := reduce(partition+1, max, nodes[index:]) - return merge(left, right) - } - - return reduce(0, math.MaxUint64, nodes).rpt + return <-c } diff --git a/app/merger_test.go b/app/merger_test.go index 94227594a..574d673c6 100644 --- a/app/merger_test.go +++ b/app/merger_test.go @@ -45,29 +45,6 @@ func TestMerger(t *testing.T) { } } -func TestSmartMerger(t *testing.T) { - // Use 3 reports _WITH SAME ID_ - report1 := report.MakeReport() - report1.Endpoint.AddNode(report.MakeNode("foo")) - report1.ID = "foo" - report2 := report.MakeReport() - report2.Endpoint.AddNode(report.MakeNode("bar")) - report2.ID = "foo" - report3 := report.MakeReport() - report3.Endpoint.AddNode(report.MakeNode("baz")) - report3.ID = "foo" - reports := []report.Report{ - report1, report2, report3, - } - want := report.MakeReport() - want.Endpoint.AddNode(report.MakeNode("foo")) - - merger := app.NewSmartMerger() - if have := merger.Merge(reports); !reflect.DeepEqual(have, want) { - t.Errorf("Bad merge: %s", test.Diff(have, want)) - } -} - func BenchmarkSmartMerger(b *testing.B) { benchmarkMerger(b, app.NewSmartMerger()) }