From 953106f80fedbd6b6389f9eb3e46a6dcfe9f8b06 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 7 Sep 2015 20:35:46 +0000 Subject: [PATCH 1/9] Warn when generating the report takes too long. --- probe/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/probe/main.go b/probe/main.go index a43b99a84..aedf73404 100644 --- a/probe/main.go +++ b/probe/main.go @@ -186,6 +186,7 @@ func main() { r = report.MakeReport() case <-spyTick: + start := time.Now() if err := processCache.Update(); err != nil { log.Printf("error reading processes: %v", err) } @@ -198,6 +199,10 @@ func main() { } r = Apply(r, taggers) + if took := time.Since(start); took > *spyInterval { + log.Printf("report generation took too long (%s)", took) + } + case <-quit: return } From d411afd9167cc0d72cb361f0c33b0ca32ef0b998 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 8 Sep 2015 16:53:33 +0000 Subject: [PATCH 2/9] WithNode is a CPU hog. --- experimental/demoprobe/main.go | 10 +++++----- experimental/genreport/generate.go | 10 +++++----- probe/endpoint/reporter.go | 8 ++++---- probe/sniff/sniffer.go | 4 ++-- report/topology.go | 15 ++++++++------- 5 files changed, 24 insertions(+), 23 deletions(-) diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index c6513382d..b49370a22 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -84,14 +84,14 @@ func demoReport(nodeCount int) report.Report { ) // Endpoint topology - r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ process.PID: "4000", "name": c.srcProc, "domain": "node-" + src, }).WithEdge(dstPortID, report.EdgeMetadata{ MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), })) - r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(dstPortID, 
report.MakeNode().WithMetadata(map[string]string{ process.PID: "4000", "name": c.dstProc, "domain": "node-" + dst, @@ -100,15 +100,15 @@ func demoReport(nodeCount int) report.Report { })) // Address topology - r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ docker.Name: src, }).WithAdjacent(dstAddressID)) - r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ docker.Name: dst, }).WithAdjacent(srcAddressID)) // Host data - r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{ + r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{ "ts": time.Now().UTC().Format(time.RFC3339Nano), "host_name": "host-x", "local_networks": localNet.String(), diff --git a/experimental/genreport/generate.go b/experimental/genreport/generate.go index bb5d19e12..bc90cd78e 100644 --- a/experimental/genreport/generate.go +++ b/experimental/genreport/generate.go @@ -64,14 +64,14 @@ func DemoReport(nodeCount int) report.Report { ) // Endpoint topology - r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ "pid": "4000", "name": c.srcProc, "domain": "node-" + src, }).WithEdge(dstPortID, report.EdgeMetadata{ MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), })) - r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ "pid": "4000", "name": c.dstProc, "domain": "node-" + dst, @@ -80,15 +80,15 @@ func DemoReport(nodeCount int) report.Report { })) // Address topology - r.Address = r.Address.WithNode(srcAddressID, 
report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ "name": src, }).WithAdjacent(dstAddressID)) - r.Address = r.Address.WithNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{ "name": dst, }).WithAdjacent(srcAddressID)) // Host data - r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{ + r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{ "ts": time.Now().UTC().Format(time.RFC3339Nano), "host_name": "host-x", "local_networks": localNet.String(), diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 095b3b085..6df5eb229 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -178,8 +178,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin }) } - rpt.Address = rpt.Address.WithNode(localAddressNodeID, localNode) - rpt.Address = rpt.Address.WithNode(remoteAddressNodeID, remoteNode) + rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode) + rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode) } // Update endpoint topology @@ -225,8 +225,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin if extraRemoteNode != nil { remoteNode = remoteNode.Merge(*extraRemoteNode) } - rpt.Endpoint = rpt.Endpoint.WithNode(localEndpointNodeID, localNode) - rpt.Endpoint = rpt.Endpoint.WithNode(remoteEndpointNodeID, remoteNode) + rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode) + rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode) } } diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go index a586d1a38..134911241 100644 --- a/probe/sniff/sniffer.go +++ b/probe/sniff/sniffer.go @@ -253,8 +253,8 @@ func (s *Sniffer) Merge(p Packet, rpt *report.Report) { } addAdjacency := func(t 
report.Topology, srcNodeID, dstNodeID string) report.Topology { - result := t.WithNode(srcNodeID, report.MakeNode().WithAdjacent(dstNodeID)) - result = result.WithNode(dstNodeID, report.MakeNode()) + result := t.AddNode(srcNodeID, report.MakeNode().WithAdjacent(dstNodeID)) + result = result.AddNode(dstNodeID, report.MakeNode()) return result } diff --git a/report/topology.go b/report/topology.go index 033ddaed7..4ac349c72 100644 --- a/report/topology.go +++ b/report/topology.go @@ -20,16 +20,17 @@ func MakeTopology() Topology { } } -// WithNode produces a topology from t, with nmd added under key nodeID; if a -// node already exists for this key, nmd is merged with that node. Note that a -// fresh topology is returned. -func (t Topology) WithNode(nodeID string, nmd Node) Topology { +// AddNode adds node to the topology under key nodeID; if a +// node already exists for this key, nmd is merged with that node. +// The same topology is returned to enable chaining. +// This method is different from all the other similar methods +// in that it mutates the Topology, to solve issues of GC pressure. +func (t Topology) AddNode(nodeID string, nmd Node) Topology { if existing, ok := t.Nodes[nodeID]; ok { nmd = nmd.Merge(existing) } - result := t.Copy() - result.Nodes[nodeID] = nmd - return result + t.Nodes[nodeID] = nmd + return t } // Copy returns a value copy of the Topology. From 2f760f2f3337b9ee2c47646b35d39e9786639f63 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 8 Sep 2015 17:51:11 +0000 Subject: [PATCH 3/9] Cache generated ids to relieve pressure on the GC --- report/id.go | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/report/id.go b/report/id.go index 5f676850c..19501ce04 100644 --- a/report/id.go +++ b/report/id.go @@ -1,8 +1,13 @@ package report import ( + "hash" + "hash/fnv" "net" "strings" + "sync" + + "github.com/bluele/gcache" ) // TheInternet is used as a node ID to indicate a remote IP. 
@@ -21,9 +26,38 @@ const ( EdgeDelim = "|" ) +var ( + idCache = gcache.New(1024).LRU().Build() + hashers = sync.Pool{ + New: func() interface{} { + return fnv.New64a() + }, + } +) + +func lookupID(part1, part2, part3 string, f func() string) string { + h := hashers.Get().(hash.Hash64) + h.Write([]byte(part1)) + h.Write([]byte(part2)) + h.Write([]byte(part3)) + sum := h.Sum64() + var result string + if id, err := idCache.Get(sum); id != nil && err == nil { + result = id.(string) + } else { + result = f() + idCache.Set(sum, result) + } + h.Reset() + hashers.Put(h) + return result +} + // MakeEndpointNodeID produces an endpoint node ID from its composite parts. func MakeEndpointNodeID(hostID, address, port string) string { - return MakeAddressNodeID(hostID, address) + ScopeDelim + port + return lookupID(hostID, address, port, func() string { + return MakeAddressNodeID(hostID, address) + ScopeDelim + port + }) } // MakeAddressNodeID produces an address node ID from its composite parts. From 5bd324db3f18e2e6c25306fa056d1b11806487ab Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 8 Sep 2015 18:08:46 +0000 Subject: [PATCH 4/9] Generate reports in parallel (NB this doesn't actually seem to be worth it.)
--- probe/main.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/probe/main.go b/probe/main.go index aedf73404..385f82f07 100644 --- a/probe/main.go +++ b/probe/main.go @@ -190,13 +190,8 @@ func main() { if err := processCache.Update(); err != nil { log.Printf("error reading processes: %v", err) } - for _, reporter := range reporters { - newReport, err := reporter.Report() - if err != nil { - log.Printf("error generating report: %v", err) - } - r = r.Merge(newReport) - } + + r = r.Merge(doReport(reporters)) r = Apply(r, taggers) if took := time.Since(start); took > *spyInterval { @@ -211,6 +206,26 @@ func main() { log.Printf("%s", <-interrupt()) } +func doReport(reporters []Reporter) report.Report { + reports := make(chan report.Report, len(reporters)) + for _, rep := range reporters { + go func(rep Reporter) { + newReport, err := rep.Report() + if err != nil { + log.Printf("error generating report: %v", err) + newReport = report.MakeReport() // empty is OK to merge + } + reports <- newReport + }(rep) + } + + result := report.MakeReport() + for i := 0; i < cap(reports); i++ { + result = result.Merge(<-reports) + } + return result +} + func interrupt() chan os.Signal { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) From b7c22b7a8f8011401989ccfef6f25c175ee6546b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 8 Sep 2015 18:16:03 +0000 Subject: [PATCH 5/9] Only fetch weave status report once per tick. 
--- probe/main.go | 9 +++++++-- probe/overlay/weave.go | 33 +++++++++++++++------------------ probe/overlay/weave_test.go | 2 ++ probe/process/walker.go | 4 ++-- probe/process/walker_test.go | 4 ++-- probe/tag_report.go | 7 +++++++ 6 files changed, 35 insertions(+), 24 deletions(-) diff --git a/probe/main.go b/probe/main.go index 385f82f07..161276719 100644 --- a/probe/main.go +++ b/probe/main.go @@ -113,6 +113,7 @@ func main() { var ( endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + tickers = []Ticker{processCache} reporters = []Reporter{ endpointReporter, host.NewReporter(hostID, hostName, localNets), @@ -142,6 +143,7 @@ func main() { if err != nil { log.Fatalf("failed to start Weave tagger: %v", err) } + tickers = append(tickers, weave) taggers = append(taggers, weave) reporters = append(reporters, weave) } @@ -187,8 +189,11 @@ func main() { case <-spyTick: start := time.Now() - if err := processCache.Update(); err != nil { - log.Printf("error reading processes: %v", err) + + for _, ticker := range tickers { + if err := ticker.Tick(); err != nil { + log.Printf("error doing ticker: %v", err) + } } r = r.Merge(doReport(reporters)) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index b9699cdd3..42d3f55e5 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -43,6 +43,7 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3 type Weave struct { url string hostID string + status weaveStatus } type weaveStatus struct { @@ -75,24 +76,30 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { }, nil } -func (w Weave) update() (weaveStatus, error) { +// Tick implements Ticker +func (w *Weave) Tick() error { var result weaveStatus req, err := http.NewRequest("GET", w.url, nil) if err != nil { - return result, err + return err } req.Header.Add("Accept", "application/json") resp, err := 
http.DefaultClient.Do(req) if err != nil { - return result, err + return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return result, fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) + return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) } - return result, json.NewDecoder(resp.Body).Decode(&result) + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return err + } + + w.status = result + return nil } type psEntry struct { @@ -132,7 +139,7 @@ func (w Weave) ps() ([]psEntry, error) { return result, scanner.Err() } -func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) { +func (w *Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) { for nodeid, nmd := range r.Container.Nodes { idPrefix := nmd.Metadata[docker.ContainerID][:12] if idPrefix != containerIDPrefix { @@ -150,12 +157,7 @@ func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress strin // Tag implements Tagger. func (w Weave) Tag(r report.Report) (report.Report, error) { - status, err := w.update() - if err != nil { - return r, nil - } - - for _, entry := range status.DNS.Entries { + for _, entry := range w.status.DNS.Entries { if entry.Tombstone > 0 { continue } @@ -183,12 +185,7 @@ func (w Weave) Tag(r report.Report) (report.Report, error) { // Report implements Reporter. 
func (w Weave) Report() (report.Report, error) { r := report.MakeReport() - status, err := w.update() - if err != nil { - return r, err - } - - for _, peer := range status.Router.Peers { + for _, peer := range w.status.Router.Peers { r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{ WeavePeerName: peer.Name, WeavePeerNickName: peer.NickName, diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index f5d302a65..bb5f18f6e 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -30,6 +30,8 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { t.Fatal(err) } + w.Tick() + { have, err := w.Report() if err != nil { diff --git a/probe/process/walker.go b/probe/process/walker.go index 4a88c45cd..4ba20027e 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -39,8 +39,8 @@ func (c *CachingWalker) Walk(f func(Process)) error { return nil } -// Update updates cached copy of process list -func (c *CachingWalker) Update() error { +// Tick updates cached copy of process list +func (c *CachingWalker) Tick() error { newCache := []Process{} err := c.source.Walk(func(p Process) { newCache = append(newCache, p) diff --git a/probe/process/walker_test.go b/probe/process/walker_test.go index 37d445891..6046006bb 100644 --- a/probe/process/walker_test.go +++ b/probe/process/walker_test.go @@ -29,7 +29,7 @@ func TestCache(t *testing.T) { processes: processes, } cachingWalker := process.NewCachingWalker(walker) - err := cachingWalker.Update() + err := cachingWalker.Tick() if err != nil { t.Fatal(err) } @@ -45,7 +45,7 @@ func TestCache(t *testing.T) { t.Errorf("%v (%v)", test.Diff(processes, have), err) } - err = cachingWalker.Update() + err = cachingWalker.Tick() if err != nil { t.Fatal(err) } diff --git a/probe/tag_report.go b/probe/tag_report.go index 15024d77a..5a3bfb4e5 100644 --- a/probe/tag_report.go +++ b/probe/tag_report.go @@ -16,6 +16,13 @@ type Reporter interface { Report() 
(report.Report, error) } +// Ticker is something which will be invoked every spyDuration. +// It's useful for things that should be updated on that interval. +// For example, cached shared state between Taggers and Reporters. +type Ticker interface { + Tick() error +} + // Apply tags the report with all the taggers. func Apply(r report.Report, taggers []Tagger) report.Report { var err error From d5570f27a78e38384d0b6ca0c9afb0fdb9680d12 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Sep 2015 12:27:00 +0000 Subject: [PATCH 6/9] Only conntrack tcp connections to reduce cpu load from xml marshalling. --- probe/endpoint/conntrack.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index deefa4575..e853855bc 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -105,7 +105,7 @@ var ConntrackModulePresent = func() bool { // NB this is not re-entrant! func (c *Conntracker) run(args ...string) { - args = append([]string{"-E", "-o", "xml"}, args...) + args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { From bb20a81338887b21510f1512becb33f9aca6ab19 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Sep 2015 13:45:06 +0000 Subject: [PATCH 7/9] Only serialise and compress reports once per publish. 
--- experimental/demoprobe/main.go | 3 +- experimental/fixprobe/main.go | 3 +- probe/main.go | 3 +- xfer/publisher.go | 36 +++++++--------- xfer/publisher_test.go | 11 ++--- xfer/report_publisher.go | 79 ++++++++++++++++++++++++++++++++++ 6 files changed, 106 insertions(+), 29 deletions(-) create mode 100644 xfer/report_publisher.go diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index b49370a22..55cb91fb6 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -27,10 +27,11 @@ func main() { if err != nil { log.Fatal(err) } + rp := xfer.NewReportPublisher(publisher) rand.Seed(time.Now().UnixNano()) for range time.Tick(*publishInterval) { - if err := publisher.Publish(demoReport(*hostCount)); err != nil { + if err := rp.Publish(demoReport(*hostCount)); err != nil { log.Print(err) } } diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index f0d0a1812..1bd1294b6 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -39,7 +39,8 @@ func main() { log.Fatal(err) } + rp := xfer.NewReportPublisher(publisher) for range time.Tick(*publishInterval) { - publisher.Publish(fixedReport) + rp.Publish(fixedReport) } } diff --git a/probe/main.go b/probe/main.go index 161276719..eb4495ff9 100644 --- a/probe/main.go +++ b/probe/main.go @@ -175,6 +175,7 @@ func main() { pubTick = time.Tick(*publishInterval) spyTick = time.Tick(*spyInterval) r = report.MakeReport() + p = xfer.NewReportPublisher(publishers) ) for { @@ -182,7 +183,7 @@ func main() { case <-pubTick: publishTicks.WithLabelValues().Add(1) r.Window = *publishInterval - if err := publishers.Publish(r); err != nil { + if err := p.Publish(r); err != nil { log.Printf("publish: %v", err) } r = report.MakeReport() diff --git a/xfer/publisher.go b/xfer/publisher.go index 1eb1fcdd2..e71621ca3 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,9 +1,6 @@ package xfer import ( - "bytes" - "compress/gzip" - "encoding/gob" 
"fmt" "log" "net/http" @@ -11,8 +8,6 @@ import ( "strings" "sync" "time" - - "github.com/weaveworks/scope/report" ) const ( @@ -22,7 +17,7 @@ const ( // Publisher is something which can send a report to a remote collector. type Publisher interface { - Publish(report.Report) error + Publish(*Buffer) error Stop() } @@ -63,16 +58,10 @@ func (p HTTPPublisher) String() string { } // Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(rpt report.Report) error { - gzbuf := bytes.Buffer{} - gzwriter := gzip.NewWriter(&gzbuf) +func (p HTTPPublisher) Publish(buf *Buffer) error { + defer buf.Put() - if err := gob.NewEncoder(gzwriter).Encode(rpt); err != nil { - return err - } - gzwriter.Close() // otherwise the content won't get flushed to the output stream - - req, err := http.NewRequest("POST", p.url, &gzbuf) + req, err := http.NewRequest("POST", p.url, buf) if err != nil { return err } @@ -108,7 +97,7 @@ func AuthorizationHeader(token string) string { // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - reports chan report.Report + reports chan *Buffer quit chan struct{} } @@ -116,7 +105,7 @@ type BackgroundPublisher struct { func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { result := &BackgroundPublisher{ publisher: p, - reports: make(chan report.Report), + reports: make(chan *Buffer), quit: make(chan struct{}), } go result.loop() @@ -146,9 +135,9 @@ func (b *BackgroundPublisher) loop() { } // Publish implements Publisher -func (b *BackgroundPublisher) Publish(r report.Report) error { +func (b *BackgroundPublisher) Publish(buf *Buffer) error { select { - case b.reports <- r: + case b.reports <- buf: default: } return nil @@ -198,13 +187,18 @@ func (p *MultiPublisher) Add(target string) { } // Publish implements Publisher by emitting the report to all publishers. 
-func (p *MultiPublisher) Publish(rpt report.Report) error { +func (p *MultiPublisher) Publish(buf *Buffer) error { p.mtx.RLock() defer p.mtx.RUnlock() + // First take a reference for each publisher + for range p.m { + buf.Get() + } + var errs []string for _, publisher := range p.m { - if err := publisher.Publish(rpt); err != nil { + if err := publisher.Publish(buf); err != nil { errs = append(errs, err.Error()) } } diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index ecf0148b0..195d0b0bf 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -64,7 +64,8 @@ func TestHTTPPublisher(t *testing.T) { if err != nil { t.Fatal(err) } - if err := p.Publish(rpt); err != nil { + rp := xfer.NewReportPublisher(p) + if err := rp.Publish(rpt); err != nil { t.Error(err) } @@ -83,7 +84,7 @@ func TestMultiPublisher(t *testing.T) { ) multiPublisher.Add("first") - if err := multiPublisher.Publish(report.MakeReport()); err != nil { + if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { t.Error(err) } if want, have := 1, p.count; want != have { @@ -91,7 +92,7 @@ func TestMultiPublisher(t *testing.T) { } multiPublisher.Add("second") // but factory returns same mockPublisher - if err := multiPublisher.Publish(report.MakeReport()); err != nil { + if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { t.Error(err) } if want, have := 3, p.count; want != have { @@ -101,5 +102,5 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } -func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} +func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/report_publisher.go b/xfer/report_publisher.go new file mode 100644 index 000000000..e2af70af5 --- /dev/null +++ b/xfer/report_publisher.go @@ -0,0 +1,79 @@ +package xfer + +import ( + "bytes" + "compress/gzip" + "encoding/gob" + "sync" + 
"sync/atomic" + + "github.com/weaveworks/scope/report" +) + +// A Buffer is a reference counted bytes.Buffer, which belongs +// to a sync.Pool +type Buffer struct { + bytes.Buffer + pool *sync.Pool + refs int32 +} + +// NewBuffer creates a new buffer +func NewBuffer(pool *sync.Pool) *Buffer { + return &Buffer{ + pool: pool, + refs: 0, + } +} + +// Get increases the reference count. It is safe for concurrent calls. +func (b *Buffer) Get() { + atomic.AddInt32(&b.refs, 1) +} + +// Put decreases the reference count, and when it hits zero, puts the +// buffer back in the pool. +func (b *Buffer) Put() { + if atomic.AddInt32(&b.refs, -1) == 0 { + b.Reset() + b.pool.Put(b) + } +} + +// NewBufferPool creates a new buffer pool. +func NewBufferPool() *sync.Pool { + result := &sync.Pool{} + result.New = func() interface{} { + return NewBuffer(result) + } + return result +} + +// A ReportPublisher uses a buffer pool to serialise reports, which it +// then passes to a publisher +type ReportPublisher struct { + buffers *sync.Pool + publisher Publisher +} + +// NewReportPublisher creates a new report publisher +func NewReportPublisher(publisher Publisher) *ReportPublisher { + return &ReportPublisher{ + buffers: NewBufferPool(), + publisher: publisher, + } +} + +// Publish serialises and compresses a report, then passes it to a publisher +func (p *ReportPublisher) Publish(r report.Report) error { + buf := p.buffers.Get().(*Buffer) + gzwriter := gzip.NewWriter(buf) + if err := gob.NewEncoder(gzwriter).Encode(r); err != nil { + buf.Reset() + p.buffers.Put(buf) + return err + } + gzwriter.Close() // otherwise the content won't get flushed to the output stream + + return p.publisher.Publish(buf) +} From a8c163b5d7523ed1d585f70041d94cd81459d60b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Sep 2015 13:58:41 +0000 Subject: [PATCH 8/9] Remove O(n^2) behaviour in weave tagger. 
--- probe/overlay/weave.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 42d3f55e5..878e84d11 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -139,24 +139,9 @@ func (w Weave) ps() ([]psEntry, error) { return result, scanner.Err() } -func (w *Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) { - for nodeid, nmd := range r.Container.Nodes { - idPrefix := nmd.Metadata[docker.ContainerID][:12] - if idPrefix != containerIDPrefix { - continue - } - - existingIPs := report.MakeIDList(docker.ExtractContainerIPs(nmd)...) - existingIPs = existingIPs.Add(ips...) - nmd.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ") - nmd.Metadata[WeaveMACAddress] = macAddress - r.Container.Nodes[nodeid] = nmd - break - } -} - // Tag implements Tagger. func (w Weave) Tag(r report.Report) (report.Report, error) { + // Put information from weaveDNS on the container nodes for _, entry := range w.status.DNS.Entries { if entry.Tombstone > 0 { continue @@ -169,15 +154,28 @@ func (w Weave) Tag(r report.Report) (report.Report, error) { hostnames := report.IDList(strings.Fields(node.Metadata[WeaveDNSHostname])) hostnames = hostnames.Add(strings.TrimSuffix(entry.Hostname, ".")) node.Metadata[WeaveDNSHostname] = strings.Join(hostnames, " ") - r.Container.Nodes[nodeID] = node } + // Put information from weave ps on the container nodes psEntries, err := w.ps() if err != nil { return r, nil } + containersByPrefix := map[string]report.Node{} + for _, node := range r.Container.Nodes { + prefix := node.Metadata[docker.ContainerID][:12] + containersByPrefix[prefix] = node + } for _, e := range psEntries { - w.tagContainer(r, e.containerIDPrefix, e.macAddress, e.ips) + node, ok := containersByPrefix[e.containerIDPrefix] + if !ok { + continue + } + + existingIPs := report.MakeIDList(docker.ExtractContainerIPs(node)...) 
+ existingIPs = existingIPs.Add(e.ips...) + node.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ") + node.Metadata[WeaveMACAddress] = e.macAddress } return r, nil } From 65b78206eeb400f95732bd15de7399324ff1ea6e Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 11 Sep 2015 09:26:58 +0200 Subject: [PATCH 9/9] xfer: move Buffer to own file; update comment overlay: mutex for Weave status --- probe/overlay/weave.go | 19 +++++++++++++---- xfer/buffer.go | 46 ++++++++++++++++++++++++++++++++++++++++ xfer/publisher.go | 3 ++- xfer/report_publisher.go | 41 ----------------------------------- 4 files changed, 63 insertions(+), 46 deletions(-) create mode 100644 xfer/buffer.go diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 878e84d11..f1d60d157 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -10,6 +10,7 @@ import ( "net/url" "regexp" "strings" + "sync" "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" @@ -43,6 +44,8 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3 type Weave struct { url string hostID string + + mtx sync.RWMutex status weaveStatus } @@ -78,7 +81,6 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { // Tick implements Ticker func (w *Weave) Tick() error { - var result weaveStatus req, err := http.NewRequest("GET", w.url, nil) if err != nil { return err @@ -94,10 +96,13 @@ func (w *Weave) Tick() error { return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) } + var result weaveStatus if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return err } + w.mtx.Lock() + defer w.mtx.Unlock() w.status = result return nil } @@ -108,7 +113,7 @@ type psEntry struct { ips []string } -func (w Weave) ps() ([]psEntry, error) { +func (w *Weave) ps() ([]psEntry, error) { var result []psEntry cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() @@ -140,7 +145,10 @@ func (w Weave) ps() 
([]psEntry, error) { } // Tag implements Tagger. -func (w Weave) Tag(r report.Report) (report.Report, error) { +func (w *Weave) Tag(r report.Report) (report.Report, error) { + w.mtx.RLock() + defer w.mtx.RUnlock() + // Put information from weaveDNS on the container nodes for _, entry := range w.status.DNS.Entries { if entry.Tombstone > 0 { @@ -181,7 +189,10 @@ func (w Weave) Tag(r report.Report) (report.Report, error) { } // Report implements Reporter. -func (w Weave) Report() (report.Report, error) { +func (w *Weave) Report() (report.Report, error) { + w.mtx.RLock() + defer w.mtx.RUnlock() + r := report.MakeReport() for _, peer := range w.status.Router.Peers { r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{ diff --git a/xfer/buffer.go b/xfer/buffer.go new file mode 100644 index 000000000..452411e52 --- /dev/null +++ b/xfer/buffer.go @@ -0,0 +1,46 @@ +package xfer + +import ( + "bytes" + "sync" + "sync/atomic" +) + +// A Buffer is a reference counted bytes.Buffer, which belongs +// to a sync.Pool +type Buffer struct { + bytes.Buffer + pool *sync.Pool + refs int32 +} + +// NewBuffer creates a new buffer +func NewBuffer(pool *sync.Pool) *Buffer { + return &Buffer{ + pool: pool, + refs: 0, + } +} + +// Get increases the reference count. It is safe for concurrent calls. +func (b *Buffer) Get() { + atomic.AddInt32(&b.refs, 1) +} + +// Put decreases the reference count, and when it hits zero, puts the +// buffer back in the pool. +func (b *Buffer) Put() { + if atomic.AddInt32(&b.refs, -1) == 0 { + b.Reset() + b.pool.Put(b) + } +} + +// NewBufferPool creates a new buffer pool. 
+func NewBufferPool() *sync.Pool { + result := &sync.Pool{} + result.New = func() interface{} { + return NewBuffer(result) + } + return result +} diff --git a/xfer/publisher.go b/xfer/publisher.go index e71621ca3..d1612e78e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -15,7 +15,8 @@ const ( maxBackoff = 60 * time.Second ) -// Publisher is something which can send a report to a remote collector. +// Publisher is something which can send a buffered set of data somewhere, +// probably to a collector. type Publisher interface { Publish(*Buffer) error Stop() diff --git a/xfer/report_publisher.go b/xfer/report_publisher.go index e2af70af5..cdf5a0101 100644 --- a/xfer/report_publisher.go +++ b/xfer/report_publisher.go @@ -1,54 +1,13 @@ package xfer import ( - "bytes" "compress/gzip" "encoding/gob" "sync" - "sync/atomic" "github.com/weaveworks/scope/report" ) -// A Buffer is a reference counted bytes.Buffer, which belongs -// to a sync.Pool -type Buffer struct { - bytes.Buffer - pool *sync.Pool - refs int32 -} - -// NewBuffer creates a new buffer -func NewBuffer(pool *sync.Pool) *Buffer { - return &Buffer{ - pool: pool, - refs: 0, - } -} - -// Get increases the reference count. It is safe for concurrent calls. -func (b *Buffer) Get() { - atomic.AddInt32(&b.refs, 1) -} - -// Put decreases the reference count, and when it hits zero, puts the -// buffer back in the pool. -func (b *Buffer) Put() { - if atomic.AddInt32(&b.refs, -1) == 0 { - b.Reset() - b.pool.Put(b) - } -} - -// NewBufferPool creates a new buffer pool. -func NewBufferPool() *sync.Pool { - result := &sync.Pool{} - result.New = func() interface{} { - return NewBuffer(result) - } - return result -} - // A ReportPublisher uses a buffer pool to serialise reports, which it // then passes to a publisher type ReportPublisher struct {