mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 10:11:03 +00:00
Merge pull request #470 from weaveworks/40-percent-cpu
Reduce probe cpu usage
This commit is contained in:
@@ -27,10 +27,11 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
rp := xfer.NewReportPublisher(publisher)
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
for range time.Tick(*publishInterval) {
|
||||
if err := publisher.Publish(demoReport(*hostCount)); err != nil {
|
||||
if err := rp.Publish(demoReport(*hostCount)); err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
@@ -84,14 +85,14 @@ func demoReport(nodeCount int) report.Report {
|
||||
)
|
||||
|
||||
// Endpoint topology
|
||||
r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
process.PID: "4000",
|
||||
"name": c.srcProc,
|
||||
"domain": "node-" + src,
|
||||
}).WithEdge(dstPortID, report.EdgeMetadata{
|
||||
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
|
||||
}))
|
||||
r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
process.PID: "4000",
|
||||
"name": c.dstProc,
|
||||
"domain": "node-" + dst,
|
||||
@@ -100,15 +101,15 @@ func demoReport(nodeCount int) report.Report {
|
||||
}))
|
||||
|
||||
// Address topology
|
||||
r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
docker.Name: src,
|
||||
}).WithAdjacent(dstAddressID))
|
||||
r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
docker.Name: dst,
|
||||
}).WithAdjacent(srcAddressID))
|
||||
|
||||
// Host data
|
||||
r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{
|
||||
r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{
|
||||
"ts": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
"host_name": "host-x",
|
||||
"local_networks": localNet.String(),
|
||||
|
||||
@@ -39,7 +39,8 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
rp := xfer.NewReportPublisher(publisher)
|
||||
for range time.Tick(*publishInterval) {
|
||||
publisher.Publish(fixedReport)
|
||||
rp.Publish(fixedReport)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,14 +64,14 @@ func DemoReport(nodeCount int) report.Report {
|
||||
)
|
||||
|
||||
// Endpoint topology
|
||||
r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
"pid": "4000",
|
||||
"name": c.srcProc,
|
||||
"domain": "node-" + src,
|
||||
}).WithEdge(dstPortID, report.EdgeMetadata{
|
||||
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
|
||||
}))
|
||||
r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
|
||||
"pid": "4000",
|
||||
"name": c.dstProc,
|
||||
"domain": "node-" + dst,
|
||||
@@ -80,15 +80,15 @@ func DemoReport(nodeCount int) report.Report {
|
||||
}))
|
||||
|
||||
// Address topology
|
||||
r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
"name": src,
|
||||
}).WithAdjacent(dstAddressID))
|
||||
r.Address = r.Address.WithNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
r.Address = r.Address.AddNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{
|
||||
"name": dst,
|
||||
}).WithAdjacent(srcAddressID))
|
||||
|
||||
// Host data
|
||||
r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{
|
||||
r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{
|
||||
"ts": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
"host_name": "host-x",
|
||||
"local_networks": localNet.String(),
|
||||
|
||||
@@ -105,7 +105,7 @@ var ConntrackModulePresent = func() bool {
|
||||
|
||||
// NB this is not re-entrant!
|
||||
func (c *Conntracker) run(args ...string) {
|
||||
args = append([]string{"-E", "-o", "xml"}, args...)
|
||||
args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...)
|
||||
cmd := exec.Command("conntrack", args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
|
||||
@@ -178,8 +178,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
|
||||
})
|
||||
}
|
||||
|
||||
rpt.Address = rpt.Address.WithNode(localAddressNodeID, localNode)
|
||||
rpt.Address = rpt.Address.WithNode(remoteAddressNodeID, remoteNode)
|
||||
rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode)
|
||||
rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode)
|
||||
}
|
||||
|
||||
// Update endpoint topology
|
||||
@@ -225,8 +225,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
|
||||
if extraRemoteNode != nil {
|
||||
remoteNode = remoteNode.Merge(*extraRemoteNode)
|
||||
}
|
||||
rpt.Endpoint = rpt.Endpoint.WithNode(localEndpointNodeID, localNode)
|
||||
rpt.Endpoint = rpt.Endpoint.WithNode(remoteEndpointNodeID, remoteNode)
|
||||
rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode)
|
||||
rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -113,6 +113,7 @@ func main() {
|
||||
var (
|
||||
endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
|
||||
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
|
||||
tickers = []Ticker{processCache}
|
||||
reporters = []Reporter{
|
||||
endpointReporter,
|
||||
host.NewReporter(hostID, hostName, localNets),
|
||||
@@ -142,6 +143,7 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start Weave tagger: %v", err)
|
||||
}
|
||||
tickers = append(tickers, weave)
|
||||
taggers = append(taggers, weave)
|
||||
reporters = append(reporters, weave)
|
||||
}
|
||||
@@ -173,6 +175,7 @@ func main() {
|
||||
pubTick = time.Tick(*publishInterval)
|
||||
spyTick = time.Tick(*spyInterval)
|
||||
r = report.MakeReport()
|
||||
p = xfer.NewReportPublisher(publishers)
|
||||
)
|
||||
|
||||
for {
|
||||
@@ -180,24 +183,27 @@ func main() {
|
||||
case <-pubTick:
|
||||
publishTicks.WithLabelValues().Add(1)
|
||||
r.Window = *publishInterval
|
||||
if err := publishers.Publish(r); err != nil {
|
||||
if err := p.Publish(r); err != nil {
|
||||
log.Printf("publish: %v", err)
|
||||
}
|
||||
r = report.MakeReport()
|
||||
|
||||
case <-spyTick:
|
||||
if err := processCache.Update(); err != nil {
|
||||
log.Printf("error reading processes: %v", err)
|
||||
}
|
||||
for _, reporter := range reporters {
|
||||
newReport, err := reporter.Report()
|
||||
if err != nil {
|
||||
log.Printf("error generating report: %v", err)
|
||||
start := time.Now()
|
||||
|
||||
for _, ticker := range tickers {
|
||||
if err := ticker.Tick(); err != nil {
|
||||
log.Printf("error doing ticker: %v", err)
|
||||
}
|
||||
r = r.Merge(newReport)
|
||||
}
|
||||
|
||||
r = r.Merge(doReport(reporters))
|
||||
r = Apply(r, taggers)
|
||||
|
||||
if took := time.Since(start); took > *spyInterval {
|
||||
log.Printf("report generation took too long (%s)", took)
|
||||
}
|
||||
|
||||
case <-quit:
|
||||
return
|
||||
}
|
||||
@@ -206,6 +212,26 @@ func main() {
|
||||
log.Printf("%s", <-interrupt())
|
||||
}
|
||||
|
||||
func doReport(reporters []Reporter) report.Report {
|
||||
reports := make(chan report.Report, len(reporters))
|
||||
for _, rep := range reporters {
|
||||
go func(rep Reporter) {
|
||||
newReport, err := rep.Report()
|
||||
if err != nil {
|
||||
log.Printf("error generating report: %v", err)
|
||||
newReport = report.MakeReport() // empty is OK to merge
|
||||
}
|
||||
reports <- newReport
|
||||
}(rep)
|
||||
}
|
||||
|
||||
result := report.MakeReport()
|
||||
for i := 0; i < cap(reports); i++ {
|
||||
result = result.Merge(<-reports)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func interrupt() chan os.Signal {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/weaveworks/scope/common/exec"
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
@@ -43,6 +44,9 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3
|
||||
type Weave struct {
|
||||
url string
|
||||
hostID string
|
||||
|
||||
mtx sync.RWMutex
|
||||
status weaveStatus
|
||||
}
|
||||
|
||||
type weaveStatus struct {
|
||||
@@ -75,24 +79,32 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w Weave) update() (weaveStatus, error) {
|
||||
var result weaveStatus
|
||||
// Tick implements Ticker
|
||||
func (w *Weave) Tick() error {
|
||||
req, err := http.NewRequest("GET", w.url, nil)
|
||||
if err != nil {
|
||||
return result, err
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Accept", "application/json")
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return result, err
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return result, fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
|
||||
return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return result, json.NewDecoder(resp.Body).Decode(&result)
|
||||
var result weaveStatus
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
w.status = result
|
||||
return nil
|
||||
}
|
||||
|
||||
type psEntry struct {
|
||||
@@ -101,7 +113,7 @@ type psEntry struct {
|
||||
ips []string
|
||||
}
|
||||
|
||||
func (w Weave) ps() ([]psEntry, error) {
|
||||
func (w *Weave) ps() ([]psEntry, error) {
|
||||
var result []psEntry
|
||||
cmd := exec.Command("weave", "--local", "ps")
|
||||
out, err := cmd.StdoutPipe()
|
||||
@@ -132,30 +144,13 @@ func (w Weave) ps() ([]psEntry, error) {
|
||||
return result, scanner.Err()
|
||||
}
|
||||
|
||||
func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) {
|
||||
for nodeid, nmd := range r.Container.Nodes {
|
||||
idPrefix := nmd.Metadata[docker.ContainerID][:12]
|
||||
if idPrefix != containerIDPrefix {
|
||||
continue
|
||||
}
|
||||
|
||||
existingIPs := report.MakeIDList(docker.ExtractContainerIPs(nmd)...)
|
||||
existingIPs = existingIPs.Add(ips...)
|
||||
nmd.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ")
|
||||
nmd.Metadata[WeaveMACAddress] = macAddress
|
||||
r.Container.Nodes[nodeid] = nmd
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Tag implements Tagger.
|
||||
func (w Weave) Tag(r report.Report) (report.Report, error) {
|
||||
status, err := w.update()
|
||||
if err != nil {
|
||||
return r, nil
|
||||
}
|
||||
func (w *Weave) Tag(r report.Report) (report.Report, error) {
|
||||
w.mtx.RLock()
|
||||
defer w.mtx.RUnlock()
|
||||
|
||||
for _, entry := range status.DNS.Entries {
|
||||
// Put information from weaveDNS on the container nodes
|
||||
for _, entry := range w.status.DNS.Entries {
|
||||
if entry.Tombstone > 0 {
|
||||
continue
|
||||
}
|
||||
@@ -167,28 +162,39 @@ func (w Weave) Tag(r report.Report) (report.Report, error) {
|
||||
hostnames := report.IDList(strings.Fields(node.Metadata[WeaveDNSHostname]))
|
||||
hostnames = hostnames.Add(strings.TrimSuffix(entry.Hostname, "."))
|
||||
node.Metadata[WeaveDNSHostname] = strings.Join(hostnames, " ")
|
||||
r.Container.Nodes[nodeID] = node
|
||||
}
|
||||
|
||||
// Put information from weave ps on the container nodes
|
||||
psEntries, err := w.ps()
|
||||
if err != nil {
|
||||
return r, nil
|
||||
}
|
||||
containersByPrefix := map[string]report.Node{}
|
||||
for _, node := range r.Container.Nodes {
|
||||
prefix := node.Metadata[docker.ContainerID][:12]
|
||||
containersByPrefix[prefix] = node
|
||||
}
|
||||
for _, e := range psEntries {
|
||||
w.tagContainer(r, e.containerIDPrefix, e.macAddress, e.ips)
|
||||
node, ok := containersByPrefix[e.containerIDPrefix]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
existingIPs := report.MakeIDList(docker.ExtractContainerIPs(node)...)
|
||||
existingIPs = existingIPs.Add(e.ips...)
|
||||
node.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ")
|
||||
node.Metadata[WeaveMACAddress] = e.macAddress
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Report implements Reporter.
|
||||
func (w Weave) Report() (report.Report, error) {
|
||||
r := report.MakeReport()
|
||||
status, err := w.update()
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
func (w *Weave) Report() (report.Report, error) {
|
||||
w.mtx.RLock()
|
||||
defer w.mtx.RUnlock()
|
||||
|
||||
for _, peer := range status.Router.Peers {
|
||||
r := report.MakeReport()
|
||||
for _, peer := range w.status.Router.Peers {
|
||||
r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{
|
||||
WeavePeerName: peer.Name,
|
||||
WeavePeerNickName: peer.NickName,
|
||||
|
||||
@@ -30,6 +30,8 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w.Tick()
|
||||
|
||||
{
|
||||
have, err := w.Report()
|
||||
if err != nil {
|
||||
|
||||
@@ -39,8 +39,8 @@ func (c *CachingWalker) Walk(f func(Process)) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update updates cached copy of process list
|
||||
func (c *CachingWalker) Update() error {
|
||||
// Tick updates cached copy of process list
|
||||
func (c *CachingWalker) Tick() error {
|
||||
newCache := []Process{}
|
||||
err := c.source.Walk(func(p Process) {
|
||||
newCache = append(newCache, p)
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestCache(t *testing.T) {
|
||||
processes: processes,
|
||||
}
|
||||
cachingWalker := process.NewCachingWalker(walker)
|
||||
err := cachingWalker.Update()
|
||||
err := cachingWalker.Tick()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -45,7 +45,7 @@ func TestCache(t *testing.T) {
|
||||
t.Errorf("%v (%v)", test.Diff(processes, have), err)
|
||||
}
|
||||
|
||||
err = cachingWalker.Update()
|
||||
err = cachingWalker.Tick()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -253,8 +253,8 @@ func (s *Sniffer) Merge(p Packet, rpt *report.Report) {
|
||||
}
|
||||
|
||||
addAdjacency := func(t report.Topology, srcNodeID, dstNodeID string) report.Topology {
|
||||
result := t.WithNode(srcNodeID, report.MakeNode().WithAdjacent(dstNodeID))
|
||||
result = result.WithNode(dstNodeID, report.MakeNode())
|
||||
result := t.AddNode(srcNodeID, report.MakeNode().WithAdjacent(dstNodeID))
|
||||
result = result.AddNode(dstNodeID, report.MakeNode())
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,13 @@ type Reporter interface {
|
||||
Report() (report.Report, error)
|
||||
}
|
||||
|
||||
// Ticker is something which will be invoked every spyDuration.
|
||||
// It's useful for things that should be updated on that interval.
|
||||
// For example, cached shared state between Taggers and Reporters.
|
||||
type Ticker interface {
|
||||
Tick() error
|
||||
}
|
||||
|
||||
// Apply tags the report with all the taggers.
|
||||
func Apply(r report.Report, taggers []Tagger) report.Report {
|
||||
var err error
|
||||
|
||||
36
report/id.go
36
report/id.go
@@ -1,8 +1,13 @@
|
||||
package report
|
||||
|
||||
import (
|
||||
"hash"
|
||||
"hash/fnv"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/bluele/gcache"
|
||||
)
|
||||
|
||||
// TheInternet is used as a node ID to indicate a remote IP.
|
||||
@@ -21,9 +26,38 @@ const (
|
||||
EdgeDelim = "|"
|
||||
)
|
||||
|
||||
var (
|
||||
idCache = gcache.New(1024).LRU().Build()
|
||||
hashers = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return fnv.New64a()
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func lookupID(part1, part2, part3 string, f func() string) string {
|
||||
h := hashers.Get().(hash.Hash64)
|
||||
h.Write([]byte(part1))
|
||||
h.Write([]byte(part2))
|
||||
h.Write([]byte(part3))
|
||||
sum := h.Sum64()
|
||||
var result string
|
||||
if id, err := idCache.Get(sum); id != nil && err != nil {
|
||||
result = id.(string)
|
||||
} else {
|
||||
result = f()
|
||||
idCache.Set(sum, result)
|
||||
}
|
||||
h.Reset()
|
||||
hashers.Put(h)
|
||||
return result
|
||||
}
|
||||
|
||||
// MakeEndpointNodeID produces an endpoint node ID from its composite parts.
|
||||
func MakeEndpointNodeID(hostID, address, port string) string {
|
||||
return MakeAddressNodeID(hostID, address) + ScopeDelim + port
|
||||
return lookupID(hostID, address, port, func() string {
|
||||
return MakeAddressNodeID(hostID, address) + ScopeDelim + port
|
||||
})
|
||||
}
|
||||
|
||||
// MakeAddressNodeID produces an address node ID from its composite parts.
|
||||
|
||||
@@ -20,16 +20,17 @@ func MakeTopology() Topology {
|
||||
}
|
||||
}
|
||||
|
||||
// WithNode produces a topology from t, with nmd added under key nodeID; if a
|
||||
// node already exists for this key, nmd is merged with that node. Note that a
|
||||
// fresh topology is returned.
|
||||
func (t Topology) WithNode(nodeID string, nmd Node) Topology {
|
||||
// AddNode adds node to the topology under key nodeID; if a
|
||||
// node already exists for this key, nmd is merged with that node.
|
||||
// The same topology is returned to enable chaining.
|
||||
// This method is different from all the other similar methods
|
||||
// in that it mutates the Topology, to solve issues of GC pressure.
|
||||
func (t Topology) AddNode(nodeID string, nmd Node) Topology {
|
||||
if existing, ok := t.Nodes[nodeID]; ok {
|
||||
nmd = nmd.Merge(existing)
|
||||
}
|
||||
result := t.Copy()
|
||||
result.Nodes[nodeID] = nmd
|
||||
return result
|
||||
t.Nodes[nodeID] = nmd
|
||||
return t
|
||||
}
|
||||
|
||||
// Copy returns a value copy of the Topology.
|
||||
|
||||
46
xfer/buffer.go
Normal file
46
xfer/buffer.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// A Buffer is a reference counted bytes.Buffer, which belongs
|
||||
// to a sync.Pool
|
||||
type Buffer struct {
|
||||
bytes.Buffer
|
||||
pool *sync.Pool
|
||||
refs int32
|
||||
}
|
||||
|
||||
// NewBuffer creates a new buffer
|
||||
func NewBuffer(pool *sync.Pool) *Buffer {
|
||||
return &Buffer{
|
||||
pool: pool,
|
||||
refs: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// Get increases the reference count. It is safe for concurrent calls.
|
||||
func (b *Buffer) Get() {
|
||||
atomic.AddInt32(&b.refs, 1)
|
||||
}
|
||||
|
||||
// Put decreases the reference count, and when it hits zero, puts the
|
||||
// buffer back in the pool.
|
||||
func (b *Buffer) Put() {
|
||||
if atomic.AddInt32(&b.refs, -1) == 0 {
|
||||
b.Reset()
|
||||
b.pool.Put(b)
|
||||
}
|
||||
}
|
||||
|
||||
// NewBufferPool creates a new buffer pool.
|
||||
func NewBufferPool() *sync.Pool {
|
||||
result := &sync.Pool{}
|
||||
result.New = func() interface{} {
|
||||
return NewBuffer(result)
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -1,9 +1,6 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
@@ -11,8 +8,6 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -20,9 +15,10 @@ const (
|
||||
maxBackoff = 60 * time.Second
|
||||
)
|
||||
|
||||
// Publisher is something which can send a report to a remote collector.
|
||||
// Publisher is something which can send a buffered set of data somewhere,
|
||||
// probably to a collector.
|
||||
type Publisher interface {
|
||||
Publish(report.Report) error
|
||||
Publish(*Buffer) error
|
||||
Stop()
|
||||
}
|
||||
|
||||
@@ -63,16 +59,10 @@ func (p HTTPPublisher) String() string {
|
||||
}
|
||||
|
||||
// Publish publishes the report to the URL.
|
||||
func (p HTTPPublisher) Publish(rpt report.Report) error {
|
||||
gzbuf := bytes.Buffer{}
|
||||
gzwriter := gzip.NewWriter(&gzbuf)
|
||||
func (p HTTPPublisher) Publish(buf *Buffer) error {
|
||||
defer buf.Put()
|
||||
|
||||
if err := gob.NewEncoder(gzwriter).Encode(rpt); err != nil {
|
||||
return err
|
||||
}
|
||||
gzwriter.Close() // otherwise the content won't get flushed to the output stream
|
||||
|
||||
req, err := http.NewRequest("POST", p.url, &gzbuf)
|
||||
req, err := http.NewRequest("POST", p.url, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -108,7 +98,7 @@ func AuthorizationHeader(token string) string {
|
||||
// concurrent publishes are dropped.
|
||||
type BackgroundPublisher struct {
|
||||
publisher Publisher
|
||||
reports chan report.Report
|
||||
reports chan *Buffer
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
@@ -116,7 +106,7 @@ type BackgroundPublisher struct {
|
||||
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
|
||||
result := &BackgroundPublisher{
|
||||
publisher: p,
|
||||
reports: make(chan report.Report),
|
||||
reports: make(chan *Buffer),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go result.loop()
|
||||
@@ -146,9 +136,9 @@ func (b *BackgroundPublisher) loop() {
|
||||
}
|
||||
|
||||
// Publish implements Publisher
|
||||
func (b *BackgroundPublisher) Publish(r report.Report) error {
|
||||
func (b *BackgroundPublisher) Publish(buf *Buffer) error {
|
||||
select {
|
||||
case b.reports <- r:
|
||||
case b.reports <- buf:
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
@@ -198,13 +188,18 @@ func (p *MultiPublisher) Add(target string) {
|
||||
}
|
||||
|
||||
// Publish implements Publisher by emitting the report to all publishers.
|
||||
func (p *MultiPublisher) Publish(rpt report.Report) error {
|
||||
func (p *MultiPublisher) Publish(buf *Buffer) error {
|
||||
p.mtx.RLock()
|
||||
defer p.mtx.RUnlock()
|
||||
|
||||
// First take a reference for each publisher
|
||||
for range p.m {
|
||||
buf.Get()
|
||||
}
|
||||
|
||||
var errs []string
|
||||
for _, publisher := range p.m {
|
||||
if err := publisher.Publish(rpt); err != nil {
|
||||
if err := publisher.Publish(buf); err != nil {
|
||||
errs = append(errs, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,8 @@ func TestHTTPPublisher(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := p.Publish(rpt); err != nil {
|
||||
rp := xfer.NewReportPublisher(p)
|
||||
if err := rp.Publish(rpt); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -83,7 +84,7 @@ func TestMultiPublisher(t *testing.T) {
|
||||
)
|
||||
|
||||
multiPublisher.Add("first")
|
||||
if err := multiPublisher.Publish(report.MakeReport()); err != nil {
|
||||
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want, have := 1, p.count; want != have {
|
||||
@@ -91,7 +92,7 @@ func TestMultiPublisher(t *testing.T) {
|
||||
}
|
||||
|
||||
multiPublisher.Add("second") // but factory returns same mockPublisher
|
||||
if err := multiPublisher.Publish(report.MakeReport()); err != nil {
|
||||
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want, have := 3, p.count; want != have {
|
||||
@@ -101,5 +102,5 @@ func TestMultiPublisher(t *testing.T) {
|
||||
|
||||
type mockPublisher struct{ count int }
|
||||
|
||||
func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil }
|
||||
func (p *mockPublisher) Stop() {}
|
||||
func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil }
|
||||
func (p *mockPublisher) Stop() {}
|
||||
|
||||
38
xfer/report_publisher.go
Normal file
38
xfer/report_publisher.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"sync"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// A ReportPublisher uses a buffer pool to serialise reports, which it
|
||||
// then passes to a publisher
|
||||
type ReportPublisher struct {
|
||||
buffers *sync.Pool
|
||||
publisher Publisher
|
||||
}
|
||||
|
||||
// NewReportPublisher creates a new report publisher
|
||||
func NewReportPublisher(publisher Publisher) *ReportPublisher {
|
||||
return &ReportPublisher{
|
||||
buffers: NewBufferPool(),
|
||||
publisher: publisher,
|
||||
}
|
||||
}
|
||||
|
||||
// Publish serialises and compresses a report, then passes it to a publisher
|
||||
func (p *ReportPublisher) Publish(r report.Report) error {
|
||||
buf := p.buffers.Get().(*Buffer)
|
||||
gzwriter := gzip.NewWriter(buf)
|
||||
if err := gob.NewEncoder(gzwriter).Encode(r); err != nil {
|
||||
buf.Reset()
|
||||
p.buffers.Put(buf)
|
||||
return err
|
||||
}
|
||||
gzwriter.Close() // otherwise the content won't get flushed to the output stream
|
||||
|
||||
return p.publisher.Publish(buf)
|
||||
}
|
||||
Reference in New Issue
Block a user