diff --git a/probe/appclient/app_client.go b/probe/appclient/app_client.go index e207daabc..ac3fee515 100644 --- a/probe/appclient/app_client.go +++ b/probe/appclient/app_client.go @@ -30,7 +30,7 @@ type AppClient interface { ControlConnection() PipeConnection(string, xfer.Pipe) PipeClose(string) error - Publish(r io.Reader) error + Publish(io.Reader, bool) error Target() url.URL ReTarget(url.URL) Stop() @@ -311,13 +311,25 @@ func (c *appClient) startPublishing() { } // Publish implements Publisher -func (c *appClient) Publish(r io.Reader) error { +func (c *appClient) Publish(r io.Reader, shortcut bool) error { // Lazily start the background publishing loop. c.publishLoop.Do(c.startPublishing) + // enqueue report select { case c.readers <- r: default: log.Errorf("Dropping report to %s", c.hostname) + if shortcut { + return nil + } + // drop an old report to make way for new one + c.mtx.Lock() + defer c.mtx.Unlock() + select { + case <-c.readers: + default: + } + c.readers <- r } return nil } diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index 2eb7bf556..c8187a5ed 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -40,7 +40,7 @@ type clientTuple struct { // Publisher is something which can send a stream of data somewhere, probably // to a remote collector. type Publisher interface { - Publish(io.Reader) error + Publish(io.Reader, bool) error Stop() } @@ -51,7 +51,7 @@ type MultiAppClient interface { PipeConnection(appID, pipeID string, pipe xfer.Pipe) error PipeClose(appID, pipeID string) error Stop() - Publish(io.Reader) error + Publish(io.Reader, bool) error } // NewMultiAppClient creates a new MultiAppClient. @@ -165,13 +165,13 @@ func (c *multiClient) Stop() { // underlying publishers sequentially. To do that, it needs to drain the // reader, and recreate new readers for each publisher. Note that it will // publish to one endpoint for each unique ID. Failed publishes don't count. -func (c *multiClient) Publish(r io.Reader) error { +func (c *multiClient) Publish(r io.Reader, shortcut bool) error { c.mtx.Lock() defer c.mtx.Unlock() if len(c.clients) <= 1 { // optimisation for _, c := range c.clients { - return c.Publish(r) + return c.Publish(r, shortcut) } return nil } @@ -183,7 +183,7 @@ func (c *multiClient) Publish(r io.Reader) error { errs := []string{} for _, c := range c.clients { - if err := c.Publish(bytes.NewReader(buf)); err != nil { + if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil { errs = append(errs, err.Error()) } } diff --git a/probe/appclient/multi_client_test.go b/probe/appclient/multi_client_test.go index 744a9d05a..e1789a710 100644 --- a/probe/appclient/multi_client_test.go +++ b/probe/appclient/multi_client_test.go @@ -37,7 +37,7 @@ func (c *mockClient) Stop() { c.stopped++ } -func (c *mockClient) Publish(io.Reader) error { +func (c *mockClient) Publish(io.Reader, bool) error { c.publish++ return nil } @@ -106,7 +106,7 @@ func TestMultiClientPublish(t *testing.T) { mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}}) for i := 1; i < 10; i++ { - if err := mp.Publish(&bytes.Buffer{}); err != nil { + if err := mp.Publish(&bytes.Buffer{}, false); err != nil { t.Error(err) } if want, have := 3*i, sum(); want != have { diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go index 64777e251..2f7bdd0b8 100644 --- a/probe/appclient/report_publisher.go +++ b/probe/appclient/report_publisher.go @@ -30,5 +30,5 @@ func (p *ReportPublisher) Publish(r report.Report) error { } buf := &bytes.Buffer{} r.WriteBinary(buf, gzip.DefaultCompression) - return p.publisher.Publish(buf) + return p.publisher.Publish(buf, r.Shortcut) } diff --git a/probe/probe_internal_test.go b/probe/probe_internal_test.go index 42b7cbd58..af6404af6 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -53,7 +53,7 @@ type mockPublisher struct { have chan report.Report } -func (m mockPublisher) Publish(in io.Reader) error { +func (m mockPublisher) Publish(in io.Reader, shortcut bool) error { var r report.Report if reader, err := gzip.NewReader(in); err != nil { return err