From 3c6ae972ab434f9ff88ab68fe50a72d943d16cb3 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 24 Jul 2017 22:12:52 +0100 Subject: [PATCH] new full reports are more important than old and shortcut reports so when there is backpressure in publishing reports, drop shortcut reports in preference to full reports, and drop old full reports in preference to new full reports. Fixes #2738. --- probe/appclient/app_client.go | 16 ++++++++++++++-- probe/appclient/multi_client.go | 10 +++++----- probe/appclient/multi_client_test.go | 4 ++-- probe/appclient/report_publisher.go | 2 +- probe/probe_internal_test.go | 2 +- 5 files changed, 23 insertions(+), 11 deletions(-) diff --git a/probe/appclient/app_client.go b/probe/appclient/app_client.go index e207daabc..ac3fee515 100644 --- a/probe/appclient/app_client.go +++ b/probe/appclient/app_client.go @@ -30,7 +30,7 @@ type AppClient interface { ControlConnection() PipeConnection(string, xfer.Pipe) PipeClose(string) error - Publish(r io.Reader) error + Publish(io.Reader, bool) error Target() url.URL ReTarget(url.URL) Stop() @@ -311,13 +311,25 @@ func (c *appClient) startPublishing() { } // Publish implements Publisher -func (c *appClient) Publish(r io.Reader) error { +func (c *appClient) Publish(r io.Reader, shortcut bool) error { // Lazily start the background publishing loop. c.publishLoop.Do(c.startPublishing) + // enqueue report select { case c.readers <- r: default: log.Errorf("Dropping report to %s", c.hostname) + if shortcut { + return nil + } + // drop an old report to make way for new one + c.mtx.Lock() + defer c.mtx.Unlock() + select { + case <-c.readers: + default: + } + c.readers <- r } return nil } diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index 2eb7bf556..c8187a5ed 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -40,7 +40,7 @@ type clientTuple struct { // Publisher is something which can send a stream of data somewhere, probably // to a remote collector. type Publisher interface { - Publish(io.Reader) error + Publish(io.Reader, bool) error Stop() } @@ -51,7 +51,7 @@ type MultiAppClient interface { PipeConnection(appID, pipeID string, pipe xfer.Pipe) error PipeClose(appID, pipeID string) error Stop() - Publish(io.Reader) error + Publish(io.Reader, bool) error } // NewMultiAppClient creates a new MultiAppClient. @@ -165,13 +165,13 @@ func (c *multiClient) Stop() { // underlying publishers sequentially. To do that, it needs to drain the // reader, and recreate new readers for each publisher. Note that it will // publish to one endpoint for each unique ID. Failed publishes don't count. -func (c *multiClient) Publish(r io.Reader) error { +func (c *multiClient) Publish(r io.Reader, shortcut bool) error { c.mtx.Lock() defer c.mtx.Unlock() if len(c.clients) <= 1 { // optimisation for _, c := range c.clients { - return c.Publish(r) + return c.Publish(r, shortcut) } return nil } @@ -183,7 +183,7 @@ func (c *multiClient) Publish(r io.Reader) error { errs := []string{} for _, c := range c.clients { - if err := c.Publish(bytes.NewReader(buf)); err != nil { + if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil { errs = append(errs, err.Error()) } } diff --git a/probe/appclient/multi_client_test.go b/probe/appclient/multi_client_test.go index 744a9d05a..e1789a710 100644 --- a/probe/appclient/multi_client_test.go +++ b/probe/appclient/multi_client_test.go @@ -37,7 +37,7 @@ func (c *mockClient) Stop() { c.stopped++ } -func (c *mockClient) Publish(io.Reader) error { +func (c *mockClient) Publish(io.Reader, bool) error { c.publish++ return nil } @@ -106,7 +106,7 @@ func TestMultiClientPublish(t *testing.T) { mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}}) for i := 1; i < 10; i++ { - if err := mp.Publish(&bytes.Buffer{}); err != nil { + if err := mp.Publish(&bytes.Buffer{}, false); err != nil { t.Error(err) } if want, have := 3*i, sum(); want != have { diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go index 64777e251..2f7bdd0b8 100644 --- a/probe/appclient/report_publisher.go +++ b/probe/appclient/report_publisher.go @@ -30,5 +30,5 @@ func (p *ReportPublisher) Publish(r report.Report) error { } buf := &bytes.Buffer{} r.WriteBinary(buf, gzip.DefaultCompression) - return p.publisher.Publish(buf) + return p.publisher.Publish(buf, r.Shortcut) } diff --git a/probe/probe_internal_test.go b/probe/probe_internal_test.go index 42b7cbd58..af6404af6 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -53,7 +53,7 @@ type mockPublisher struct { have chan report.Report } -func (m mockPublisher) Publish(in io.Reader) error { +func (m mockPublisher) Publish(in io.Reader, shortcut bool) error { var r report.Report if reader, err := gzip.NewReader(in); err != nil { return err