From 822c09370e11faa57e9a6264ac54b2df7a52c682 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Mon, 14 Sep 2015 16:01:28 +0200 Subject: [PATCH 1/7] app: show unique ID in /api --- app/main.go | 13 +++++++++---- app/router.go | 3 ++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/app/main.go b/app/main.go index 7e8e3c3b3..0320fc2cf 100644 --- a/app/main.go +++ b/app/main.go @@ -16,8 +16,13 @@ import ( "github.com/weaveworks/scope/xfer" ) -// Set during buildtime. -var version = "dev" +var ( + // Set at buildtime. + version = "dev" + + // Set at runtime. + uniqueID = "0" +) func main() { var ( @@ -33,8 +38,8 @@ func main() { } rand.Seed(time.Now().UnixNano()) - id := strconv.FormatInt(rand.Int63(), 16) - log.Printf("app starting, version %s, ID %s", version, id) + uniqueID = strconv.FormatInt(rand.Int63(), 16) + log.Printf("app starting, version %s, ID %s", version, uniqueID) c := xfer.NewCollector(*window) http.Handle("/", Router(c)) diff --git a/app/router.go b/app/router.go index a1c866173..9e99c3ed3 100644 --- a/app/router.go +++ b/app/router.go @@ -121,11 +121,12 @@ func captureTopology(rep xfer.Reporter, f func(xfer.Reporter, topologyView, http // APIDetails are some generic details that can be fetched from /api type APIDetails struct { + ID string `json:"id"` Version string `json:"version"` } func apiHandler(w http.ResponseWriter, r *http.Request) { - respondWith(w, http.StatusOK, APIDetails{Version: version}) + respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version}) } // Topology option labels should tell the current state. The first item must From 64fdf6a78099589b8280d3a5dbb544e2f0cccbf6 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Mon, 14 Sep 2015 17:32:13 +0200 Subject: [PATCH 2/7] common/sanitize + relevant updates --- common/sanitize/sanitize.go | 39 ++++++++++++++++++++++++++++++ common/sanitize/sanitize_test.go | 34 ++++++++++++++++++++++++++ experimental/demoprobe/main.go | 2 +- experimental/fixprobe/main.go | 2 +- probe/main.go | 7 ++---- probe/overlay/weave.go | 36 ++++------------------------ probe/overlay/weave_test.go | 6 +---- xfer/publisher.go | 41 +++++++++++++++++--------------- xfer/publisher_test.go | 8 ++++++- 9 files changed, 112 insertions(+), 63 deletions(-) create mode 100644 common/sanitize/sanitize.go create mode 100644 common/sanitize/sanitize_test.go diff --git a/common/sanitize/sanitize.go b/common/sanitize/sanitize.go new file mode 100644 index 000000000..bd12bf8c9 --- /dev/null +++ b/common/sanitize/sanitize.go @@ -0,0 +1,39 @@ +package sanitize + +import ( + "fmt" + "log" + "net" + "net/url" + "strings" +) + +// URL returns a function that sanitizes a URL string. It lets underspecified +// strings to be converted to usable URLs via some default arguments. +func URL(scheme string, port int, path string) func(string) string { + if scheme == "" { + scheme = "http://" + } + return func(s string) string { + if s == "" { + return s // can't do much here + } + if !strings.HasPrefix(s, "http") { + s = scheme + s + } + u, err := url.Parse(s) + if err != nil { + log.Printf("%q: %v", s, err) + return s // oh well + } + if port > 0 { + if _, _, err = net.SplitHostPort(u.Host); err != nil { + u.Host += fmt.Sprintf(":%d", port) + } + } + if path != "" && u.Path != path { + u.Path = path + } + return u.String() + } +} diff --git a/common/sanitize/sanitize_test.go b/common/sanitize/sanitize_test.go new file mode 100644 index 000000000..1d6502488 --- /dev/null +++ b/common/sanitize/sanitize_test.go @@ -0,0 +1,34 @@ +package sanitize_test + +import ( + "testing" + + "github.com/weaveworks/scope/common/sanitize" +) + +func TestSanitizeURL(t *testing.T) { + for _, input := range []struct { + scheme string + port int + path string + input string + want string + }{ + {"", 0, "", "", ""}, + {"", 0, "", "foo", "http://foo"}, + {"", 80, "", "foo", "http://foo:80"}, + {"", 0, "some/path", "foo", "http://foo/some/path"}, + {"", 0, "/some/path", "foo", "http://foo/some/path"}, + {"https://", 0, "", "foo", "https://foo"}, + {"https://", 80, "", "foo", "https://foo:80"}, + {"https://", 0, "some/path", "foo", "https://foo/some/path"}, + {"https://", 0, "", "http://foo", "http://foo"}, // specified scheme beats default... + {"", 9999, "", "foo:80", "http://foo:80"}, // specified port beats default... + {"", 0, "/bar", "foo/baz", "http://foo/bar"}, // ...but default path beats specified! + } { + if want, have := input.want, sanitize.URL(input.scheme, input.port, input.path)(input.input); want != have { + t.Errorf("sanitize.URL(%q, %d, %q)(%q): want %q, have %q", input.scheme, input.port, input.path, input.input, want, have) + continue + } + } +} diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index 55cb91fb6..51c3ac5bf 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -23,7 +23,7 @@ func main() { ) flag.Parse() - publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe") + _, publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe") if err != nil { log.Fatal(err) } diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index 1bd1294b6..bcec4c05a 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -34,7 +34,7 @@ func main() { } f.Close() - publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe") + _, publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe") if err != nil { log.Fatal(err) } diff --git a/probe/main.go b/probe/main.go index a8dd86dda..43b6dcea6 100644 --- a/probe/main.go +++ b/probe/main.go @@ -81,7 +81,7 @@ func main() { } publisherFactory := func(target string) (xfer.Publisher, error) { - publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) + _, publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) if err != nil { return nil, err } @@ -133,10 +133,7 @@ func main() { } if *weaveRouterAddr != "" { - weave, err := overlay.NewWeave(hostID, *weaveRouterAddr) - if err != nil { - log.Fatalf("failed to start Weave tagger: %v", err) - } + weave := overlay.NewWeave(hostID, *weaveRouterAddr) tickers = append(tickers, weave) taggers = append(taggers, weave) reporters = append(reporters, weave) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index c509eea2c..1a278180f 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -5,13 +5,13 @@ import ( "encoding/json" "fmt" "log" - "net" "net/http" - "net/url" "regexp" "strings" "sync" + "github.com/weaveworks/scope/common/sanitize" + "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" @@ -68,15 +68,11 @@ type weaveStatus struct { // NewWeave returns a new Weave tagger based on the Weave router at // address. The address should be an IP or FQDN, no port. -func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { - s, err := sanitize("http://", 6784, "/report")(weaveRouterAddress) - if err != nil { - return nil, err - } +func NewWeave(hostID, weaveRouterAddress string) *Weave { return &Weave{ - url: s, + url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), hostID: hostID, - }, nil + } } // Tick implements Ticker @@ -202,25 +198,3 @@ func (w *Weave) Report() (report.Report, error) { } return r, nil } - -func sanitize(scheme string, port int, path string) func(string) (string, error) { - return func(s string) (string, error) { - if s == "" { - return "", fmt.Errorf("no host") - } - if !strings.HasPrefix(s, "http") { - s = scheme + s - } - u, err := url.Parse(s) - if err != nil { - return "", err - } - if _, _, err = net.SplitHostPort(u.Host); err != nil { - u.Host += fmt.Sprintf(":%d", port) - } - if u.Path != path { - u.Path = path - } - return u.String(), nil - } -} diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index bb5f18f6e..ab29ca781 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -25,11 +25,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter)) defer s.Close() - w, err := overlay.NewWeave(mockHostID, s.URL) - if err != nil { - t.Fatal(err) - } - + w := overlay.NewWeave(mockHostID, s.URL) w.Tick() { diff --git a/xfer/publisher.go b/xfer/publisher.go index 759e48a6e..0a0df7504 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -2,13 +2,15 @@ package xfer import ( "bytes" + "encoding/json" "fmt" "log" "net/http" - "net/url" "strings" "sync" "time" + + "github.com/weaveworks/scope/common/sanitize" ) const ( @@ -17,7 +19,7 @@ const ( ) // Publisher is something which can send a buffered set of data somewhere, -// probably to a collector. +// probably to a remote collector. type Publisher interface { Publish(*bytes.Buffer) error Stop() @@ -25,9 +27,9 @@ type Publisher interface { // HTTPPublisher publishes reports by POST to a fixed endpoint. type HTTPPublisher struct { - url string - token string - id string + url string + token string + probeID string } // ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The @@ -37,21 +39,23 @@ type HTTPPublisher struct { const ScopeProbeIDHeader = "X-Scope-Probe-ID" // NewHTTPPublisher returns an HTTPPublisher ready for use. -func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) { - if !strings.HasPrefix(target, "http") { - target = "http://" + target - } - u, err := url.Parse(target) +func NewHTTPPublisher(target, token, probeID string) (string, *HTTPPublisher, error) { + targetAPI := sanitize.URL("http://", 0, "/api")(target) + resp, err := http.Get(targetAPI) if err != nil { - return nil, err + return "", nil, err } - if u.Path == "" { - u.Path = "/api/report" + defer resp.Body.Close() + var apiResponse struct { + ID string `json:"id"` } - return &HTTPPublisher{ - url: u.String(), - token: token, - id: id, + if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { + return "", nil, err + } + return apiResponse.ID, &HTTPPublisher{ + url: sanitize.URL("http://", 0, "/api/report")(target), + token: token, + probeID: probeID, }, nil } @@ -65,9 +69,8 @@ func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { if err != nil { return err } - req.Header.Set("Authorization", AuthorizationHeader(p.token)) - req.Header.Set(ScopeProbeIDHeader, p.id) + req.Header.Set(ScopeProbeIDHeader, p.probeID) req.Header.Set("Content-Encoding", "gzip") // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 0bef0861e..787eee793 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "encoding/gob" + "encoding/json" "net/http" "net/http/httptest" "reflect" @@ -27,6 +28,11 @@ func TestHTTPPublisher(t *testing.T) { ) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/api" { + json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) + return + } + if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have { t.Errorf("want %q, have %q", want, have) } @@ -61,7 +67,7 @@ func TestHTTPPublisher(t *testing.T) { s := httptest.NewServer(handlers.CompressHandler(handler)) defer s.Close() - p, err := xfer.NewHTTPPublisher(s.URL, token, id) + _, p, err := xfer.NewHTTPPublisher(s.URL, token, id) if err != nil { t.Fatal(err) } From c818f08c0601abecabcc6e6975328a19f8b0e9b0 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Thu, 17 Sep 2015 19:21:17 +0200 Subject: [PATCH 3/7] Refactor MultiPublisher - Set instead of Add, to allow replacement of endpoints - Break out individual Publishers to their own files and tests --- probe/main.go | 12 +- probe/overlay/weave.go | 3 +- probe/resolver.go | 10 +- probe/resolver_test.go | 12 +- xfer/background_publisher.go | 70 ++++++ xfer/background_publisher_test.go | 7 + xfer/http_publisher.go | 80 +++++++ ...blisher_test.go => http_publisher_test.go} | 31 --- xfer/multi_publisher.go | 107 +++++++++ xfer/multi_publisher_test.go | 52 +++++ xfer/publisher.go | 210 +----------------- 11 files changed, 338 insertions(+), 256 deletions(-) create mode 100644 xfer/background_publisher.go create mode 100644 xfer/background_publisher_test.go create mode 100644 xfer/http_publisher.go rename xfer/{publisher_test.go => http_publisher_test.go} (67%) create mode 100644 xfer/multi_publisher.go create mode 100644 xfer/multi_publisher_test.go diff --git a/probe/main.go b/probe/main.go index 43b6dcea6..a2d91dcbe 100644 --- a/probe/main.go +++ b/probe/main.go @@ -80,16 +80,16 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisherFactory := func(target string) (xfer.Publisher, error) { - _, publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) + factory := func(endpoint string) (string, xfer.Publisher, error) { + id, publisher, err := xfer.NewHTTPPublisher(endpoint, *token, probeID) if err != nil { - return nil, err + return "", nil, err } - return xfer.NewBackgroundPublisher(publisher), nil + return id, xfer.NewBackgroundPublisher(publisher), nil } - publishers := xfer.NewMultiPublisher(publisherFactory) + publishers := xfer.NewMultiPublisher(factory) defer publishers.Stop() - resolver := newStaticResolver(targets, publishers.Add) + resolver := newStaticResolver(targets, publishers.Set) defer resolver.Stop() addrs, err := net.InterfaceAddrs() diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 1a278180f..ce1330523 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -10,9 +10,8 @@ import ( "strings" "sync" - "github.com/weaveworks/scope/common/sanitize" - "github.com/weaveworks/scope/common/exec" + "github.com/weaveworks/scope/common/sanitize" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) diff --git a/probe/resolver.go b/probe/resolver.go index 8b8c7e97e..6feae5d41 100644 --- a/probe/resolver.go +++ b/probe/resolver.go @@ -17,7 +17,7 @@ var ( type staticResolver struct { quit chan struct{} - add func(string) + set func(string, []string) peers []peer } @@ -31,10 +31,10 @@ type peer struct { // resolved IPs. It explictiy supports hostnames which // resolve to multiple IPs; it will repeatedly call // add with the same IP, expecting the target to dedupe. -func newStaticResolver(peers []string, add func(string)) staticResolver { +func newStaticResolver(peers []string, set func(target string, endpoints []string)) staticResolver { r := staticResolver{ quit: make(chan struct{}), - add: add, + set: set, peers: prepareNames(peers), } go r.loop() @@ -92,13 +92,15 @@ func (r staticResolver) resolveHosts() { } } + endpoints := make([]string, 0, len(addrs)) for _, addr := range addrs { // For now, ignore IPv6 if addr.To4() == nil { continue } - r.add(net.JoinHostPort(addr.String(), peer.port)) + endpoints = append(endpoints, net.JoinHostPort(addr.String(), peer.port)) } + r.set(peer.hostname, endpoints) } } diff --git a/probe/resolver_test.go b/probe/resolver_test.go index d1deada58..63826380c 100644 --- a/probe/resolver_test.go +++ b/probe/resolver_test.go @@ -39,15 +39,19 @@ func TestResolver(t *testing.T) { port := ":80" ip1 := "192.168.0.1" ip2 := "192.168.0.10" - adds := make(chan string) - add := func(s string) { adds <- s } + sets := make(chan string) + set := func(target string, endpoints []string) { + for _, endpoint := range endpoints { + sets <- endpoint + } + } - r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, add) + r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set) assertAdd := func(want string) { _, _, line, _ := runtime.Caller(1) select { - case have := <-adds: + case have := <-sets: if want != have { t.Errorf("line %d: want %q, have %q", line, want, have) } diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go new file mode 100644 index 000000000..ac511830e --- /dev/null +++ b/xfer/background_publisher.go @@ -0,0 +1,70 @@ +package xfer + +import ( + "bytes" + "log" + "time" +) + +const ( + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second +) + +// BackgroundPublisher is a publisher which does the publish asynchronously. +// It will only do one publish at once; if there is an ongoing publish, +// concurrent publishes are dropped. +type BackgroundPublisher struct { + publisher Publisher + reports chan *bytes.Buffer + quit chan struct{} +} + +// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher +func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { + result := &BackgroundPublisher{ + publisher: p, + reports: make(chan *bytes.Buffer), + quit: make(chan struct{}), + } + go result.loop() + return result +} + +func (b *BackgroundPublisher) loop() { + backoff := initialBackoff + + for r := range b.reports { + err := b.publisher.Publish(r) + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) + select { + case <-time.After(backoff): + case <-b.quit: + } + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +// Publish implements Publisher +func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error { + select { + case b.reports <- buf: + default: + } + return nil +} + +// Stop implements Publisher +func (b *BackgroundPublisher) Stop() { + close(b.reports) + close(b.quit) + b.publisher.Stop() +} diff --git a/xfer/background_publisher_test.go b/xfer/background_publisher_test.go new file mode 100644 index 000000000..60a1a6283 --- /dev/null +++ b/xfer/background_publisher_test.go @@ -0,0 +1,7 @@ +package xfer_test + +import "testing" + +func TestBackgroundPublisher(t *testing.T) { + t.Skip("TODO") +} diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go new file mode 100644 index 000000000..42eb3c70b --- /dev/null +++ b/xfer/http_publisher.go @@ -0,0 +1,80 @@ +package xfer + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + + "github.com/weaveworks/scope/common/sanitize" +) + +// HTTPPublisher publishes reports by POST to a fixed endpoint. +type HTTPPublisher struct { + url string + token string + probeID string +} + +// NewHTTPPublisher returns an HTTPPublisher ready for use. +func NewHTTPPublisher(target, token, probeID string) (string, *HTTPPublisher, error) { + targetAPI := sanitize.URL("http://", 0, "/api")(target) + resp, err := http.Get(targetAPI) + if err != nil { + return "", nil, err + } + defer resp.Body.Close() + var apiResponse struct { + ID string `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { + return "", nil, err + } + return apiResponse.ID, &HTTPPublisher{ + url: sanitize.URL("http://", 0, "/api/report")(target), + token: token, + probeID: probeID, + }, nil +} + +func (p HTTPPublisher) String() string { + return p.url +} + +// Publish publishes the report to the URL. +func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { + req, err := http.NewRequest("POST", p.url, buf) + if err != nil { + return err + } + req.Header.Set("Authorization", AuthorizationHeader(p.token)) + req.Header.Set(ScopeProbeIDHeader, p.probeID) + req.Header.Set("Content-Encoding", "gzip") + // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf(resp.Status) + } + return nil +} + +// Stop implements Publisher +func (p HTTPPublisher) Stop() {} + +// AuthorizationHeader returns a value suitable for an HTTP Authorization +// header, based on the passed token string. +func AuthorizationHeader(token string) string { + return fmt.Sprintf("Scope-Probe token=%s", token) +} + +// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The +// ID is currently set to the probe's hostname. It's designed to deduplicate +// reports from the same probe to the same receiver, in case the probe is +// configured to publish to multiple receivers that resolve to the same app. +const ScopeProbeIDHeader = "X-Scope-Probe-ID" diff --git a/xfer/publisher_test.go b/xfer/http_publisher_test.go similarity index 67% rename from xfer/publisher_test.go rename to xfer/http_publisher_test.go index 787eee793..e0147a738 100644 --- a/xfer/publisher_test.go +++ b/xfer/http_publisher_test.go @@ -1,7 +1,6 @@ package xfer_test import ( - "bytes" "compress/gzip" "encoding/gob" "encoding/json" @@ -13,7 +12,6 @@ import ( "time" "github.com/gorilla/handlers" - "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" "github.com/weaveworks/scope/xfer" @@ -82,32 +80,3 @@ func TestHTTPPublisher(t *testing.T) { t.Error("timeout") } } - -func TestMultiPublisher(t *testing.T) { - var ( - p = &mockPublisher{} - factory = func(string) (xfer.Publisher, error) { return p, nil } - multiPublisher = xfer.NewMultiPublisher(factory) - ) - - multiPublisher.Add("first") - if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil { - t.Error(err) - } - if want, have := 1, p.count; want != have { - t.Errorf("want %d, have %d", want, have) - } - - multiPublisher.Add("second") // but factory returns same mockPublisher - if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil { - t.Error(err) - } - if want, have := 3, p.count; want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -type mockPublisher struct{ count int } - -func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go new file mode 100644 index 000000000..eadf1ff90 --- /dev/null +++ b/xfer/multi_publisher.go @@ -0,0 +1,107 @@ +package xfer + +import ( + "bytes" + "errors" + "log" + "strings" + "sync" +) + +// MultiPublisher implements publisher over a collection of heterogeneous +// targets. See documentation of each method to understand the semantics. +type MultiPublisher struct { + mtx sync.Mutex + factory func(endpoint string) (string, Publisher, error) + list []tuple +} + +// NewMultiPublisher returns a new MultiPublisher ready for use. +func NewMultiPublisher(factory func(endpoint string) (string, Publisher, error)) *MultiPublisher { + return &MultiPublisher{ + factory: factory, + } +} + +type tuple struct { + publisher Publisher + target string // DNS name + endpoint string // IP addr + id string // unique ID from app +} + +// Set declares that the target (DNS name) resolves to the provided endpoints +// (IPs), and that we want to publish to each of those endpoints. Set replaces +// any existing publishers to the given target. Set invokes the factory method +// to convert each endpoint to a publisher, and to get the remote receiver's +// unique ID. +func (p *MultiPublisher) Set(target string, endpoints []string) { + // Convert endpoints to publishers. + list := make([]tuple, 0, len(p.list)+len(endpoints)) + for _, endpoint := range endpoints { + id, publisher, err := p.factory(endpoint) + if err != nil { + log.Printf("multi-publisher set: %s (%s): %v", target, endpoint, err) + continue + } + list = append(list, tuple{publisher, target, endpoint, id}) + } + + // Copy all other tuples over to the new list. + p.mtx.Lock() + defer p.mtx.Unlock() + p.list = p.appendFilter(list, func(t tuple) bool { return t.target != target }) +} + +// Delete removes all endpoints that match the given target. +func (p *MultiPublisher) Delete(target string) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target }) +} + +// Publish implements Publisher by publishing the buffer to all of the +// underlying publishers sequentially. But, it will publish to one endpoint +// for each unique ID. Failed publishes don't count. +func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { + var ( + ids = map[string]struct{}{} + errs = []string{} + ) + p.mtx.Lock() + defer p.mtx.Unlock() + for _, t := range p.list { + if _, ok := ids[t.id]; ok { + continue + } + if err := t.publisher.Publish(buf); err != nil { + errs = append(errs, err.Error()) + continue + } + ids[t.id] = struct{}{} // sent already + } + if len(errs) > 0 { + return errors.New(strings.Join(errs, "; ")) + } + return nil +} + +// Stop invokes stop on all underlying publishers and removes them. +func (p *MultiPublisher) Stop() { + p.mtx.Lock() + defer p.mtx.Unlock() + for _, t := range p.list { + t.publisher.Stop() + } + p.list = []tuple{} +} + +func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple { + for _, t := range p.list { + if !f(t) { + continue + } + list = append(list, t) + } + return list +} diff --git a/xfer/multi_publisher_test.go b/xfer/multi_publisher_test.go new file mode 100644 index 000000000..0ec919bde --- /dev/null +++ b/xfer/multi_publisher_test.go @@ -0,0 +1,52 @@ +package xfer_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/weaveworks/scope/xfer" +) + +func TestMultiPublisher(t *testing.T) { + var ( + a1 = &mockPublisher{} // target a, endpoint 1 + a2 = &mockPublisher{} // target a, endpoint 2 (duplicate) + b2 = &mockPublisher{} // target b, endpoint 2 (duplicate) + b3 = &mockPublisher{} // target b, endpoint 3 + ) + + sum := func() int { return a1.count + a2.count + b2.count + b3.count } + + mp := xfer.NewMultiPublisher(func(endpoint string) (string, xfer.Publisher, error) { + switch endpoint { + case "a1": + return "1", a1, nil + case "a2": + return "2", a2, nil + case "b2": + return "2", b2, nil + case "b3": + return "3", b3, nil + default: + return "", nil, fmt.Errorf("invalid endpoint %s", endpoint) + } + }) + + mp.Set("a", []string{"a1", "a2"}) + mp.Set("b", []string{"b2", "b3"}) + + for i := 1; i < 10; i++ { + if err := mp.Publish(&bytes.Buffer{}); err != nil { + t.Error(err) + } + if want, have := 3*i, sum(); want != have { + t.Errorf("want %d, have %d", want, have) + } + } +} + +type mockPublisher struct{ count int } + +func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/publisher.go b/xfer/publisher.go index 0a0df7504..10a7cd960 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,22 +1,6 @@ package xfer -import ( - "bytes" - "encoding/json" - "fmt" - "log" - "net/http" - "strings" - "sync" - "time" - - "github.com/weaveworks/scope/common/sanitize" -) - -const ( - initialBackoff = 1 * time.Second - maxBackoff = 60 * time.Second -) +import "bytes" // Publisher is something which can send a buffered set of data somewhere, // probably to a remote collector. @@ -24,195 +8,3 @@ type Publisher interface { Publish(*bytes.Buffer) error Stop() } - -// HTTPPublisher publishes reports by POST to a fixed endpoint. -type HTTPPublisher struct { - url string - token string - probeID string -} - -// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The -// ID is currently set to the probe's hostname. It's designed to deduplicate -// reports from the same probe to the same receiver, in case the probe is -// configured to publish to multiple receivers that resolve to the same app. -const ScopeProbeIDHeader = "X-Scope-Probe-ID" - -// NewHTTPPublisher returns an HTTPPublisher ready for use. -func NewHTTPPublisher(target, token, probeID string) (string, *HTTPPublisher, error) { - targetAPI := sanitize.URL("http://", 0, "/api")(target) - resp, err := http.Get(targetAPI) - if err != nil { - return "", nil, err - } - defer resp.Body.Close() - var apiResponse struct { - ID string `json:"id"` - } - if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { - return "", nil, err - } - return apiResponse.ID, &HTTPPublisher{ - url: sanitize.URL("http://", 0, "/api/report")(target), - token: token, - probeID: probeID, - }, nil -} - -func (p HTTPPublisher) String() string { - return p.url -} - -// Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { - req, err := http.NewRequest("POST", p.url, buf) - if err != nil { - return err - } - req.Header.Set("Authorization", AuthorizationHeader(p.token)) - req.Header.Set(ScopeProbeIDHeader, p.probeID) - req.Header.Set("Content-Encoding", "gzip") - // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf(resp.Status) - } - return nil -} - -// Stop implements Publisher -func (p HTTPPublisher) Stop() {} - -// AuthorizationHeader returns a value suitable for an HTTP Authorization -// header, based on the passed token string. -func AuthorizationHeader(token string) string { - return fmt.Sprintf("Scope-Probe token=%s", token) -} - -// BackgroundPublisher is a publisher which does the publish asynchronously. -// It will only do one publish at once; if there is an ongoing publish, -// concurrent publishes are dropped. -type BackgroundPublisher struct { - publisher Publisher - reports chan *bytes.Buffer - quit chan struct{} -} - -// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher -func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { - result := &BackgroundPublisher{ - publisher: p, - reports: make(chan *bytes.Buffer), - quit: make(chan struct{}), - } - go result.loop() - return result -} - -func (b *BackgroundPublisher) loop() { - backoff := initialBackoff - - for r := range b.reports { - err := b.publisher.Publish(r) - if err == nil { - backoff = initialBackoff - continue - } - - log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) - select { - case <-time.After(backoff): - case <-b.quit: - } - backoff = backoff * 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } -} - -// Publish implements Publisher -func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error { - select { - case b.reports <- buf: - default: - } - return nil -} - -// Stop implements Publisher -func (b *BackgroundPublisher) Stop() { - close(b.reports) - close(b.quit) - b.publisher.Stop() -} - -// MultiPublisher implements Publisher over a set of publishers. -type MultiPublisher struct { - mtx sync.RWMutex - factory func(string) (Publisher, error) - m map[string]Publisher -} - -// NewMultiPublisher returns a new MultiPublisher ready for use. The factory -// should be e.g. NewHTTPPublisher, except you need to curry it over the -// probe token. -func NewMultiPublisher(factory func(string) (Publisher, error)) *MultiPublisher { - return &MultiPublisher{ - factory: factory, - m: map[string]Publisher{}, - } -} - -// Add allows additional targets to be added dynamically. It will dedupe -// identical targets. TODO we have no good mechanism to remove. -func (p *MultiPublisher) Add(target string) { - p.mtx.Lock() - defer p.mtx.Unlock() - - if _, ok := p.m[target]; ok { - return - } - - publisher, err := p.factory(target) - if err != nil { - log.Printf("multi-publisher: %v", err) - return - } - - p.m[target] = publisher -} - -// Publish implements Publisher by emitting the report to all publishers. -func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { - p.mtx.RLock() - defer p.mtx.RUnlock() - - var errs []string - for _, publisher := range p.m { - if err := publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { - errs = append(errs, err.Error()) - } - } - - if len(errs) > 0 { - return fmt.Errorf(strings.Join(errs, "; ")) - } - return nil -} - -// Stop implements Publisher -func (p *MultiPublisher) Stop() { - p.mtx.RLock() - defer p.mtx.RUnlock() - - for _, publisher := range p.m { - publisher.Stop() - } -} From eca45ca9d5513ff63cee68e0c68dbd748815f1cf Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 18 Sep 2015 10:40:43 +0200 Subject: [PATCH 4/7] Need to copy the buffer in the MultiPublisher --- xfer/background_publisher.go | 34 +++++++++++++++++----------------- xfer/http_publisher.go | 2 +- xfer/multi_publisher.go | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go index ac511830e..726659dec 100644 --- a/xfer/background_publisher.go +++ b/xfer/background_publisher.go @@ -16,37 +16,37 @@ const ( // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - reports chan *bytes.Buffer + buffers chan *bytes.Buffer quit chan struct{} } // NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { - result := &BackgroundPublisher{ + bp := &BackgroundPublisher{ publisher: p, - reports: make(chan *bytes.Buffer), + buffers: make(chan *bytes.Buffer), quit: make(chan struct{}), } - go result.loop() - return result + go bp.loop() + return bp } -func (b *BackgroundPublisher) loop() { +func (bp *BackgroundPublisher) loop() { backoff := initialBackoff - for r := range b.reports { - err := b.publisher.Publish(r) + for buf := range bp.buffers { + err := bp.publisher.Publish(buf) if err == nil { backoff = initialBackoff continue } - log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) + log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err) select { case <-time.After(backoff): - case <-b.quit: + case <-bp.quit: } - backoff = backoff * 2 + backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } @@ -54,17 +54,17 @@ func (b *BackgroundPublisher) loop() { } // Publish implements Publisher -func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error { +func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error { select { - case b.reports <- buf: + case bp.buffers <- buf: default: } return nil } // Stop implements Publisher -func (b *BackgroundPublisher) Stop() { - close(b.reports) - close(b.quit) - b.publisher.Stop() +func (bp *BackgroundPublisher) Stop() { + close(bp.buffers) + close(bp.quit) + bp.publisher.Stop() } diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index 42eb3c70b..87baa922a 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -9,7 +9,7 @@ import ( "github.com/weaveworks/scope/common/sanitize" ) -// HTTPPublisher publishes reports by POST to a fixed endpoint. +// HTTPPublisher publishes buffers by POST to a fixed endpoint. type HTTPPublisher struct { url string token string diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go index eadf1ff90..cda0bca32 100644 --- a/xfer/multi_publisher.go +++ b/xfer/multi_publisher.go @@ -74,7 +74,7 @@ func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { if _, ok := ids[t.id]; ok { continue } - if err := t.publisher.Publish(buf); err != nil { + if err := t.publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { errs = append(errs, err.Error()) continue } From 790da39f04b1757c87ffe011372003c0e9a74970 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Fri, 18 Sep 2015 10:53:29 +0200 Subject: [PATCH 5/7] Publish an io.Reader, not a bytes.Buffer --- xfer/background_publisher.go | 16 ++++++++-------- xfer/http_publisher.go | 6 +++--- xfer/http_publisher_test.go | 2 +- xfer/multi_publisher.go | 20 +++++++++++++++----- xfer/multi_publisher_test.go | 5 +++-- xfer/publisher.go | 8 ++++---- 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go index 726659dec..cd74d23d3 100644 --- a/xfer/background_publisher.go +++ b/xfer/background_publisher.go @@ -1,7 +1,7 @@ package xfer import ( - "bytes" + "io" "log" "time" ) @@ -16,7 +16,7 @@ const ( // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - buffers chan *bytes.Buffer + readers chan io.Reader quit chan struct{} } @@ -24,7 +24,7 @@ type BackgroundPublisher struct { func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { bp := &BackgroundPublisher{ publisher: p, - buffers: make(chan *bytes.Buffer), + readers: make(chan io.Reader), quit: make(chan struct{}), } go bp.loop() @@ -34,8 +34,8 @@ func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { func (bp *BackgroundPublisher) loop() { backoff := initialBackoff - for buf := range bp.buffers { - err := bp.publisher.Publish(buf) + for r := range bp.readers { + err := bp.publisher.Publish(r) if err == nil { backoff = initialBackoff continue @@ -54,9 +54,9 @@ func (bp *BackgroundPublisher) loop() { } // Publish implements Publisher -func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error { +func (bp *BackgroundPublisher) Publish(r io.Reader) error { select { - case bp.buffers <- buf: + case bp.readers <- r: default: } return nil @@ -64,7 +64,7 @@ func (bp *BackgroundPublisher) Publish(buf *bytes.Buffer) error { // Stop implements Publisher func (bp *BackgroundPublisher) Stop() { - close(bp.buffers) + close(bp.readers) close(bp.quit) bp.publisher.Stop() } diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index 87baa922a..bb9aa3f57 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -1,9 +1,9 @@ package xfer import ( - "bytes" "encoding/json" "fmt" + "io" "net/http" "github.com/weaveworks/scope/common/sanitize" @@ -42,8 +42,8 @@ func (p HTTPPublisher) String() string { } // Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { - req, err := http.NewRequest("POST", p.url, buf) +func (p HTTPPublisher) Publish(r io.Reader) error { + req, err := http.NewRequest("POST", p.url, r) if err != nil { return err } diff --git a/xfer/http_publisher_test.go b/xfer/http_publisher_test.go index e0147a738..d023f4469 100644 --- a/xfer/http_publisher_test.go +++ b/xfer/http_publisher_test.go @@ -27,7 +27,7 @@ func TestHTTPPublisher(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/api" { - json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) + _ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) return } diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go index cda0bca32..88d1455aa 100644 --- a/xfer/multi_publisher.go +++ b/xfer/multi_publisher.go @@ -3,6 +3,8 @@ package xfer import ( "bytes" "errors" + "io" + "io/ioutil" "log" "strings" "sync" @@ -60,21 +62,29 @@ func (p *MultiPublisher) Delete(target string) { p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target }) } -// Publish implements Publisher by publishing the buffer to all of the -// underlying publishers sequentially. But, it will publish to one endpoint -// for each unique ID. Failed publishes don't count. -func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { +// Publish implements Publisher by publishing the reader to all of the +// underlying publishers sequentially. To do that, it needs to drain the +// reader, and recreate new readers for each publisher. Note that it will +// publish to one endpoint for each unique ID. Failed publishes don't count. +func (p *MultiPublisher) Publish(r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + var ( ids = map[string]struct{}{} errs = []string{} ) + p.mtx.Lock() defer p.mtx.Unlock() + for _, t := range p.list { if _, ok := ids[t.id]; ok { continue } - if err := t.publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { + if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil { errs = append(errs, err.Error()) continue } diff --git a/xfer/multi_publisher_test.go b/xfer/multi_publisher_test.go index 0ec919bde..2664340b7 100644 --- a/xfer/multi_publisher_test.go +++ b/xfer/multi_publisher_test.go @@ -3,6 +3,7 @@ package xfer_test import ( "bytes" "fmt" + "io" "testing" "github.com/weaveworks/scope/xfer" @@ -48,5 +49,5 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } -func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} +func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/publisher.go b/xfer/publisher.go index 10a7cd960..7ac56a93e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,10 +1,10 @@ package xfer -import "bytes" +import "io" -// Publisher is something which can send a buffered set of data somewhere, -// probably to a remote collector. +// Publisher is something which can send a stream of data somewhere, probably +// to a remote collector. type Publisher interface { - Publish(*bytes.Buffer) error + Publish(io.Reader) error Stop() } From eccc74aafec4e5711bdcec4773aab02365a37e19 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Tue, 22 Sep 2015 16:02:29 +0200 Subject: [PATCH 6/7] Make the resolver concurrent --- probe/resolver.go | 156 +++++++++++++++++++++++------------------ probe/resolver_test.go | 76 ++++++++++++++------ 2 files changed, 144 insertions(+), 88 deletions(-) diff --git a/probe/resolver.go b/probe/resolver.go index 6feae5d41..1d99e41c0 100644 --- a/probe/resolver.go +++ b/probe/resolver.go @@ -15,95 +15,115 @@ var ( lookupIP = net.LookupIP ) +const maxConcurrentLookup = 10 + type staticResolver struct { - quit chan struct{} - set func(string, []string) - peers []peer + set func(string, []string) + targets []target + sema semaphore + quit chan struct{} } -type peer struct { - hostname string - port string -} +type target struct{ host, port string } -// NewResolver starts a new resolver that periodically -// tries to resolve peers and the calls add() with all the -// resolved IPs. It explictiy supports hostnames which -// resolve to multiple IPs; it will repeatedly call -// add with the same IP, expecting the target to dedupe. -func newStaticResolver(peers []string, set func(target string, endpoints []string)) staticResolver { +func (t target) String() string { return net.JoinHostPort(t.host, t.port) } + +// newStaticResolver periodically resolves the targets, and calls the set +// function with all the resolved IPs. It explictiy supports targets which +// resolve to multiple IPs. +func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver { r := staticResolver{ - quit: make(chan struct{}), - set: set, - peers: prepareNames(peers), + targets: prepare(targets), + set: set, + sema: newSemaphore(maxConcurrentLookup), + quit: make(chan struct{}), } go r.loop() return r } -func prepareNames(strs []string) []peer { - var results []peer - for _, s := range strs { - var ( - hostname string - port string - ) - - if strings.Contains(s, ":") { - var err error - hostname, port, err = net.SplitHostPort(s) - if err != nil { - log.Printf("invalid address %s: %v", s, err) - continue - } - } else { - hostname, port = s, strconv.Itoa(xfer.AppPort) - } - - results = append(results, peer{hostname, port}) - } - return results -} - func (r staticResolver) loop() { - r.resolveHosts() + r.resolve() t := tick(time.Minute) for { select { case <-t: - r.resolveHosts() - + r.resolve() case <-r.quit: return } } } -func (r staticResolver) resolveHosts() { - for _, peer := range r.peers { - var addrs []net.IP - if addr := net.ParseIP(peer.hostname); addr != nil { - addrs = []net.IP{addr} - } else { - var err error - addrs, err = lookupIP(peer.hostname) - if err != nil { - continue - } - } - - endpoints := make([]string, 0, len(addrs)) - for _, addr := range addrs { - // For now, ignore IPv6 - if addr.To4() == nil { - continue - } - endpoints = append(endpoints, net.JoinHostPort(addr.String(), peer.port)) - } - r.set(peer.hostname, endpoints) - } -} - func (r staticResolver) Stop() { close(r.quit) } + +func prepare(strs []string) []target { + var targets []target + for _, s := range strs { + var host, port string + if strings.Contains(s, ":") { + var err error + host, port, err = net.SplitHostPort(s) + if err != nil { + log.Printf("invalid address %s: %v", s, err) + continue + } + } else { + host, port = s, strconv.Itoa(xfer.AppPort) + } + targets = append(targets, target{host, port}) + } + return targets +} + +func (r staticResolver) resolve() { + for t, endpoints := range resolveMany(r.sema, r.targets) { + r.set(t.String(), endpoints) + } +} + +func resolveMany(s semaphore, targets []target) map[target][]string { + result := map[target][]string{} + for _, t := range targets { + c := make(chan []string) + go func(t target) { s.p(); defer s.v(); c <- resolveOne(t) }(t) + result[t] = <-c + } + return result +} + +func resolveOne(t target) []string { + var addrs []net.IP + if addr := net.ParseIP(t.host); addr != nil { + addrs = []net.IP{addr} + } else { + var err error + addrs, err = lookupIP(t.host) + if err != nil { + return []string{} + } + } + endpoints := make([]string, 0, len(addrs)) + for _, addr := range addrs { + // For now, ignore IPv6 + if addr.To4() == nil { + continue + } + endpoints = append(endpoints, net.JoinHostPort(addr.String(), t.port)) + } + return endpoints +} + +type semaphore chan struct{} + +func newSemaphore(n int) semaphore { + c := make(chan struct{}, n) + for i := 0; i < n; i++ { + c <- struct{}{} + } + return semaphore(c) +} +func (s semaphore) p() { <-s } +func (s semaphore) v() { s <- struct{}{} } diff --git a/probe/resolver_test.go b/probe/resolver_test.go index 63826380c..88e5807aa 100644 --- a/probe/resolver_test.go +++ b/probe/resolver_test.go @@ -48,42 +48,44 @@ func TestResolver(t *testing.T) { r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set) - assertAdd := func(want string) { + assertAdd := func(want ...string) { + remaining := map[string]struct{}{} + for _, s := range want { + remaining[s] = struct{}{} + } _, _, line, _ := runtime.Caller(1) - select { - case have := <-sets: - if want != have { - t.Errorf("line %d: want %q, have %q", line, want, have) + for len(remaining) > 0 { + select { + case s := <-sets: + if _, ok := remaining[s]; ok { + t.Logf("line %d: got %q OK", line, s) + delete(remaining, s) + } else { + t.Errorf("line %d: got unexpected %q", line, s) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("line %d: didn't get the adds in time", line) } - case <-time.After(100 * time.Millisecond): - t.Fatalf("line %d: didn't get add in time", line) } } // Initial resolve should just give us IPs - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) // Trigger another resolve with a tick; again, // just want ips. c <- time.Now() - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) ip3 := "1.2.3.4" updateIPs("symbolic.name", makeIPs(ip3)) - c <- time.Now() // trigger a resolve - assertAdd(ip3 + port) // we want 1 add - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + c <- time.Now() // trigger a resolve + assertAdd(ip3+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) ip4 := "10.10.10.10" updateIPs("symbolic.name", makeIPs(ip3, ip4)) - c <- time.Now() // trigger another resolve, this time with 2 adds - assertAdd(ip3 + port) // first add - assertAdd(ip4 + port) // second add - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + c <- time.Now() // trigger another resolve, this time with 2 adds + assertAdd(ip3+port, ip4+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) done := make(chan struct{}) go func() { r.Stop(); close(done) }() @@ -94,6 +96,40 @@ func TestResolver(t *testing.T) { } } +func TestSemaphore(t *testing.T) { + n := 3 + s := newSemaphore(n) + + // First n should be fine + for i := 0; i < n; i++ { + ok := make(chan struct{}) + go func() { s.p(); close(ok) }() + select { + case <-ok: + case <-time.After(10 * time.Millisecond): + t.Errorf("p (%d) failed", i+1) + } + } + + // This should block + ok := make(chan struct{}) + go func() { s.p(); close(ok) }() + select { + case <-ok: + t.Errorf("%dth p OK, but should block", n+1) + case <-time.After(10 * time.Millisecond): + //t.Logf("%dth p blocks, as expected", n+1) + } + + s.v() + + select { + case <-ok: + case <-time.After(10 * time.Millisecond): + t.Errorf("%dth p didn't resolve in time", n+1) + } +} + func makeIPs(addrs ...string) []net.IP { var ips []net.IP for _, addr := range addrs { From 8602132ab6283ae6bb7d947fc362b9c3dd17b725 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Thu, 24 Sep 2015 16:56:37 +0200 Subject: [PATCH 7/7] Move concurrency from resolve to HTTP GET - Process DNS resolution serially - Process up to 10 HTTP GET (for app ID) concurrently More than 10 concurrent GET requests will block on the semaphore. This will cause the staticResolver.resolve method to block, which is probably fine: it will just delay the next resolve loop, currently at 1m intervals. To make this a little bit more robust, I've also added a fastClient for app ID resolution, with a timeout (total, including connect, request, and response) of 5s. --- probe/resolver.go | 24 ++-------------- probe/resolver_test.go | 34 ----------------------- xfer/http_publisher.go | 7 ++++- xfer/multi_publisher.go | 36 ++++++++++++++++++++---- xfer/multi_publisher_internal_test.go | 40 +++++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 61 deletions(-) create mode 100644 xfer/multi_publisher_internal_test.go diff --git a/probe/resolver.go b/probe/resolver.go index 1d99e41c0..58119497a 100644 --- a/probe/resolver.go +++ b/probe/resolver.go @@ -15,12 +15,9 @@ var ( lookupIP = net.LookupIP ) -const maxConcurrentLookup = 10 - type staticResolver struct { set func(string, []string) targets []target - sema semaphore quit chan struct{} } @@ -35,7 +32,6 @@ func newStaticResolver(targets []string, set func(target string, endpoints []str r := staticResolver{ targets: prepare(targets), set: set, - sema: newSemaphore(maxConcurrentLookup), quit: make(chan struct{}), } go r.loop() @@ -79,17 +75,15 @@ func prepare(strs []string) []target { } func (r staticResolver) resolve() { - for t, endpoints := range resolveMany(r.sema, r.targets) { + for t, endpoints := range resolveMany(r.targets) { r.set(t.String(), endpoints) } } -func resolveMany(s semaphore, targets []target) map[target][]string { +func resolveMany(targets []target) map[target][]string { result := map[target][]string{} for _, t := range targets { - c := make(chan []string) - go func(t target) { s.p(); defer s.v(); c <- resolveOne(t) }(t) - result[t] = <-c + result[t] = resolveOne(t) } return result } @@ -115,15 +109,3 @@ func resolveOne(t target) []string { } return endpoints } - -type semaphore chan struct{} - -func newSemaphore(n int) semaphore { - c := make(chan struct{}, n) - for i := 0; i < n; i++ { - c <- struct{}{} - } - return semaphore(c) -} -func (s semaphore) p() { <-s } -func (s semaphore) v() { s <- struct{}{} } diff --git a/probe/resolver_test.go b/probe/resolver_test.go index 88e5807aa..4463e8f22 100644 --- a/probe/resolver_test.go +++ b/probe/resolver_test.go @@ -96,40 +96,6 @@ func TestResolver(t *testing.T) { } } -func TestSemaphore(t *testing.T) { - n := 3 - s := newSemaphore(n) - - // First n should be fine - for i := 0; i < n; i++ { - ok := make(chan struct{}) - go func() { s.p(); close(ok) }() - select { - case <-ok: - case <-time.After(10 * time.Millisecond): - t.Errorf("p (%d) failed", i+1) - } - } - - // This should block - ok := make(chan struct{}) - go func() { s.p(); close(ok) }() - select { - case <-ok: - t.Errorf("%dth p OK, but should block", n+1) - case <-time.After(10 * time.Millisecond): - //t.Logf("%dth p blocks, as expected", n+1) - } - - s.v() - - select { - case <-ok: - case <-time.After(10 * time.Millisecond): - t.Errorf("%dth p didn't resolve in time", n+1) - } -} - func makeIPs(addrs ...string) []net.IP { var ips []net.IP for _, addr := range addrs { diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index bb9aa3f57..8ff6e2f13 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/weaveworks/scope/common/sanitize" ) @@ -16,10 +17,14 @@ type HTTPPublisher struct { probeID string } +var fastClient = http.Client{ + Timeout: 5 * time.Second, +} + // NewHTTPPublisher returns an HTTPPublisher ready for use. func NewHTTPPublisher(target, token, probeID string) (string, *HTTPPublisher, error) { targetAPI := sanitize.URL("http://", 0, "/api")(target) - resp, err := http.Get(targetAPI) + resp, err := fastClient.Get(targetAPI) if err != nil { return "", nil, err } diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go index 88d1455aa..532525600 100644 --- a/xfer/multi_publisher.go +++ b/xfer/multi_publisher.go @@ -15,6 +15,7 @@ import ( type MultiPublisher struct { mtx sync.Mutex factory func(endpoint string) (string, Publisher, error) + sema semaphore list []tuple } @@ -22,6 +23,7 @@ type MultiPublisher struct { func NewMultiPublisher(factory func(endpoint string) (string, Publisher, error)) *MultiPublisher { return &MultiPublisher{ factory: factory, + sema: newSemaphore(maxConcurrentGET), } } @@ -30,8 +32,11 @@ type tuple struct { target string // DNS name endpoint string // IP addr id string // unique ID from app + err error // if factory failed } +const maxConcurrentGET = 10 + // Set declares that the target (DNS name) resolves to the provided endpoints // (IPs), and that we want to publish to each of those endpoints. Set replaces // any existing publishers to the given target. Set invokes the factory method @@ -39,14 +44,23 @@ type tuple struct { // unique ID. func (p *MultiPublisher) Set(target string, endpoints []string) { // Convert endpoints to publishers. - list := make([]tuple, 0, len(p.list)+len(endpoints)) + c := make(chan tuple, len(endpoints)) for _, endpoint := range endpoints { - id, publisher, err := p.factory(endpoint) - if err != nil { - log.Printf("multi-publisher set: %s (%s): %v", target, endpoint, err) + go func(endpoint string) { + p.sema.p() + defer p.sema.v() + id, publisher, err := p.factory(endpoint) + c <- tuple{publisher, target, endpoint, id, err} + }(endpoint) + } + list := make([]tuple, 0, len(p.list)+len(endpoints)) + for i := 0; i < cap(c); i++ { + t := <-c + if t.err != nil { + log.Printf("multi-publisher set: %s (%s): %v", t.target, t.endpoint, t.err) continue } - list = append(list, tuple{publisher, target, endpoint, id}) + list = append(list, t) } // Copy all other tuples over to the new list. @@ -115,3 +129,15 @@ func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple } return list } + +type semaphore chan struct{} + +func newSemaphore(n int) semaphore { + c := make(chan struct{}, n) + for i := 0; i < n; i++ { + c <- struct{}{} + } + return semaphore(c) +} +func (s semaphore) p() { <-s } +func (s semaphore) v() { s <- struct{}{} } diff --git a/xfer/multi_publisher_internal_test.go b/xfer/multi_publisher_internal_test.go new file mode 100644 index 000000000..4400b1b2e --- /dev/null +++ b/xfer/multi_publisher_internal_test.go @@ -0,0 +1,40 @@ +package xfer + +import ( + "testing" + "time" +) + +func TestSemaphore(t *testing.T) { + n := 3 + s := newSemaphore(n) + + // First n should be fine + for i := 0; i < n; i++ { + ok := make(chan struct{}) + go func() { s.p(); close(ok) }() + select { + case <-ok: + case <-time.After(10 * time.Millisecond): + t.Errorf("p (%d) failed", i+1) + } + } + + // This should block + ok := make(chan struct{}) + go func() { s.p(); close(ok) }() + select { + case <-ok: + t.Errorf("%dth p OK, but should block", n+1) + case <-time.After(10 * time.Millisecond): + //t.Logf("%dth p blocks, as expected", n+1) + } + + s.v() + + select { + case <-ok: + case <-time.After(10 * time.Millisecond): + t.Errorf("%dth p didn't resolve in time", n+1) + } +}