mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Backoff on failed report pushes.
This commit is contained in:
@@ -10,10 +10,16 @@ import (
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
const (
|
||||
initialBackoff = 1 * time.Second
|
||||
maxBackoff = 60 * time.Second
|
||||
)
|
||||
|
||||
// Publisher is something which can send a report to a remote collector.
|
||||
type Publisher interface {
|
||||
Publish(report.Report) error
|
||||
@@ -52,6 +58,10 @@ func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p HTTPPublisher) String() string {
|
||||
return p.url
|
||||
}
|
||||
|
||||
// Publish publishes the report to the URL.
|
||||
func (p HTTPPublisher) Publish(rpt report.Report) error {
|
||||
gzbuf := bytes.Buffer{}
|
||||
@@ -99,6 +109,7 @@ func AuthorizationHeader(token string) string {
|
||||
type BackgroundPublisher struct {
|
||||
publisher Publisher
|
||||
reports chan report.Report
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
|
||||
@@ -106,15 +117,30 @@ func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
|
||||
result := &BackgroundPublisher{
|
||||
publisher: p,
|
||||
reports: make(chan report.Report),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go result.loop()
|
||||
return result
|
||||
}
|
||||
|
||||
func (b *BackgroundPublisher) loop() {
|
||||
backoff := initialBackoff
|
||||
|
||||
for r := range b.reports {
|
||||
if err := b.publisher.Publish(r); err != nil {
|
||||
log.Printf("Error publishing: %v", err)
|
||||
err := b.publisher.Publish(r)
|
||||
if err == nil {
|
||||
backoff = initialBackoff
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-b.quit:
|
||||
}
|
||||
backoff = backoff * 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,15 +149,15 @@ func (b *BackgroundPublisher) loop() {
|
||||
func (b *BackgroundPublisher) Publish(r report.Report) error {
|
||||
select {
|
||||
case b.reports <- r:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("Dropping report - can't push fast enough")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (b *BackgroundPublisher) Stop() {
|
||||
close(b.reports)
|
||||
close(b.quit)
|
||||
b.publisher.Stop()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user