diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index c6513382d..55cb91fb6 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -27,10 +27,11 @@ func main() { if err != nil { log.Fatal(err) } + rp := xfer.NewReportPublisher(publisher) rand.Seed(time.Now().UnixNano()) for range time.Tick(*publishInterval) { - if err := publisher.Publish(demoReport(*hostCount)); err != nil { + if err := rp.Publish(demoReport(*hostCount)); err != nil { log.Print(err) } } @@ -84,14 +85,14 @@ func demoReport(nodeCount int) report.Report { ) // Endpoint topology - r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ process.PID: "4000", "name": c.srcProc, "domain": "node-" + src, }).WithEdge(dstPortID, report.EdgeMetadata{ MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), })) - r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ process.PID: "4000", "name": c.dstProc, "domain": "node-" + dst, @@ -100,15 +101,15 @@ func demoReport(nodeCount int) report.Report { })) // Address topology - r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ docker.Name: src, }).WithAdjacent(dstAddressID)) - r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ docker.Name: dst, }).WithAdjacent(srcAddressID)) // Host data - r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{ + r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{ "ts": time.Now().UTC().Format(time.RFC3339Nano), "host_name": "host-x", "local_networks": localNet.String(), diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index f0d0a1812..1bd1294b6 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -39,7 +39,8 @@ func main() { log.Fatal(err) } + rp := xfer.NewReportPublisher(publisher) for range time.Tick(*publishInterval) { - publisher.Publish(fixedReport) + rp.Publish(fixedReport) } } diff --git a/experimental/genreport/generate.go b/experimental/genreport/generate.go index bb5d19e12..bc90cd78e 100644 --- a/experimental/genreport/generate.go +++ b/experimental/genreport/generate.go @@ -64,14 +64,14 @@ func DemoReport(nodeCount int) report.Report { ) // Endpoint topology - r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{ "pid": "4000", "name": c.srcProc, "domain": "node-" + src, }).WithEdge(dstPortID, report.EdgeMetadata{ MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), })) - r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ + r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{ "pid": "4000", "name": c.dstProc, "domain": "node-" + dst, @@ -80,15 +80,15 @@ func DemoReport(nodeCount int) report.Report { })) // Address topology - r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{ "name": src, }).WithAdjacent(dstAddressID)) - r.Address = r.Address.WithNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{ + r.Address = r.Address.AddNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{ "name": dst, }).WithAdjacent(srcAddressID)) // Host data - r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{ + r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{ "ts": time.Now().UTC().Format(time.RFC3339Nano), "host_name": "host-x", "local_networks": localNet.String(), diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index deefa4575..e853855bc 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -105,7 +105,7 @@ var ConntrackModulePresent = func() bool { // NB this is not re-entrant! func (c *Conntracker) run(args ...string) { - args = append([]string{"-E", "-o", "xml"}, args...) + args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 095b3b085..6df5eb229 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -178,8 +178,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin }) } - rpt.Address = rpt.Address.WithNode(localAddressNodeID, localNode) - rpt.Address = rpt.Address.WithNode(remoteAddressNodeID, remoteNode) + rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode) + rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode) } // Update endpoint topology @@ -225,8 +225,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin if extraRemoteNode != nil { remoteNode = remoteNode.Merge(*extraRemoteNode) } - rpt.Endpoint = rpt.Endpoint.WithNode(localEndpointNodeID, localNode) - rpt.Endpoint = rpt.Endpoint.WithNode(remoteEndpointNodeID, remoteNode) + rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode) + rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode) } } diff --git a/probe/main.go b/probe/main.go index a43b99a84..eb4495ff9 100644 --- a/probe/main.go +++ b/probe/main.go @@ -113,6 +113,7 @@ func main() { var ( endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + tickers = []Ticker{processCache} reporters = []Reporter{ endpointReporter, host.NewReporter(hostID, hostName, localNets), @@ -142,6 +143,7 @@ func main() { if err != nil { log.Fatalf("failed to start Weave tagger: %v", err) } + tickers = append(tickers, weave) taggers = append(taggers, weave) reporters = append(reporters, weave) } @@ -173,6 +175,7 @@ func main() { pubTick = time.Tick(*publishInterval) spyTick = time.Tick(*spyInterval) r = report.MakeReport() + p = xfer.NewReportPublisher(publishers) ) for { @@ -180,24 +183,27 @@ func main() { case <-pubTick: publishTicks.WithLabelValues().Add(1) r.Window = *publishInterval - if err := publishers.Publish(r); err != nil { + if err := p.Publish(r); err != nil { log.Printf("publish: %v", err) } r = report.MakeReport() case <-spyTick: - if err := processCache.Update(); err != nil { - log.Printf("error reading processes: %v", err) - } - for _, reporter := range reporters { - newReport, err := reporter.Report() - if err != nil { - log.Printf("error generating report: %v", err) + start := time.Now() + + for _, ticker := range tickers { + if err := ticker.Tick(); err != nil { + log.Printf("error doing ticker: %v", err) } - r = r.Merge(newReport) } + + r = r.Merge(doReport(reporters)) r = Apply(r, taggers) + if took := time.Since(start); took > *spyInterval { + log.Printf("report generation took too long (%s)", took) + } + case <-quit: return } @@ -206,6 +212,26 @@ func main() { log.Printf("%s", <-interrupt()) } +func doReport(reporters []Reporter) report.Report { + reports := make(chan report.Report, len(reporters)) + for _, rep := range reporters { + go func(rep Reporter) { + newReport, err := rep.Report() + if err != nil { + log.Printf("error generating report: %v", err) + newReport = report.MakeReport() // empty is OK to merge + } + reports <- newReport + }(rep) + } + + result := report.MakeReport() + for i := 0; i < cap(reports); i++ { + result = result.Merge(<-reports) + } + return result +} + func interrupt() chan os.Signal { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index b9699cdd3..f1d60d157 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -10,6 +10,7 @@ import ( "net/url" "regexp" "strings" + "sync" "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" @@ -43,6 +44,9 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3 type Weave struct { url string hostID string + + mtx sync.RWMutex + status weaveStatus } type weaveStatus struct { @@ -75,24 +79,32 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { }, nil } -func (w Weave) update() (weaveStatus, error) { - var result weaveStatus +// Tick implements Ticker +func (w *Weave) Tick() error { req, err := http.NewRequest("GET", w.url, nil) if err != nil { - return result, err + return err } req.Header.Add("Accept", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { - return result, err + return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return result, fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) + return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) } - return result, json.NewDecoder(resp.Body).Decode(&result) + var result weaveStatus + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return err + } + + w.mtx.Lock() + defer w.mtx.Unlock() + w.status = result + return nil } type psEntry struct { @@ -101,7 +113,7 @@ type psEntry struct { ips []string } -func (w Weave) ps() ([]psEntry, error) { +func (w *Weave) ps() ([]psEntry, error) { var result []psEntry cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() @@ -132,30 +144,13 @@ func (w Weave) ps() ([]psEntry, error) { return result, scanner.Err() } -func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) { - for nodeid, nmd := range r.Container.Nodes { - idPrefix := nmd.Metadata[docker.ContainerID][:12] - if idPrefix != containerIDPrefix { - continue - } - - existingIPs := report.MakeIDList(docker.ExtractContainerIPs(nmd)...) - existingIPs = existingIPs.Add(ips...) - nmd.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ") - nmd.Metadata[WeaveMACAddress] = macAddress - r.Container.Nodes[nodeid] = nmd - break - } -} - // Tag implements Tagger. -func (w Weave) Tag(r report.Report) (report.Report, error) { - status, err := w.update() - if err != nil { - return r, nil - } +func (w *Weave) Tag(r report.Report) (report.Report, error) { + w.mtx.RLock() + defer w.mtx.RUnlock() - for _, entry := range status.DNS.Entries { + // Put information from weaveDNS on the container nodes + for _, entry := range w.status.DNS.Entries { if entry.Tombstone > 0 { continue } @@ -167,28 +162,39 @@ func (w Weave) Tag(r report.Report) (report.Report, error) { hostnames := report.IDList(strings.Fields(node.Metadata[WeaveDNSHostname])) hostnames = hostnames.Add(strings.TrimSuffix(entry.Hostname, ".")) node.Metadata[WeaveDNSHostname] = strings.Join(hostnames, " ") - r.Container.Nodes[nodeID] = node } + // Put information from weave ps on the container nodes psEntries, err := w.ps() if err != nil { return r, nil } + containersByPrefix := map[string]report.Node{} + for _, node := range r.Container.Nodes { + prefix := node.Metadata[docker.ContainerID][:12] + containersByPrefix[prefix] = node + } for _, e := range psEntries { - w.tagContainer(r, e.containerIDPrefix, e.macAddress, e.ips) + node, ok := containersByPrefix[e.containerIDPrefix] + if !ok { + continue + } + + existingIPs := report.MakeIDList(docker.ExtractContainerIPs(node)...) + existingIPs = existingIPs.Add(e.ips...) + node.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ") + node.Metadata[WeaveMACAddress] = e.macAddress } return r, nil } // Report implements Reporter. -func (w Weave) Report() (report.Report, error) { - r := report.MakeReport() - status, err := w.update() - if err != nil { - return r, err - } +func (w *Weave) Report() (report.Report, error) { + w.mtx.RLock() + defer w.mtx.RUnlock() - for _, peer := range status.Router.Peers { + r := report.MakeReport() + for _, peer := range w.status.Router.Peers { r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{ WeavePeerName: peer.Name, WeavePeerNickName: peer.NickName, diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index f5d302a65..bb5f18f6e 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -30,6 +30,8 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { t.Fatal(err) } + w.Tick() + { have, err := w.Report() if err != nil { diff --git a/probe/process/walker.go b/probe/process/walker.go index 4a88c45cd..4ba20027e 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -39,8 +39,8 @@ func (c *CachingWalker) Walk(f func(Process)) error { return nil } -// Update updates cached copy of process list -func (c *CachingWalker) Update() error { +// Tick updates cached copy of process list +func (c *CachingWalker) Tick() error { newCache := []Process{} err := c.source.Walk(func(p Process) { newCache = append(newCache, p) diff --git a/probe/process/walker_test.go b/probe/process/walker_test.go index 37d445891..6046006bb 100644 --- a/probe/process/walker_test.go +++ b/probe/process/walker_test.go @@ -29,7 +29,7 @@ func TestCache(t *testing.T) { processes: processes, } cachingWalker := process.NewCachingWalker(walker) - err := cachingWalker.Update() + err := cachingWalker.Tick() if err != nil { t.Fatal(err) } @@ -45,7 +45,7 @@ func TestCache(t *testing.T) { t.Errorf("%v (%v)", test.Diff(processes, have), err) } - err = cachingWalker.Update() + err = cachingWalker.Tick() if err != nil { t.Fatal(err) } diff --git a/probe/sniff/sniffer.go b/probe/sniff/sniffer.go index a586d1a38..134911241 100644 --- a/probe/sniff/sniffer.go +++ b/probe/sniff/sniffer.go @@ -253,8 +253,8 @@ func (s *Sniffer) Merge(p Packet, rpt *report.Report) { } addAdjacency := func(t report.Topology, srcNodeID, dstNodeID string) report.Topology { - result := t.WithNode(srcNodeID, report.MakeNode().WithAdjacent(dstNodeID)) - result = result.WithNode(dstNodeID, report.MakeNode()) + result := t.AddNode(srcNodeID, report.MakeNode().WithAdjacent(dstNodeID)) + result = result.AddNode(dstNodeID, report.MakeNode()) return result } diff --git a/probe/tag_report.go b/probe/tag_report.go index 15024d77a..5a3bfb4e5 100644 --- a/probe/tag_report.go +++ b/probe/tag_report.go @@ -16,6 +16,13 @@ type Reporter interface { Report() (report.Report, error) } +// Ticker is something which will be invoked every spyDuration. +// It's useful for things that should be updated on that interval. +// For example, cached shared state between Taggers and Reporters. +type Ticker interface { + Tick() error +} + // Apply tags the report with all the taggers. func Apply(r report.Report, taggers []Tagger) report.Report { var err error diff --git a/report/id.go b/report/id.go index 5f676850c..19501ce04 100644 --- a/report/id.go +++ b/report/id.go @@ -1,8 +1,13 @@ package report import ( + "hash" + "hash/fnv" "net" "strings" + "sync" + + "github.com/bluele/gcache" ) // TheInternet is used as a node ID to indicate a remote IP. @@ -21,9 +26,38 @@ const ( EdgeDelim = "|" ) +var ( + idCache = gcache.New(1024).LRU().Build() + hashers = sync.Pool{ + New: func() interface{} { + return fnv.New64a() + }, + } +) + +func lookupID(part1, part2, part3 string, f func() string) string { + h := hashers.Get().(hash.Hash64) + h.Write([]byte(part1)) + h.Write([]byte(part2)) + h.Write([]byte(part3)) + sum := h.Sum64() + var result string + if id, err := idCache.Get(sum); id != nil && err != nil { + result = id.(string) + } else { + result = f() + idCache.Set(sum, result) + } + h.Reset() + hashers.Put(h) + return result +} + // MakeEndpointNodeID produces an endpoint node ID from its composite parts. func MakeEndpointNodeID(hostID, address, port string) string { - return MakeAddressNodeID(hostID, address) + ScopeDelim + port + return lookupID(hostID, address, port, func() string { + return MakeAddressNodeID(hostID, address) + ScopeDelim + port + }) } // MakeAddressNodeID produces an address node ID from its composite parts. diff --git a/report/topology.go b/report/topology.go index 033ddaed7..4ac349c72 100644 --- a/report/topology.go +++ b/report/topology.go @@ -20,16 +20,17 @@ func MakeTopology() Topology { } } -// WithNode produces a topology from t, with nmd added under key nodeID; if a -// node already exists for this key, nmd is merged with that node. Note that a -// fresh topology is returned. -func (t Topology) WithNode(nodeID string, nmd Node) Topology { +// AddNode adds node to the topology under key nodeID; if a +// node already exists for this key, nmd is merged with that node. +// The same topology is returned to enable chaining. +// This method is different from all the other similar methods +// in that it mutates the Topology, to solve issues of GC pressure. +func (t Topology) AddNode(nodeID string, nmd Node) Topology { if existing, ok := t.Nodes[nodeID]; ok { nmd = nmd.Merge(existing) } - result := t.Copy() - result.Nodes[nodeID] = nmd - return result + t.Nodes[nodeID] = nmd + return t } // Copy returns a value copy of the Topology. diff --git a/xfer/buffer.go b/xfer/buffer.go new file mode 100644 index 000000000..452411e52 --- /dev/null +++ b/xfer/buffer.go @@ -0,0 +1,46 @@ +package xfer + +import ( + "bytes" + "sync" + "sync/atomic" +) + +// A Buffer is a reference counted bytes.Buffer, which belongs +// to a sync.Pool +type Buffer struct { + bytes.Buffer + pool *sync.Pool + refs int32 +} + +// NewBuffer creates a new buffer +func NewBuffer(pool *sync.Pool) *Buffer { + return &Buffer{ + pool: pool, + refs: 0, + } +} + +// Get increases the reference count. It is safe for concurrent calls. +func (b *Buffer) Get() { + atomic.AddInt32(&b.refs, 1) +} + +// Put decreases the reference count, and when it hits zero, puts the +// buffer back in the pool. +func (b *Buffer) Put() { + if atomic.AddInt32(&b.refs, -1) == 0 { + b.Reset() + b.pool.Put(b) + } +} + +// NewBufferPool creates a new buffer pool. +func NewBufferPool() *sync.Pool { + result := &sync.Pool{} + result.New = func() interface{} { + return NewBuffer(result) + } + return result +} diff --git a/xfer/publisher.go b/xfer/publisher.go index 1eb1fcdd2..d1612e78e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,9 +1,6 @@ package xfer import ( - "bytes" - "compress/gzip" - "encoding/gob" "fmt" "log" "net/http" @@ -11,8 +8,6 @@ import ( "strings" "sync" "time" - - "github.com/weaveworks/scope/report" ) const ( @@ -20,9 +15,10 @@ const ( maxBackoff = 60 * time.Second ) -// Publisher is something which can send a report to a remote collector. +// Publisher is something which can send a buffered set of data somewhere, +// probably to a collector. type Publisher interface { - Publish(report.Report) error + Publish(*Buffer) error Stop() } @@ -63,16 +59,10 @@ func (p HTTPPublisher) String() string { } // Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(rpt report.Report) error { - gzbuf := bytes.Buffer{} - gzwriter := gzip.NewWriter(&gzbuf) +func (p HTTPPublisher) Publish(buf *Buffer) error { + defer buf.Put() - if err := gob.NewEncoder(gzwriter).Encode(rpt); err != nil { - return err - } - gzwriter.Close() // otherwise the content won't get flushed to the output stream - - req, err := http.NewRequest("POST", p.url, &gzbuf) + req, err := http.NewRequest("POST", p.url, buf) if err != nil { return err } @@ -108,7 +98,7 @@ func AuthorizationHeader(token string) string { // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - reports chan report.Report + reports chan *Buffer quit chan struct{} } @@ -116,7 +106,7 @@ type BackgroundPublisher struct { func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { result := &BackgroundPublisher{ publisher: p, - reports: make(chan report.Report), + reports: make(chan *Buffer), quit: make(chan struct{}), } go result.loop() @@ -146,9 +136,9 @@ func (b *BackgroundPublisher) loop() { } // Publish implements Publisher -func (b *BackgroundPublisher) Publish(r report.Report) error { +func (b *BackgroundPublisher) Publish(buf *Buffer) error { select { - case b.reports <- r: + case b.reports <- buf: default: } return nil @@ -198,13 +188,18 @@ func (p *MultiPublisher) Add(target string) { } // Publish implements Publisher by emitting the report to all publishers. -func (p *MultiPublisher) Publish(rpt report.Report) error { +func (p *MultiPublisher) Publish(buf *Buffer) error { p.mtx.RLock() defer p.mtx.RUnlock() + // First take a reference for each publisher + for range p.m { + buf.Get() + } + var errs []string for _, publisher := range p.m { - if err := publisher.Publish(rpt); err != nil { + if err := publisher.Publish(buf); err != nil { errs = append(errs, err.Error()) } } diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index ecf0148b0..195d0b0bf 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -64,7 +64,8 @@ func TestHTTPPublisher(t *testing.T) { if err != nil { t.Fatal(err) } - if err := p.Publish(rpt); err != nil { + rp := xfer.NewReportPublisher(p) + if err := rp.Publish(rpt); err != nil { t.Error(err) } @@ -83,7 +84,7 @@ func TestMultiPublisher(t *testing.T) { ) multiPublisher.Add("first") - if err := multiPublisher.Publish(report.MakeReport()); err != nil { + if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { t.Error(err) } if want, have := 1, p.count; want != have { @@ -91,7 +92,7 @@ func TestMultiPublisher(t *testing.T) { } multiPublisher.Add("second") // but factory returns same mockPublisher - if err := multiPublisher.Publish(report.MakeReport()); err != nil { + if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { t.Error(err) } if want, have := 3, p.count; want != have { @@ -101,5 +102,5 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } -func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} +func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/report_publisher.go b/xfer/report_publisher.go new file mode 100644 index 000000000..cdf5a0101 --- /dev/null +++ b/xfer/report_publisher.go @@ -0,0 +1,38 @@ +package xfer + +import ( + "compress/gzip" + "encoding/gob" + "sync" + + "github.com/weaveworks/scope/report" +) + +// A ReportPublisher uses a buffer pool to serialise reports, which it +// then passes to a publisher +type ReportPublisher struct { + buffers *sync.Pool + publisher Publisher +} + +// NewReportPublisher creates a new report publisher +func NewReportPublisher(publisher Publisher) *ReportPublisher { + return &ReportPublisher{ + buffers: NewBufferPool(), + publisher: publisher, + } +} + +// Publish serialises and compresses a report, then passes it to a publisher +func (p *ReportPublisher) Publish(r report.Report) error { + buf := p.buffers.Get().(*Buffer) + gzwriter := gzip.NewWriter(buf) + if err := gob.NewEncoder(gzwriter).Encode(r); err != nil { + buf.Reset() + p.buffers.Put(buf) + return err + } + gzwriter.Close() // otherwise the content won't get flushed to the output stream + + return p.publisher.Publish(buf) +}