Merge pull request #492 from weaveworks/buffers-simple

Even simpler buffer management for report publishing.
This commit is contained in:
Tom Wilkie
2015-09-16 17:18:42 +08:00
4 changed files with 15 additions and 70 deletions

View File

@@ -1,46 +0,0 @@
package xfer
import (
"bytes"
"sync"
"sync/atomic"
)
// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}
// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
pool: pool,
refs: 0,
}
}
// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
atomic.AddInt32(&b.refs, 1)
}
// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}
// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
result.New = func() interface{} {
return NewBuffer(result)
}
return result
}

View File

@@ -1,6 +1,7 @@
package xfer
import (
"bytes"
"fmt"
"log"
"net/http"
@@ -18,7 +19,7 @@ const (
// Publisher is something which can send a buffered set of data somewhere,
// probably to a collector.
type Publisher interface {
Publish(*Buffer) error
Publish(*bytes.Buffer) error
Stop()
}
@@ -59,9 +60,7 @@ func (p HTTPPublisher) String() string {
}
// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(buf *Buffer) error {
defer buf.Put()
func (p HTTPPublisher) Publish(buf *bytes.Buffer) error {
req, err := http.NewRequest("POST", p.url, buf)
if err != nil {
return err
@@ -98,7 +97,7 @@ func AuthorizationHeader(token string) string {
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
reports chan *Buffer
reports chan *bytes.Buffer
quit chan struct{}
}
@@ -106,7 +105,7 @@ type BackgroundPublisher struct {
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
result := &BackgroundPublisher{
publisher: p,
reports: make(chan *Buffer),
reports: make(chan *bytes.Buffer),
quit: make(chan struct{}),
}
go result.loop()
@@ -136,7 +135,7 @@ func (b *BackgroundPublisher) loop() {
}
// Publish implements Publisher
func (b *BackgroundPublisher) Publish(buf *Buffer) error {
func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error {
select {
case b.reports <- buf:
default:
@@ -188,18 +187,13 @@ func (p *MultiPublisher) Add(target string) {
}
// Publish implements Publisher by emitting the report to all publishers.
func (p *MultiPublisher) Publish(buf *Buffer) error {
func (p *MultiPublisher) Publish(buf *bytes.Buffer) error {
p.mtx.RLock()
defer p.mtx.RUnlock()
// First take a reference for each publisher
for range p.m {
buf.Get()
}
var errs []string
for _, publisher := range p.m {
if err := publisher.Publish(buf); err != nil {
if err := publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil {
errs = append(errs, err.Error())
}
}

View File

@@ -1,6 +1,7 @@
package xfer_test
import (
"bytes"
"compress/gzip"
"encoding/gob"
"net/http"
@@ -84,7 +85,7 @@ func TestMultiPublisher(t *testing.T) {
)
multiPublisher.Add("first")
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil {
t.Error(err)
}
if want, have := 1, p.count; want != have {
@@ -92,7 +93,7 @@ func TestMultiPublisher(t *testing.T) {
}
multiPublisher.Add("second") // but factory returns same mockPublisher
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil {
t.Error(err)
}
if want, have := 3, p.count; want != have {
@@ -102,5 +103,5 @@ func TestMultiPublisher(t *testing.T) {
type mockPublisher struct{ count int }
func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}
func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}

View File

@@ -1,9 +1,9 @@
package xfer
import (
"bytes"
"compress/gzip"
"encoding/gob"
"sync"
"github.com/weaveworks/scope/report"
)
@@ -11,25 +11,21 @@ import (
// A ReportPublisher uses a buffer pool to serialise reports, which it
// then passes to a publisher
type ReportPublisher struct {
buffers *sync.Pool
publisher Publisher
}
// NewReportPublisher creates a new report publisher
func NewReportPublisher(publisher Publisher) *ReportPublisher {
return &ReportPublisher{
buffers: NewBufferPool(),
publisher: publisher,
}
}
// Publish serialises and compresses a report, then passes it to a publisher
func (p *ReportPublisher) Publish(r report.Report) error {
buf := p.buffers.Get().(*Buffer)
buf := &bytes.Buffer{}
gzwriter := gzip.NewWriter(buf)
if err := gob.NewEncoder(gzwriter).Encode(r); err != nil {
buf.Reset()
p.buffers.Put(buf)
return err
}
gzwriter.Close() // otherwise the content won't get flushed to the output stream