From 65b78206eeb400f95732bd15de7399324ff1ea6e Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 11 Sep 2015 09:26:58 +0200 Subject: [PATCH] xfer: move Buffer to own file; update comment overlay: mutex for Weave status --- probe/overlay/weave.go | 19 +++++++++++++---- xfer/buffer.go | 46 ++++++++++++++++++++++++++++++++++++++++ xfer/publisher.go | 3 ++- xfer/report_publisher.go | 41 ----------------------------------- 4 files changed, 63 insertions(+), 46 deletions(-) create mode 100644 xfer/buffer.go diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 878e84d11..f1d60d157 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -10,6 +10,7 @@ import ( "net/url" "regexp" "strings" + "sync" "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" @@ -43,6 +44,8 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3 type Weave struct { url string hostID string + + mtx sync.RWMutex status weaveStatus } @@ -78,7 +81,6 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { // Tick implements Ticker func (w *Weave) Tick() error { - var result weaveStatus req, err := http.NewRequest("GET", w.url, nil) if err != nil { return err @@ -94,10 +96,13 @@ func (w *Weave) Tick() error { return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) } + var result weaveStatus if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return err } + w.mtx.Lock() + defer w.mtx.Unlock() w.status = result return nil } @@ -108,7 +113,7 @@ type psEntry struct { ips []string } -func (w Weave) ps() ([]psEntry, error) { +func (w *Weave) ps() ([]psEntry, error) { var result []psEntry cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() @@ -140,7 +145,10 @@ func (w Weave) ps() ([]psEntry, error) { } // Tag implements Tagger. -func (w Weave) Tag(r report.Report) (report.Report, error) { +func (w *Weave) Tag(r report.Report) (report.Report, error) { + w.mtx.RLock() + defer w.mtx.RUnlock() + // Put information from weaveDNS on the container nodes for _, entry := range w.status.DNS.Entries { if entry.Tombstone > 0 { @@ -181,7 +189,10 @@ func (w Weave) Tag(r report.Report) (report.Report, error) { } // Report implements Reporter. -func (w Weave) Report() (report.Report, error) { +func (w *Weave) Report() (report.Report, error) { + w.mtx.RLock() + defer w.mtx.RUnlock() + r := report.MakeReport() for _, peer := range w.status.Router.Peers { r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{ diff --git a/xfer/buffer.go b/xfer/buffer.go new file mode 100644 index 000000000..452411e52 --- /dev/null +++ b/xfer/buffer.go @@ -0,0 +1,46 @@ +package xfer + +import ( + "bytes" + "sync" + "sync/atomic" +) + +// A Buffer is a reference counted bytes.Buffer, which belongs +// to a sync.Pool +type Buffer struct { + bytes.Buffer + pool *sync.Pool + refs int32 +} + +// NewBuffer creates a new buffer +func NewBuffer(pool *sync.Pool) *Buffer { + return &Buffer{ + pool: pool, + refs: 0, + } +} + +// Get increases the reference count. It is safe for concurrent calls. +func (b *Buffer) Get() { + atomic.AddInt32(&b.refs, 1) +} + +// Put decreases the reference count, and when it hits zero, puts the +// buffer back in the pool. +func (b *Buffer) Put() { + if atomic.AddInt32(&b.refs, -1) == 0 { + b.Reset() + b.pool.Put(b) + } +} + +// NewBufferPool creates a new buffer pool. +func NewBufferPool() *sync.Pool { + result := &sync.Pool{} + result.New = func() interface{} { + return NewBuffer(result) + } + return result +} diff --git a/xfer/publisher.go b/xfer/publisher.go index e71621ca3..d1612e78e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -15,7 +15,8 @@ const ( maxBackoff = 60 * time.Second ) -// Publisher is something which can send a report to a remote collector. +// Publisher is something which can send a buffered set of data somewhere, +// probably to a collector. type Publisher interface { Publish(*Buffer) error Stop() diff --git a/xfer/report_publisher.go b/xfer/report_publisher.go index e2af70af5..cdf5a0101 100644 --- a/xfer/report_publisher.go +++ b/xfer/report_publisher.go @@ -1,54 +1,13 @@ package xfer import ( - "bytes" "compress/gzip" "encoding/gob" "sync" - "sync/atomic" "github.com/weaveworks/scope/report" ) -// A Buffer is a reference counted bytes.Buffer, which belongs -// to a sync.Pool -type Buffer struct { - bytes.Buffer - pool *sync.Pool - refs int32 -} - -// NewBuffer creates a new buffer -func NewBuffer(pool *sync.Pool) *Buffer { - return &Buffer{ - pool: pool, - refs: 0, - } -} - -// Get increases the reference count. It is safe for concurrent calls. -func (b *Buffer) Get() { - atomic.AddInt32(&b.refs, 1) -} - -// Put decreases the reference count, and when it hits zero, puts the -// buffer back in the pool. -func (b *Buffer) Put() { - if atomic.AddInt32(&b.refs, -1) == 0 { - b.Reset() - b.pool.Put(b) - } -} - -// NewBufferPool creates a new buffer pool. -func NewBufferPool() *sync.Pool { - result := &sync.Pool{} - result.New = func() interface{} { - return NewBuffer(result) - } - return result -} - // A ReportPublisher uses a buffer pool to serialise reports, which it // then passes to a publisher type ReportPublisher struct {