From eca45ca9d5513ff63cee68e0c68dbd748815f1cf Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 18 Sep 2015 10:40:43 +0200 Subject: [PATCH] Need to copy the buffer in the MultiPublisher --- xfer/background_publisher.go | 34 +++++++++++++++++----------------- xfer/http_publisher.go | 2 +- xfer/multi_publisher.go | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go index ac511830e..726659dec 100644 --- a/xfer/background_publisher.go +++ b/xfer/background_publisher.go @@ -16,37 +16,37 @@ const ( // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - reports chan *bytes.Buffer + buffers chan *bytes.Buffer quit chan struct{} } // NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { - result := &BackgroundPublisher{ + bp := &BackgroundPublisher{ publisher: p, - reports: make(chan *bytes.Buffer), + buffers: make(chan *bytes.Buffer), quit: make(chan struct{}), } - go result.loop() - return result + go bp.loop() + return bp } -func (b *BackgroundPublisher) loop() { +func (bp *BackgroundPublisher) loop() { backoff := initialBackoff - for r := range b.reports { - err := b.publisher.Publish(r) + for buf := range bp.buffers { + err := bp.publisher.Publish(buf) if err == nil { backoff = initialBackoff continue } - log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) + log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err) select { case <-time.After(backoff): - case <-b.quit: + case <-bp.quit: } - backoff = backoff * 2 + backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } @@ -54,17 +54,17 @@ func (b *BackgroundPublisher) loop() { } // Publish implements Publisher -func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error { +func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error { select { - case b.reports <- buf: + case bp.buffers <- buf: default: } return nil } // Stop implements Publisher -func (b *BackgroundPublisher) Stop() { - close(b.reports) - close(b.quit) - b.publisher.Stop() +func (bp *BackgroundPublisher) Stop() { + close(bp.buffers) + close(bp.quit) + bp.publisher.Stop() } diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index 42eb3c70b..87baa922a 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -9,7 +9,7 @@ import ( "github.com/weaveworks/scope/common/sanitize" ) -// HTTPPublisher publishes reports by POST to a fixed endpoint. +// HTTPPublisher publishes buffers by POST to a fixed endpoint. type HTTPPublisher struct { url string token string diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go index eadf1ff90..cda0bca32 100644 --- a/xfer/multi_publisher.go +++ b/xfer/multi_publisher.go @@ -74,7 +74,7 @@ func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { if _, ok := ids[t.id]; ok { continue } - if err := t.publisher.Publish(buf); err != nil { + if err := t.publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { errs = append(errs, err.Error()) continue }