mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Need to copy the buffer in the MultiPublisher
This commit is contained in:
@@ -16,37 +16,37 @@ const (
|
||||
// concurrent publishes are dropped.
|
||||
type BackgroundPublisher struct {
|
||||
publisher Publisher
|
||||
reports chan *bytes.Buffer
|
||||
buffers chan *bytes.Buffer
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
|
||||
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
|
||||
result := &BackgroundPublisher{
|
||||
bp := &BackgroundPublisher{
|
||||
publisher: p,
|
||||
reports: make(chan *bytes.Buffer),
|
||||
buffers: make(chan *bytes.Buffer),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go result.loop()
|
||||
return result
|
||||
go bp.loop()
|
||||
return bp
|
||||
}
|
||||
|
||||
func (b *BackgroundPublisher) loop() {
|
||||
func (bp *BackgroundPublisher) loop() {
|
||||
backoff := initialBackoff
|
||||
|
||||
for r := range b.reports {
|
||||
err := b.publisher.Publish(r)
|
||||
for buf := range bp.buffers {
|
||||
err := bp.publisher.Publish(buf)
|
||||
if err == nil {
|
||||
backoff = initialBackoff
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err)
|
||||
log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-b.quit:
|
||||
case <-bp.quit:
|
||||
}
|
||||
backoff = backoff * 2
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
@@ -54,17 +54,17 @@ func (b *BackgroundPublisher) loop() {
|
||||
}
|
||||
|
||||
// Publish implements Publisher
|
||||
func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error {
|
||||
func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error {
|
||||
select {
|
||||
case b.reports <- buf:
|
||||
case bp.buffers <- buf:
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (b *BackgroundPublisher) Stop() {
|
||||
close(b.reports)
|
||||
close(b.quit)
|
||||
b.publisher.Stop()
|
||||
func (bp *BackgroundPublisher) Stop() {
|
||||
close(bp.buffers)
|
||||
close(bp.quit)
|
||||
bp.publisher.Stop()
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"github.com/weaveworks/scope/common/sanitize"
|
||||
)
|
||||
|
||||
// HTTPPublisher publishes reports by POST to a fixed endpoint.
|
||||
// HTTPPublisher publishes buffers by POST to a fixed endpoint.
|
||||
type HTTPPublisher struct {
|
||||
url string
|
||||
token string
|
||||
|
||||
@@ -74,7 +74,7 @@ func (p *MultiPublisher) Publish(buf *bytes.Buffer) error {
|
||||
if _, ok := ids[t.id]; ok {
|
||||
continue
|
||||
}
|
||||
if err := t.publisher.Publish(buf); err != nil {
|
||||
if err := t.publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil {
|
||||
errs = append(errs, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user