Merge pull request #2743 from weaveworks/2738-prefer-full-reports

new full reports are more important than old and shortcut reports

Fixes #2738.
This commit is contained in:
Matthias Radestock
2017-07-25 15:17:59 +01:00
committed by GitHub
5 changed files with 23 additions and 11 deletions

View File

@@ -30,7 +30,7 @@ type AppClient interface {
ControlConnection()
PipeConnection(string, xfer.Pipe)
PipeClose(string) error
Publish(r io.Reader) error
Publish(io.Reader, bool) error
Target() url.URL
ReTarget(url.URL)
Stop()
@@ -311,13 +311,25 @@ func (c *appClient) startPublishing() {
}
// Publish implements Publisher
func (c *appClient) Publish(r io.Reader) error {
func (c *appClient) Publish(r io.Reader, shortcut bool) error {
// Lazily start the background publishing loop.
c.publishLoop.Do(c.startPublishing)
// enqueue report
select {
case c.readers <- r:
default:
log.Errorf("Dropping report to %s", c.hostname)
if shortcut {
return nil
}
// drop an old report to make way for new one
c.mtx.Lock()
defer c.mtx.Unlock()
select {
case <-c.readers:
default:
}
c.readers <- r
}
return nil
}

View File

@@ -40,7 +40,7 @@ type clientTuple struct {
// Publisher is something which can send a stream of data somewhere, probably
// to a remote collector.
type Publisher interface {
Publish(io.Reader) error
Publish(io.Reader, bool) error
Stop()
}
@@ -51,7 +51,7 @@ type MultiAppClient interface {
PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
PipeClose(appID, pipeID string) error
Stop()
Publish(io.Reader) error
Publish(io.Reader, bool) error
}
// NewMultiAppClient creates a new MultiAppClient.
@@ -165,13 +165,13 @@ func (c *multiClient) Stop() {
// underlying publishers sequentially. To do that, it needs to drain the
// reader, and recreate new readers for each publisher. Note that it will
// publish to one endpoint for each unique ID. Failed publishes don't count.
func (c *multiClient) Publish(r io.Reader) error {
func (c *multiClient) Publish(r io.Reader, shortcut bool) error {
c.mtx.Lock()
defer c.mtx.Unlock()
if len(c.clients) <= 1 { // optimisation
for _, c := range c.clients {
return c.Publish(r)
return c.Publish(r, shortcut)
}
return nil
}
@@ -183,7 +183,7 @@ func (c *multiClient) Publish(r io.Reader) error {
errs := []string{}
for _, c := range c.clients {
if err := c.Publish(bytes.NewReader(buf)); err != nil {
if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil {
errs = append(errs, err.Error())
}
}

View File

@@ -37,7 +37,7 @@ func (c *mockClient) Stop() {
c.stopped++
}
func (c *mockClient) Publish(io.Reader) error {
func (c *mockClient) Publish(io.Reader, bool) error {
c.publish++
return nil
}
@@ -106,7 +106,7 @@ func TestMultiClientPublish(t *testing.T) {
mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}})
for i := 1; i < 10; i++ {
if err := mp.Publish(&bytes.Buffer{}); err != nil {
if err := mp.Publish(&bytes.Buffer{}, false); err != nil {
t.Error(err)
}
if want, have := 3*i, sum(); want != have {

View File

@@ -30,5 +30,5 @@ func (p *ReportPublisher) Publish(r report.Report) error {
}
buf := &bytes.Buffer{}
r.WriteBinary(buf, gzip.DefaultCompression)
return p.publisher.Publish(buf)
return p.publisher.Publish(buf, r.Shortcut)
}

View File

@@ -53,7 +53,7 @@ type mockPublisher struct {
have chan report.Report
}
func (m mockPublisher) Publish(in io.Reader) error {
func (m mockPublisher) Publish(in io.Reader, shortcut bool) error {
var r report.Report
if reader, err := gzip.NewReader(in); err != nil {
return err