new full reports are more important than old full reports and shortcut reports

so when there is backpressure in publishing reports, drop shortcut
reports in preference to full reports, and drop old full reports in
preference to new full reports.

Fixes #2738.
Matthias Radestock, 2017-07-24 22:12:52 +01:00
parent 7e86836180, commit 3c6ae972ab
5 changed files with 23 additions and 11 deletions
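For readers who want the queueing policy in isolation: it boils down to a non-blocking send on a buffered channel, where a shortcut report is dropped outright and a full report evicts the oldest queued entry instead. A minimal, self-contained sketch of that pattern (the `queue`/`enqueue` names are illustrative, not the actual appClient fields; the real code additionally holds a mutex around the eviction):

```go
package main

import "fmt"

// enqueue tries to queue a report for publishing. When the queue is full,
// shortcut reports are dropped; full reports evict the oldest queued entry,
// so the newest full report is always retained.
func enqueue(queue chan string, report string, shortcut bool) {
	select {
	case queue <- report: // fast path: there is room
		return
	default:
	}
	if shortcut {
		return // drop shortcut reports under backpressure
	}
	select {
	case <-queue: // evict the oldest queued report
	default:
	}
	queue <- report
}

func main() {
	q := make(chan string, 1)
	enqueue(q, "full-1", false)
	enqueue(q, "shortcut-1", true) // dropped: queue is full
	enqueue(q, "full-2", false)    // evicts full-1
	fmt.Println(<-q)               // prints "full-2"
}
```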


@@ -30,7 +30,7 @@ type AppClient interface {
 	ControlConnection()
 	PipeConnection(string, xfer.Pipe)
 	PipeClose(string) error
-	Publish(r io.Reader) error
+	Publish(io.Reader, bool) error
 	Target() url.URL
 	ReTarget(url.URL)
 	Stop()
@@ -311,13 +311,25 @@ func (c *appClient) startPublishing() {
 }
 
 // Publish implements Publisher
-func (c *appClient) Publish(r io.Reader) error {
+func (c *appClient) Publish(r io.Reader, shortcut bool) error {
 	// Lazily start the background publishing loop.
 	c.publishLoop.Do(c.startPublishing)
+	// enqueue report
 	select {
 	case c.readers <- r:
 	default:
 		log.Errorf("Dropping report to %s", c.hostname)
+		if shortcut {
+			return nil
+		}
+		// drop an old report to make way for new one
+		c.mtx.Lock()
+		defer c.mtx.Unlock()
+		select {
+		case <-c.readers:
+		default:
+		}
+		c.readers <- r
 	}
 	return nil
 }


@@ -40,7 +40,7 @@ type clientTuple struct {
 // Publisher is something which can send a stream of data somewhere, probably
 // to a remote collector.
 type Publisher interface {
-	Publish(io.Reader) error
+	Publish(io.Reader, bool) error
 	Stop()
 }
 
@@ -51,7 +51,7 @@ type MultiAppClient interface {
 	PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
 	PipeClose(appID, pipeID string) error
 	Stop()
-	Publish(io.Reader) error
+	Publish(io.Reader, bool) error
 }
 
 // NewMultiAppClient creates a new MultiAppClient.
@@ -165,13 +165,13 @@ func (c *multiClient) Stop() {
 // underlying publishers sequentially. To do that, it needs to drain the
 // reader, and recreate new readers for each publisher. Note that it will
 // publish to one endpoint for each unique ID. Failed publishes don't count.
-func (c *multiClient) Publish(r io.Reader) error {
+func (c *multiClient) Publish(r io.Reader, shortcut bool) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
 	if len(c.clients) <= 1 { // optimisation
 		for _, c := range c.clients {
-			return c.Publish(r)
+			return c.Publish(r, shortcut)
 		}
 		return nil
 	}
@@ -183,7 +183,7 @@ func (c *multiClient) Publish(r io.Reader) error {
 
 	errs := []string{}
 	for _, c := range c.clients {
-		if err := c.Publish(bytes.NewReader(buf)); err != nil {
+		if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil {
 			errs = append(errs, err.Error())
 		}
 	}
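As the comment above notes, an io.Reader can only be drained once, so the multi-client reads the report into a byte slice and wraps it in a fresh bytes.Reader for each client. A standalone sketch of that fan-out, assuming the same Publish(io.Reader, bool) shape; the printPublisher type and the simplified error handling (the real code collects all errors) are illustrative only:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Publisher mirrors the interface shape introduced in this change.
type Publisher interface {
	Publish(io.Reader, bool) error
}

// printPublisher is a stand-in client that just echoes what it receives.
type printPublisher struct{ name string }

func (p printPublisher) Publish(r io.Reader, shortcut bool) error {
	b, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	fmt.Printf("%s got %q (shortcut=%v)\n", p.name, b, shortcut)
	return nil
}

// publishAll drains r once, then hands each publisher its own reader
// over the same bytes, because a reader cannot be consumed twice.
func publishAll(r io.Reader, shortcut bool, pubs []Publisher) error {
	buf, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	for _, p := range pubs {
		if err := p.Publish(bytes.NewReader(buf), shortcut); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	pubs := []Publisher{printPublisher{"a"}, printPublisher{"b"}}
	_ = publishAll(strings.NewReader("report"), false, pubs)
}
```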


@@ -37,7 +37,7 @@ func (c *mockClient) Stop() {
 	c.stopped++
 }
 
-func (c *mockClient) Publish(io.Reader) error {
+func (c *mockClient) Publish(io.Reader, bool) error {
 	c.publish++
 	return nil
 }
@@ -106,7 +106,7 @@ func TestMultiClientPublish(t *testing.T) {
mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}})
for i := 1; i < 10; i++ {
if err := mp.Publish(&bytes.Buffer{}); err != nil {
if err := mp.Publish(&bytes.Buffer{}, false); err != nil {
t.Error(err)
}
if want, have := 3*i, sum(); want != have {


@@ -30,5 +30,5 @@ func (p *ReportPublisher) Publish(r report.Report) error {
 	}
 	buf := &bytes.Buffer{}
 	r.WriteBinary(buf, gzip.DefaultCompression)
-	return p.publisher.Publish(buf)
+	return p.publisher.Publish(buf, r.Shortcut)
 }
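Here the report is serialised gzip-compressed into an in-memory buffer, and that buffer is what gets handed to Publish as an io.Reader. A rough sketch of the same idea with plain compress/gzip (report.WriteBinary is Scope's own encoder; the compress helper and the JSON payload below are stand-ins, not part of this change):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips payload into an in-memory buffer; the buffer satisfies
// io.Reader, so it can be handed straight to Publish(io.Reader, bool).
func compress(payload []byte) (*bytes.Buffer, error) {
	buf := &bytes.Buffer{}
	gz, err := gzip.NewWriterLevel(buf, gzip.DefaultCompression)
	if err != nil {
		return nil, err
	}
	if _, err := gz.Write(payload); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	buf, err := compress([]byte(`{"shortcut":false}`))
	if err != nil {
		panic(err)
	}
	// The receiving side (e.g. the mock publisher below) undoes this
	// with gzip.NewReader before decoding the report.
	gz, err := gzip.NewReader(buf)
	if err != nil {
		panic(err)
	}
	out, _ := io.ReadAll(gz)
	fmt.Println(string(out))
}
```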


@@ -53,7 +53,7 @@ type mockPublisher struct {
 	have chan report.Report
 }
 
-func (m mockPublisher) Publish(in io.Reader) error {
+func (m mockPublisher) Publish(in io.Reader, shortcut bool) error {
 	var r report.Report
 	if reader, err := gzip.NewReader(in); err != nil {
 		return err