xfer: move Buffer to own file; update comment

overlay: mutex for Weave status
This commit is contained in:
Peter Bourgon
2015-09-11 09:26:58 +02:00
parent a8c163b5d7
commit 65b78206ee
4 changed files with 63 additions and 46 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/url"
"regexp"
"strings"
"sync"
"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/probe/docker"
@@ -43,6 +44,8 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3
type Weave struct {
url string
hostID string
mtx sync.RWMutex
status weaveStatus
}
@@ -78,7 +81,6 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
// Tick implements Ticker
func (w *Weave) Tick() error {
var result weaveStatus
req, err := http.NewRequest("GET", w.url, nil)
if err != nil {
return err
@@ -94,10 +96,13 @@ func (w *Weave) Tick() error {
return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
}
var result weaveStatus
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}
w.mtx.Lock()
defer w.mtx.Unlock()
w.status = result
return nil
}
@@ -108,7 +113,7 @@ type psEntry struct {
ips []string
}
func (w Weave) ps() ([]psEntry, error) {
func (w *Weave) ps() ([]psEntry, error) {
var result []psEntry
cmd := exec.Command("weave", "--local", "ps")
out, err := cmd.StdoutPipe()
@@ -140,7 +145,10 @@ func (w Weave) ps() ([]psEntry, error) {
}
// Tag implements Tagger.
func (w Weave) Tag(r report.Report) (report.Report, error) {
func (w *Weave) Tag(r report.Report) (report.Report, error) {
w.mtx.RLock()
defer w.mtx.RUnlock()
// Put information from weaveDNS on the container nodes
for _, entry := range w.status.DNS.Entries {
if entry.Tombstone > 0 {
@@ -181,7 +189,10 @@ func (w Weave) Tag(r report.Report) (report.Report, error) {
}
// Report implements Reporter.
func (w Weave) Report() (report.Report, error) {
func (w *Weave) Report() (report.Report, error) {
w.mtx.RLock()
defer w.mtx.RUnlock()
r := report.MakeReport()
for _, peer := range w.status.Router.Peers {
r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{

46
xfer/buffer.go Normal file
View File

@@ -0,0 +1,46 @@
package xfer
import (
"bytes"
"sync"
"sync/atomic"
)
// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}
// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
pool: pool,
refs: 0,
}
}
// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
atomic.AddInt32(&b.refs, 1)
}
// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}
// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
result.New = func() interface{} {
return NewBuffer(result)
}
return result
}

View File

@@ -15,7 +15,8 @@ const (
maxBackoff = 60 * time.Second
)
// Publisher is something which can send a report to a remote collector.
// Publisher is something which can send a buffered set of data somewhere,
// probably to a collector.
type Publisher interface {
Publish(*Buffer) error
Stop()

View File

@@ -1,54 +1,13 @@
package xfer
import (
"bytes"
"compress/gzip"
"encoding/gob"
"sync"
"sync/atomic"
"github.com/weaveworks/scope/report"
)
// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}
// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
pool: pool,
refs: 0,
}
}
// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
atomic.AddInt32(&b.refs, 1)
}
// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}
// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
result.New = func() interface{} {
return NewBuffer(result)
}
return result
}
// A ReportPublisher uses a buffer pool to serialise reports, which it
// then passes to a publisher
type ReportPublisher struct {