mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Push reports to the app in the background and in parallel.
This commit is contained in:
@@ -86,8 +86,15 @@ func main() {
|
||||
log.Printf("warning: process reporting enabled, but that requires root to find everything")
|
||||
}
|
||||
|
||||
publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) }
|
||||
publisherFactory := func(target string) (xfer.Publisher, error) {
|
||||
publisher, err := xfer.NewHTTPPublisher(target, *token, probeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return xfer.NewBackgroundPublisher(publisher), nil
|
||||
}
|
||||
publishers := xfer.NewMultiPublisher(publisherFactory)
|
||||
defer publishers.Stop()
|
||||
resolver := newStaticResolver(targets, publishers.Add)
|
||||
defer resolver.Stop()
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
// Publisher is something which can send a report to a remote collector.
|
||||
type Publisher interface {
|
||||
Publish(report.Report) error
|
||||
Stop()
|
||||
}
|
||||
|
||||
// HTTPPublisher publishes reports by POST to a fixed endpoint.
|
||||
@@ -83,12 +84,57 @@ func (p HTTPPublisher) Publish(rpt report.Report) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (p HTTPPublisher) Stop() {}
|
||||
|
||||
// AuthorizationHeader returns a value suitable for an HTTP Authorization
|
||||
// header, based on the passed token string.
|
||||
func AuthorizationHeader(token string) string {
|
||||
return fmt.Sprintf("Scope-Probe token=%s", token)
|
||||
}
|
||||
|
||||
// BackgroundPublisher is a publisher which does the publish asynchronously.
|
||||
// It will only do one publish at once; if there is an ongoing publish,
|
||||
// concurrent publishes are dropped.
|
||||
type BackgroundPublisher struct {
|
||||
publisher Publisher
|
||||
reports chan report.Report
|
||||
}
|
||||
|
||||
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
|
||||
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
|
||||
result := &BackgroundPublisher{
|
||||
publisher: p,
|
||||
reports: make(chan report.Report),
|
||||
}
|
||||
go result.loop()
|
||||
return result
|
||||
}
|
||||
|
||||
func (b *BackgroundPublisher) loop() {
|
||||
for r := range b.reports {
|
||||
if err := b.publisher.Publish(r); err != nil {
|
||||
log.Printf("Error publishing: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Publish implements Publisher
|
||||
func (b *BackgroundPublisher) Publish(r report.Report) error {
|
||||
select {
|
||||
case b.reports <- r:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("Dropping report - can't push fast enough")
|
||||
}
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (b *BackgroundPublisher) Stop() {
|
||||
close(b.reports)
|
||||
b.publisher.Stop()
|
||||
}
|
||||
|
||||
// MultiPublisher implements Publisher over a set of publishers.
|
||||
type MultiPublisher struct {
|
||||
mtx sync.RWMutex
|
||||
@@ -142,3 +188,13 @@ func (p *MultiPublisher) Publish(rpt report.Report) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (p *MultiPublisher) Stop() {
|
||||
p.mtx.RLock()
|
||||
defer p.mtx.RUnlock()
|
||||
|
||||
for _, publisher := range p.m {
|
||||
publisher.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,3 +102,4 @@ func TestMultiPublisher(t *testing.T) {
|
||||
type mockPublisher struct{ count int }
|
||||
|
||||
func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil }
|
||||
func (p *mockPublisher) Stop() {}
|
||||
|
||||
Reference in New Issue
Block a user