diff --git a/xfer/buffer.go b/xfer/buffer.go deleted file mode 100644 index 452411e52..000000000 --- a/xfer/buffer.go +++ /dev/null @@ -1,46 +0,0 @@ -package xfer - -import ( - "bytes" - "sync" - "sync/atomic" -) - -// A Buffer is a reference counted bytes.Buffer, which belongs -// to a sync.Pool -type Buffer struct { - bytes.Buffer - pool *sync.Pool - refs int32 -} - -// NewBuffer creates a new buffer -func NewBuffer(pool *sync.Pool) *Buffer { - return &Buffer{ - pool: pool, - refs: 0, - } -} - -// Get increases the reference count. It is safe for concurrent calls. -func (b *Buffer) Get() { - atomic.AddInt32(&b.refs, 1) -} - -// Put decreases the reference count, and when it hits zero, puts the -// buffer back in the pool. -func (b *Buffer) Put() { - if atomic.AddInt32(&b.refs, -1) == 0 { - b.Reset() - b.pool.Put(b) - } -} - -// NewBufferPool creates a new buffer pool. -func NewBufferPool() *sync.Pool { - result := &sync.Pool{} - result.New = func() interface{} { - return NewBuffer(result) - } - return result -} diff --git a/xfer/publisher.go b/xfer/publisher.go index d1612e78e..759e48a6e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,6 +1,7 @@ package xfer import ( + "bytes" "fmt" "log" "net/http" @@ -18,7 +19,7 @@ const ( // Publisher is something which can send a buffered set of data somewhere, // probably to a collector. type Publisher interface { - Publish(*Buffer) error + Publish(*bytes.Buffer) error Stop() } @@ -59,9 +60,7 @@ func (p HTTPPublisher) String() string { } // Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(buf *Buffer) error { - defer buf.Put() - +func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { req, err := http.NewRequest("POST", p.url, buf) if err != nil { return err @@ -98,7 +97,7 @@ func AuthorizationHeader(token string) string { // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - reports chan *Buffer + reports chan *bytes.Buffer quit chan struct{} } @@ -106,7 +105,7 @@ type BackgroundPublisher struct { func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { result := &BackgroundPublisher{ publisher: p, - reports: make(chan *Buffer), + reports: make(chan *bytes.Buffer), quit: make(chan struct{}), } go result.loop() @@ -136,7 +135,7 @@ func (b *BackgroundPublisher) loop() { } // Publish implements Publisher -func (b *BackgroundPublisher) Publish(buf *Buffer) error { +func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error { select { case b.reports <- buf: default: @@ -188,18 +187,13 @@ func (p *MultiPublisher) Add(target string) { } // Publish implements Publisher by emitting the report to all publishers. -func (p *MultiPublisher) Publish(buf *Buffer) error { +func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { p.mtx.RLock() defer p.mtx.RUnlock() - // First take a reference for each publisher - for range p.m { - buf.Get() - } - var errs []string for _, publisher := range p.m { - if err := publisher.Publish(buf); err != nil { + if err := publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { errs = append(errs, err.Error()) } } diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 195d0b0bf..0bef0861e 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -1,6 +1,7 @@ package xfer_test import ( + "bytes" "compress/gzip" "encoding/gob" "net/http" @@ -84,7 +85,7 @@ func TestMultiPublisher(t *testing.T) { ) multiPublisher.Add("first") - if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { + if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil { t.Error(err) } if want, have := 1, p.count; want != have { @@ -92,7 +93,7 @@ func TestMultiPublisher(t *testing.T) { } multiPublisher.Add("second") // but factory returns same mockPublisher - if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { + if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil { t.Error(err) } if want, have := 3, p.count; want != have { @@ -102,5 +103,5 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } -func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} +func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/report_publisher.go b/xfer/report_publisher.go index cdf5a0101..efde88da4 100644 --- a/xfer/report_publisher.go +++ b/xfer/report_publisher.go @@ -1,9 +1,9 @@ package xfer import ( + "bytes" "compress/gzip" "encoding/gob" - "sync" "github.com/weaveworks/scope/report" ) @@ -11,25 +11,21 @@ import ( // A ReportPublisher uses a buffer pool to serialise reports, which it // then passes to a publisher type ReportPublisher struct { - buffers *sync.Pool publisher Publisher } // NewReportPublisher creates a new report publisher func NewReportPublisher(publisher Publisher) *ReportPublisher { return &ReportPublisher{ - buffers: NewBufferPool(), publisher: publisher, } } // Publish serialises and compresses a report, then passes it to a publisher func (p *ReportPublisher) Publish(r report.Report) error { - buf := p.buffers.Get().(*Buffer) + buf := &bytes.Buffer{} gzwriter := gzip.NewWriter(buf) if err := gob.NewEncoder(gzwriter).Encode(r); err != nil { - buf.Reset() - p.buffers.Put(buf) return err } gzwriter.Close() // otherwise the content won't get flushed to the output stream