From 4f4d986571af1c99846ed14cd09b2ae4f815ed91 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Dec 2015 12:17:21 +0000 Subject: [PATCH 1/4] Merge http publish and app client. --- prog/probe/main.go | 15 +- xfer/app_client.go | 125 +++++++++++---- ...er_test.go => app_client_internal_test.go} | 36 +++-- xfer/background_publisher.go | 70 --------- xfer/background_publisher_test.go | 29 ---- xfer/http_publisher.go | 95 ------------ xfer/multi_client.go | 44 ++++++ ..._test.go => multi_client_internal_test.go} | 0 xfer/multi_client_test.go | 65 +++++--- xfer/multi_publisher.go | 144 ------------------ xfer/multi_publisher_test.go | 54 ------- 11 files changed, 214 insertions(+), 463 deletions(-) rename xfer/{http_publisher_test.go => app_client_internal_test.go} (73%) delete mode 100644 xfer/background_publisher.go delete mode 100644 xfer/background_publisher_test.go delete mode 100644 xfer/http_publisher.go rename xfer/{multi_publisher_internal_test.go => multi_client_internal_test.go} (100%) delete mode 100644 xfer/multi_publisher.go delete mode 100644 xfer/multi_publisher_test.go diff --git a/prog/probe/main.go b/prog/probe/main.go index 03f28cdb3..3cfbf7b28 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -101,17 +101,6 @@ func main() { } log.Printf("publishing to: %s", strings.Join(targets, ", ")) - factory := func(hostname, endpoint string) (string, xfer.Publisher, error) { - id, publisher, err := xfer.NewHTTPPublisher(hostname, endpoint, *token, probeID, *insecure) - if err != nil { - return "", nil, err - } - return id, xfer.NewBackgroundPublisher(publisher), nil - } - - publishers := xfer.NewMultiPublisher(factory) - defer publishers.Stop() - clients := xfer.NewMultiAppClient(xfer.ProbeConfig{ Token: *token, ProbeID: probeID, @@ -119,14 +108,14 @@ func main() { }, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient) defer clients.Stop() - resolver := xfer.NewStaticResolver(targets, publishers.Set, clients.Set) + resolver := xfer.NewStaticResolver(targets, clients.Set) defer resolver.Stop() endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) defer endpointReporter.Stop() processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) - p := probe.New(*spyInterval, *publishInterval, publishers) + p := probe.New(*spyInterval, *publishInterval, clients) p.AddTicker(processCache) p.AddReporter( endpointReporter, diff --git a/xfer/app_client.go b/xfer/app_client.go index 1b1fd579d..f2a8dacd8 100644 --- a/xfer/app_client.go +++ b/xfer/app_client.go @@ -2,6 +2,8 @@ package xfer import ( "encoding/json" + "fmt" + "io" "log" "net/http" "net/rpc" @@ -13,6 +15,11 @@ import ( "github.com/weaveworks/scope/common/sanitize" ) +const ( + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second +) + // Details are some generic details that can be fetched from /api type Details struct { ID string `json:"id"` @@ -23,6 +30,7 @@ type Details struct { type AppClient interface { Details() (Details, error) ControlConnection(handler ControlHandler) + Publish(r io.Reader) error Stop() } @@ -34,6 +42,11 @@ type appClient struct { insecure bool client http.Client + // For publish + publishLoop sync.Once + readers chan io.Reader + + // For controls controlServerCodecMtx sync.Mutex controlServerCodec rpc.ServerCodec } @@ -45,20 +58,24 @@ func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) { return nil, err } - return &appClient{ + appClient := &appClient{ ProbeConfig: pc, quit: make(chan struct{}), + readers: make(chan io.Reader), target: target, client: http.Client{ Transport: httpTransport, }, - }, nil + } + + return appClient, nil } // Stop stops the appClient. func (c *appClient) Stop() { c.controlServerCodecMtx.Lock() defer c.controlServerCodecMtx.Unlock() + close(c.readers) close(c.quit) if c.controlServerCodec != nil { c.controlServerCodec.Close() @@ -81,6 +98,32 @@ func (c *appClient) Details() (Details, error) { return result, json.NewDecoder(resp.Body).Decode(&result) } +func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) { + backoff := initialBackoff + + for { + again, err := f() + if !again { + return + } + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error doing %s for %s, backing off %s: %v", msg, c.target, backoff, err) + select { + case <-time.After(backoff): + case <-c.quit: + return + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + func (c *appClient) controlConnection(handler ControlHandler) error { dialer := websocket.Dialer{} headers := http.Header{} @@ -122,30 +165,58 @@ func (c *appClient) controlConnection(handler ControlHandler) error { return nil } -func (c *appClient) controlConnectionLoop(handler ControlHandler) { - defer log.Printf("Control connection to %s exiting", c.target) - backoff := initialBackoff - - for { - err := c.controlConnection(handler) - if err == nil { - backoff = initialBackoff - continue - } - - log.Printf("Error doing controls for %s, backing off %s: %v", c.target, backoff, err) - select { - case <-time.After(backoff): - case <-c.quit: - return - } - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } -} - func (c *appClient) ControlConnection(handler ControlHandler) { - go c.controlConnectionLoop(handler) + go func() { + log.Printf("Control connection to %s starting", c.target) + defer log.Printf("Control connection to %s exiting", c.target) + c.doWithBackoff("controls", func() (bool, error) { + return true, c.controlConnection(handler) + }) + }() +} + +func (c *appClient) publish(r io.Reader) error { + url := sanitize.URL("", 0, "/api/report")(c.target) + req, err := c.ProbeConfig.authorizedRequest("POST", url, r) + if err != nil { + return err + } + req.Header.Set("Content-Encoding", "gzip") + // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf(resp.Status) + } + return nil +} + +func (c *appClient) startPublishing() { + go func() { + log.Printf("Publish loop for %s starting", c.target) + defer log.Printf("Publish loop for %s exiting", c.target) + c.doWithBackoff("publish", func() (bool, error) { + r := <-c.readers + if r == nil { + return false, nil + } + return true, c.publish(r) + }) + }() +} + +// Publish implements Publisher +func (c *appClient) Publish(r io.Reader) error { + // Lazily start the background publishing loop. + c.publishLoop.Do(c.startPublishing) + select { + case c.readers <- r: + default: + } + return nil } diff --git a/xfer/http_publisher_test.go b/xfer/app_client_internal_test.go similarity index 73% rename from xfer/http_publisher_test.go rename to xfer/app_client_internal_test.go index 9a1bc320f..d13275fbf 100644 --- a/xfer/http_publisher_test.go +++ b/xfer/app_client_internal_test.go @@ -1,10 +1,10 @@ -package xfer_test +package xfer import ( "compress/gzip" "encoding/gob" - "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -16,10 +16,17 @@ import ( "github.com/gorilla/handlers" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" - "github.com/weaveworks/scope/xfer" ) -func TestHTTPPublisher(t *testing.T) { +type publisherFunc func(io.Reader) error + +func (p publisherFunc) Publish(r io.Reader) error { + return p(r) +} + +func (publisherFunc) Stop() {} + +func TestAppClientPublish(t *testing.T) { var ( token = "abcdefg" id = "1234567" @@ -32,15 +39,10 @@ func TestHTTPPublisher(t *testing.T) { t.Errorf("want %q, have %q", want, have) } - if want, have := id, r.Header.Get(xfer.ScopeProbeIDHeader); want != have { + if want, have := id, r.Header.Get(ScopeProbeIDHeader); want != have { t.Errorf("want %q, have %q", want, have) } - if r.URL.Path == "/api" { - _ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) - return - } - var have report.Report reader := r.Body @@ -73,18 +75,26 @@ func TestHTTPPublisher(t *testing.T) { if err != nil { t.Fatal(err) } - _, p, err := xfer.NewHTTPPublisher(u.Host, s.URL, token, id, false) + + pc := ProbeConfig{ + Token: token, + ProbeID: id, + Insecure: false, + } + + p, err := NewAppClient(pc, u.Host, s.URL) if err != nil { t.Fatal(err) } - rp := xfer.NewReportPublisher(p) + defer p.Stop() + rp := NewReportPublisher(publisherFunc(p.(*appClient).publish)) if err := rp.Publish(rpt); err != nil { t.Error(err) } select { case <-done: - case <-time.After(time.Millisecond): + case <-time.After(100 * time.Millisecond): t.Error("timeout") } } diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go deleted file mode 100644 index cd74d23d3..000000000 --- a/xfer/background_publisher.go +++ /dev/null @@ -1,70 +0,0 @@ -package xfer - -import ( - "io" - "log" - "time" -) - -const ( - initialBackoff = 1 * time.Second - maxBackoff = 60 * time.Second -) - -// BackgroundPublisher is a publisher which does the publish asynchronously. -// It will only do one publish at once; if there is an ongoing publish, -// concurrent publishes are dropped. -type BackgroundPublisher struct { - publisher Publisher - readers chan io.Reader - quit chan struct{} -} - -// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher -func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { - bp := &BackgroundPublisher{ - publisher: p, - readers: make(chan io.Reader), - quit: make(chan struct{}), - } - go bp.loop() - return bp -} - -func (bp *BackgroundPublisher) loop() { - backoff := initialBackoff - - for r := range bp.readers { - err := bp.publisher.Publish(r) - if err == nil { - backoff = initialBackoff - continue - } - - log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err) - select { - case <-time.After(backoff): - case <-bp.quit: - } - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } -} - -// Publish implements Publisher -func (bp *BackgroundPublisher) Publish(r io.Reader) error { - select { - case bp.readers <- r: - default: - } - return nil -} - -// Stop implements Publisher -func (bp *BackgroundPublisher) Stop() { - close(bp.readers) - close(bp.quit) - bp.publisher.Stop() -} diff --git a/xfer/background_publisher_test.go b/xfer/background_publisher_test.go deleted file mode 100644 index 350a61f6c..000000000 --- a/xfer/background_publisher_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package xfer_test - -import ( - "bytes" - "runtime" - "testing" - "time" - - "github.com/weaveworks/scope/test" - "github.com/weaveworks/scope/xfer" -) - -func TestBackgroundPublisher(t *testing.T) { - mp := mockPublisher{} - backgroundPublisher := xfer.NewBackgroundPublisher(&mp) - defer backgroundPublisher.Stop() - runtime.Gosched() - - for i := 1; i <= 10; i++ { - err := backgroundPublisher.Publish(&bytes.Buffer{}) - if err != nil { - t.Fatalf("%v", err) - } - - test.Poll(t, 100*time.Millisecond, i, func() interface{} { - return mp.count - }) - } -} diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go deleted file mode 100644 index 1a80ffe2e..000000000 --- a/xfer/http_publisher.go +++ /dev/null @@ -1,95 +0,0 @@ -package xfer - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "time" - - "github.com/weaveworks/scope/common/sanitize" -) - -// HTTPPublisher publishes buffers by POST to a fixed endpoint. -type HTTPPublisher struct { - ProbeConfig - - url string - client *http.Client -} - -// NewHTTPPublisher returns an HTTPPublisher ready for use. -func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (string, *HTTPPublisher, error) { - pc := ProbeConfig{ - Token: token, - ProbeID: probeID, - Insecure: insecure, - } - - httpTransport, err := pc.getHTTPTransport(hostname) - if err != nil { - return "", nil, err - } - - p := &HTTPPublisher{ - ProbeConfig: pc, - url: sanitize.URL("", 0, "/api/report")(target), - client: &http.Client{ - Transport: httpTransport, - }, - } - - client := &http.Client{ - Timeout: 5 * time.Second, - Transport: httpTransport, - } - req, err := pc.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil) - if err != nil { - return "", nil, err - } - resp, err := client.Do(req) - if err != nil { - return "", nil, err - } - defer resp.Body.Close() - var apiResponse struct { - ID string `json:"id"` - } - if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { - return "", nil, err - } - return apiResponse.ID, p, nil -} - -func (p HTTPPublisher) String() string { - return p.url -} - -// Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(r io.Reader) error { - req, err := p.ProbeConfig.authorizedRequest("POST", p.url, r) - if err != nil { - return err - } - req.Header.Set("Content-Encoding", "gzip") - // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed - - resp, err := p.client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf(resp.Status) - } - return nil -} - -// Stop implements Publisher -func (p *HTTPPublisher) Stop() { - // We replace the HTTPPublishers pretty regularly, so we need to ensure the - // underlying connections get closed, or we end up with lots of idle - // goroutines on the server (see #604) - p.client.Transport.(*http.Transport).CloseIdleConnections() -} diff --git a/xfer/multi_client.go b/xfer/multi_client.go index f3eb13195..d0eb5a21e 100644 --- a/xfer/multi_client.go +++ b/xfer/multi_client.go @@ -1,12 +1,19 @@ package xfer import ( + "bytes" + "errors" + "io" + "io/ioutil" "log" + "strings" "sync" "github.com/weaveworks/scope/report" ) +const maxConcurrentGET = 10 + // ClientFactory is a thing thats makes AppClients type ClientFactory func(ProbeConfig, string, string) (AppClient, error) @@ -33,6 +40,7 @@ type clientTuple struct { type MultiAppClient interface { Set(hostname string, endpoints []string) Stop() + Publish(io.Reader) error } // NewMultiAppClient creates a new MultiAppClient. @@ -116,3 +124,39 @@ func (c *multiClient) Stop() { c.clients = map[string]AppClient{} close(c.quit) } + +// Publish implements Publisher by publishing the reader to all of the +// underlying publishers sequentially. To do that, it needs to drain the +// reader, and recreate new readers for each publisher. Note that it will +// publish to one endpoint for each unique ID. Failed publishes don't count. +func (c *multiClient) Publish(r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + c.mtx.Lock() + defer c.mtx.Unlock() + errs := []string{} + for _, c := range c.clients { + if err := c.Publish(bytes.NewReader(buf)); err != nil { + errs = append(errs, err.Error()) + } + } + if len(errs) > 0 { + return errors.New(strings.Join(errs, "; ")) + } + return nil +} + +type semaphore chan struct{} + +func newSemaphore(n int) semaphore { + c := make(chan struct{}, n) + for i := 0; i < n; i++ { + c <- struct{}{} + } + return semaphore(c) +} +func (s semaphore) acquire() { <-s } +func (s semaphore) release() { s <- struct{}{} } diff --git a/xfer/multi_publisher_internal_test.go b/xfer/multi_client_internal_test.go similarity index 100% rename from xfer/multi_publisher_internal_test.go rename to xfer/multi_client_internal_test.go diff --git a/xfer/multi_client_test.go b/xfer/multi_client_test.go index 936143595..aecf8b587 100644 --- a/xfer/multi_client_test.go +++ b/xfer/multi_client_test.go @@ -1,6 +1,8 @@ package xfer_test import ( + "bytes" + "io" "runtime" "testing" @@ -11,6 +13,7 @@ type mockClient struct { id string count int stopped int + publish int } func (c *mockClient) Details() (xfer.Details, error) { @@ -25,26 +28,33 @@ func (c *mockClient) Stop() { c.stopped++ } +func (c *mockClient) Publish(io.Reader) error { + c.publish++ + return nil +} + +var ( + a1 = &mockClient{id: "1"} // hostname a, app id 1 + a2 = &mockClient{id: "2"} // hostname a, app id 2 + b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate) + b3 = &mockClient{id: "3"} // hostname b, app id 3 + factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) { + switch target { + case "a1": + return a1, nil + case "a2": + return a2, nil + case "b2": + return b2, nil + case "b3": + return b3, nil + } + panic(target) + } +) + func TestMultiClient(t *testing.T) { var ( - a1 = &mockClient{id: "1"} // hostname a, app id 1 - a2 = &mockClient{id: "2"} // hostname a, app id 2 - b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate) - b3 = &mockClient{id: "3"} // hostname b, app id 3 - factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) { - switch target { - case "a1": - return a1, nil - case "a2": - return a2, nil - case "b2": - return b2, nil - case "b3": - return b3, nil - } - t.Fatal(target) - return a1, nil - } controlHandler = xfer.ControlHandlerFunc(func(_ xfer.Request) xfer.Response { return xfer.Response{} }) @@ -76,3 +86,22 @@ func TestMultiClient(t *testing.T) { mp.Set("b", []string{}) expect(b3.stopped, 1) } + +func TestMultiClientPublish(t *testing.T) { + mp := xfer.NewMultiAppClient(xfer.ProbeConfig{}, nil, factory) + defer mp.Stop() + + sum := func() int { return a1.publish + a2.publish + b2.publish + b3.publish } + + mp.Set("a", []string{"a1", "a2"}) + mp.Set("b", []string{"b2", "b3"}) + + for i := 1; i < 10; i++ { + if err := mp.Publish(&bytes.Buffer{}); err != nil { + t.Error(err) + } + if want, have := 3*i, sum(); want != have { + t.Errorf("want %d, have %d", want, have) + } + } +} diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go deleted file mode 100644 index ba2f1ec11..000000000 --- a/xfer/multi_publisher.go +++ /dev/null @@ -1,144 +0,0 @@ -package xfer - -import ( - "bytes" - "errors" - "io" - "io/ioutil" - "log" - "strings" - "sync" -) - -// MultiPublisher implements publisher over a collection of heterogeneous -// targets. See documentation of each method to understand the semantics. -type MultiPublisher struct { - mtx sync.Mutex - factory func(hostname, endpoint string) (string, Publisher, error) - sema semaphore - list []tuple -} - -// NewMultiPublisher returns a new MultiPublisher ready for use. -func NewMultiPublisher(factory func(hostname, endpoint string) (string, Publisher, error)) *MultiPublisher { - return &MultiPublisher{ - factory: factory, - sema: newSemaphore(maxConcurrentGET), - } -} - -type tuple struct { - publisher Publisher - target string // DNS name - endpoint string // IP addr - id string // unique ID from app - err error // if factory failed -} - -const maxConcurrentGET = 10 - -// Set declares that the target (DNS name) resolves to the provided endpoints -// (IPs), and that we want to publish to each of those endpoints. Set replaces -// any existing publishers to the given target. Set invokes the factory method -// to convert each endpoint to a publisher, and to get the remote receiver's -// unique ID. -func (p *MultiPublisher) Set(target string, endpoints []string) { - // Convert endpoints to publishers. - c := make(chan tuple, len(endpoints)) - for _, endpoint := range endpoints { - go func(endpoint string) { - p.sema.acquire() - defer p.sema.release() - id, publisher, err := p.factory(target, endpoint) - c <- tuple{publisher, target, endpoint, id, err} - }(endpoint) - } - list := make([]tuple, 0, len(p.list)+len(endpoints)) - for i := 0; i < cap(c); i++ { - t := <-c - if t.err != nil { - log.Printf("multi-publisher set: %s (%s): %v", t.target, t.endpoint, t.err) - continue - } - list = append(list, t) - } - - // Copy all other tuples over to the new list. - p.mtx.Lock() - defer p.mtx.Unlock() - p.list = p.appendFilter(list, func(t tuple) bool { return t.target != target }) -} - -// Delete removes all endpoints that match the given target. -func (p *MultiPublisher) Delete(target string) { - p.mtx.Lock() - defer p.mtx.Unlock() - p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target }) -} - -// Publish implements Publisher by publishing the reader to all of the -// underlying publishers sequentially. To do that, it needs to drain the -// reader, and recreate new readers for each publisher. Note that it will -// publish to one endpoint for each unique ID. Failed publishes don't count. -func (p *MultiPublisher) Publish(r io.Reader) error { - buf, err := ioutil.ReadAll(r) - if err != nil { - return err - } - - var ( - ids = map[string]struct{}{} - errs = []string{} - ) - - p.mtx.Lock() - defer p.mtx.Unlock() - - for _, t := range p.list { - if _, ok := ids[t.id]; ok { - continue - } - if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil { - errs = append(errs, err.Error()) - continue - } - ids[t.id] = struct{}{} // sent already - } - if len(errs) > 0 { - return errors.New(strings.Join(errs, "; ")) - } - return nil -} - -// Stop invokes stop on all underlying publishers and removes them. -func (p *MultiPublisher) Stop() { - p.mtx.Lock() - defer p.mtx.Unlock() - for _, t := range p.list { - t.publisher.Stop() - } - p.list = []tuple{} -} - -func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple { - for _, t := range p.list { - if !f(t) { - t.publisher.Stop() - continue - } - list = append(list, t) - } - return list -} - -type semaphore chan struct{} - -func newSemaphore(n int) semaphore { - c := make(chan struct{}, n) - for i := 0; i < n; i++ { - c <- struct{}{} - } - return semaphore(c) -} -func (s semaphore) acquire() { <-s } -func (s semaphore) release() { s <- struct{}{} } diff --git a/xfer/multi_publisher_test.go b/xfer/multi_publisher_test.go deleted file mode 100644 index d25123b06..000000000 --- a/xfer/multi_publisher_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package xfer_test - -import ( - "bytes" - "fmt" - "io" - "testing" - - "github.com/weaveworks/scope/xfer" -) - -func TestMultiPublisher(t *testing.T) { - var ( - a1 = &mockPublisher{} // target a, endpoint 1 - a2 = &mockPublisher{} // target a, endpoint 2 (duplicate) - b2 = &mockPublisher{} // target b, endpoint 2 (duplicate) - b3 = &mockPublisher{} // target b, endpoint 3 - ) - - sum := func() int { return a1.count + a2.count + b2.count + b3.count } - - mp := xfer.NewMultiPublisher(func(hostname, endpoint string) (string, xfer.Publisher, error) { - switch endpoint { - case "a1": - return "1", a1, nil - case "a2": - return "2", a2, nil - case "b2": - return "2", b2, nil - case "b3": - return "3", b3, nil - default: - return "", nil, fmt.Errorf("invalid endpoint %s", endpoint) - } - }) - defer mp.Stop() - - mp.Set("a", []string{"a1", "a2"}) - mp.Set("b", []string{"b2", "b3"}) - - for i := 1; i < 10; i++ { - if err := mp.Publish(&bytes.Buffer{}); err != nil { - t.Error(err) - } - if want, have := 3*i, sum(); want != have { - t.Errorf("want %d, have %d", want, have) - } - } -} - -type mockPublisher struct{ count int } - -func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} From 95e58b32cbced9101d7bb981e17814a0c667bade Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Dec 2015 12:50:33 +0000 Subject: [PATCH 2/4] Update experimental/ --- experimental/demoprobe/main.go | 8 ++++++-- experimental/fixprobe/main.go | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index a3d557ef8..613966f2a 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -23,11 +23,15 @@ func main() { ) flag.Parse() - _, publisher, err := xfer.NewHTTPPublisher(*publish, *publish, "demoprobe", "demoprobe", false) + client, err := xfer.NewAppClient(xfer.ProbeConfig{ + Token: "demoprobe", + ProbeID: "demoprobe", + Insecure: false, + }, *publish, *publish) if err != nil { log.Fatal(err) } - rp := xfer.NewReportPublisher(publisher) + rp := xfer.NewReportPublisher(client) rand.Seed(time.Now().UnixNano()) for range time.Tick(*publishInterval) { diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index b253f79bc..78a70d442 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -34,12 +34,16 @@ func main() { } f.Close() - _, publisher, err := xfer.NewHTTPPublisher(*publish, *publish, "fixprobe", "fixprobe", false) + client, err := xfer.NewAppClient(xfer.ProbeConfig{ + Token: "fixprobe", + ProbeID: "fixprobe", + Insecure: false, + }, *publish, *publish) if err != nil { log.Fatal(err) } - rp := xfer.NewReportPublisher(publisher) + rp := xfer.NewReportPublisher(client) for range time.Tick(*publishInterval) { rp.Publish(fixedReport) } From 563a6e06ba4486d2c421c8592c5faf83c9eee5a3 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Dec 2015 13:03:48 +0000 Subject: [PATCH 3/4] Extend the testing of the AppClient. --- xfer/app_client_internal_test.go | 120 +++++++++++++++++++++++++++---- 1 file changed, 105 insertions(+), 15 deletions(-) diff --git a/xfer/app_client_internal_test.go b/xfer/app_client_internal_test.go index d13275fbf..4b12eb16a 100644 --- a/xfer/app_client_internal_test.go +++ b/xfer/app_client_internal_test.go @@ -3,6 +3,7 @@ package xfer import ( "compress/gzip" "encoding/gob" + "encoding/json" "fmt" "io" "net/http" @@ -26,21 +27,14 @@ func (p publisherFunc) Publish(r io.Reader) error { func (publisherFunc) Stop() {} -func TestAppClientPublish(t *testing.T) { - var ( - token = "abcdefg" - id = "1234567" - rpt = report.MakeReport() - done = make(chan struct{}) - ) - +func dummyServer(t *testing.T, expectedToken, expectedID string, expectedReport report.Report, done chan struct{}) *httptest.Server { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if want, have := fmt.Sprintf("Scope-Probe token=%s", token), r.Header.Get("Authorization"); want != have { - t.Errorf("want %q, have %q", want, have) + if have := r.Header.Get("Authorization"); fmt.Sprintf("Scope-Probe token=%s", expectedToken) != have { + t.Errorf("want %q, have %q", expectedToken, have) } - if want, have := id, r.Header.Get(ScopeProbeIDHeader); want != have { - t.Errorf("want %q, have %q", want, have) + if have := r.Header.Get(ScopeProbeIDHeader); expectedID != have { + t.Errorf("want %q, have %q", expectedID, have) } var have report.Report @@ -60,15 +54,26 @@ func TestAppClientPublish(t *testing.T) { t.Error(err) return } - if want := rpt; !reflect.DeepEqual(want, have) { - t.Error(test.Diff(want, have)) + if !reflect.DeepEqual(expectedReport, have) { + t.Error(test.Diff(expectedReport, have)) return } w.WriteHeader(http.StatusOK) close(done) }) - s := httptest.NewServer(handlers.CompressHandler(handler)) + return httptest.NewServer(handlers.CompressHandler(handler)) +} + +func TestAppClientPublishInternal(t *testing.T) { + var ( + token = "abcdefg" + id = "1234567" + rpt = report.MakeReport() + done = make(chan struct{}) + ) + + s := dummyServer(t, token, id, rpt, done) defer s.Close() u, err := url.Parse(s.URL) @@ -98,3 +103,88 @@ func TestAppClientPublish(t *testing.T) { t.Error("timeout") } } + +func TestAppClientDetails(t *testing.T) { + var ( + token = "abcdefg" + id = "1234567" + rpt = report.MakeReport() + done = make(chan struct{}) + ) + + s := dummyServer(t, token, id, rpt, done) + defer s.Close() + + u, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + + pc := ProbeConfig{ + Token: token, + ProbeID: id, + Insecure: false, + } + + p, err := NewAppClient(pc, u.Host, s.URL) + if err != nil { + t.Fatal(err) + } + defer p.Stop() + + // First few reports might be dropped as the client is spinning up. + rp := NewReportPublisher(p) + for i := 0; i < 3; i++ { + if err := rp.Publish(rpt); err != nil { + t.Error(err) + } + } + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Error("timeout") + } +} + +func TestAppClientPublish(t *testing.T) { + var ( + id = "foobarbaz" + version = "imalittleteapot" + want = Details{ID: id, Version: version} + ) + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if err := json.NewEncoder(w).Encode(want); err != nil { + t.Fatal(err) + } + }) + + s := httptest.NewServer(handlers.CompressHandler(handler)) + defer s.Close() + + u, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + + pc := ProbeConfig{ + Token: "", + ProbeID: "", + Insecure: false, + } + p, err := NewAppClient(pc, u.Host, s.URL) + if err != nil { + t.Fatal(err) + } + defer p.Stop() + + have, err := p.Details() + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) + return + } +} From cb8ae536de891342b109c169f619346c5c46b17c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 7 Dec 2015 11:38:00 +0000 Subject: [PATCH 4/4] Review feedback --- xfer/app_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/xfer/app_client.go b/xfer/app_client.go index f2a8dacd8..13fd13bc7 100644 --- a/xfer/app_client.go +++ b/xfer/app_client.go @@ -102,8 +102,8 @@ func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) { backoff := initialBackoff for { - again, err := f() - if !again { + done, err := f() + if done { return } if err == nil { @@ -170,7 +170,7 @@ func (c *appClient) ControlConnection(handler ControlHandler) { log.Printf("Control connection to %s starting", c.target) defer log.Printf("Control connection to %s exiting", c.target) c.doWithBackoff("controls", func() (bool, error) { - return true, c.controlConnection(handler) + return false, c.controlConnection(handler) }) }() } @@ -203,9 +203,9 @@ func (c *appClient) startPublishing() { c.doWithBackoff("publish", func() (bool, error) { r := <-c.readers if r == nil { - return false, nil + return true, nil } - return true, c.publish(r) + return false, c.publish(r) }) }() }