diff --git a/app/main.go b/app/main.go index 7e8e3c3b3..0320fc2cf 100644 --- a/app/main.go +++ b/app/main.go @@ -16,8 +16,13 @@ import ( "github.com/weaveworks/scope/xfer" ) -// Set during buildtime. -var version = "dev" +var ( + // Set at buildtime. + version = "dev" + + // Set at runtime. + uniqueID = "0" +) func main() { var ( @@ -33,8 +38,8 @@ func main() { } rand.Seed(time.Now().UnixNano()) - id := strconv.FormatInt(rand.Int63(), 16) - log.Printf("app starting, version %s, ID %s", version, id) + uniqueID = strconv.FormatInt(rand.Int63(), 16) + log.Printf("app starting, version %s, ID %s", version, uniqueID) c := xfer.NewCollector(*window) http.Handle("/", Router(c)) diff --git a/app/router.go b/app/router.go index a1c866173..9e99c3ed3 100644 --- a/app/router.go +++ b/app/router.go @@ -121,11 +121,12 @@ func captureTopology(rep xfer.Reporter, f func(xfer.Reporter, topologyView, http // APIDetails are some generic details that can be fetched from /api type APIDetails struct { + ID string `json:"id"` Version string `json:"version"` } func apiHandler(w http.ResponseWriter, r *http.Request) { - respondWith(w, http.StatusOK, APIDetails{Version: version}) + respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version}) } // Topology option labels should tell the current state. The first item must diff --git a/common/sanitize/sanitize.go b/common/sanitize/sanitize.go new file mode 100644 index 000000000..bd12bf8c9 --- /dev/null +++ b/common/sanitize/sanitize.go @@ -0,0 +1,39 @@ +package sanitize + +import ( + "fmt" + "log" + "net" + "net/url" + "strings" +) + +// URL returns a function that sanitizes a URL string. It lets underspecified +// strings to be converted to usable URLs via some default arguments. +func URL(scheme string, port int, path string) func(string) string { + if scheme == "" { + scheme = "http://" + } + return func(s string) string { + if s == "" { + return s // can't do much here + } + if !strings.HasPrefix(s, "http") { + s = scheme + s + } + u, err := url.Parse(s) + if err != nil { + log.Printf("%q: %v", s, err) + return s // oh well + } + if port > 0 { + if _, _, err = net.SplitHostPort(u.Host); err != nil { + u.Host += fmt.Sprintf(":%d", port) + } + } + if path != "" && u.Path != path { + u.Path = path + } + return u.String() + } +} diff --git a/common/sanitize/sanitize_test.go b/common/sanitize/sanitize_test.go new file mode 100644 index 000000000..1d6502488 --- /dev/null +++ b/common/sanitize/sanitize_test.go @@ -0,0 +1,34 @@ +package sanitize_test + +import ( + "testing" + + "github.com/weaveworks/scope/common/sanitize" +) + +func TestSanitizeURL(t *testing.T) { + for _, input := range []struct { + scheme string + port int + path string + input string + want string + }{ + {"", 0, "", "", ""}, + {"", 0, "", "foo", "http://foo"}, + {"", 80, "", "foo", "http://foo:80"}, + {"", 0, "some/path", "foo", "http://foo/some/path"}, + {"", 0, "/some/path", "foo", "http://foo/some/path"}, + {"https://", 0, "", "foo", "https://foo"}, + {"https://", 80, "", "foo", "https://foo:80"}, + {"https://", 0, "some/path", "foo", "https://foo/some/path"}, + {"https://", 0, "", "http://foo", "http://foo"}, // specified scheme beats default... + {"", 9999, "", "foo:80", "http://foo:80"}, // specified port beats default... + {"", 0, "/bar", "foo/baz", "http://foo/bar"}, // ...but default path beats specified! + } { + if want, have := input.want, sanitize.URL(input.scheme, input.port, input.path)(input.input); want != have { + t.Errorf("sanitize.URL(%q, %d, %q)(%q): want %q, have %q", input.scheme, input.port, input.path, input.input, want, have) + continue + } + } +} diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index 55cb91fb6..51c3ac5bf 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -23,7 +23,7 @@ func main() { ) flag.Parse() - publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe") + _, publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe") if err != nil { log.Fatal(err) } diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index 1bd1294b6..bcec4c05a 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -34,7 +34,7 @@ func main() { } f.Close() - publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe") + _, publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe") if err != nil { log.Fatal(err) } diff --git a/probe/main.go b/probe/main.go index a8dd86dda..a2d91dcbe 100644 --- a/probe/main.go +++ b/probe/main.go @@ -80,16 +80,16 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisherFactory := func(target string) (xfer.Publisher, error) { - publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) + factory := func(endpoint string) (string, xfer.Publisher, error) { + id, publisher, err := xfer.NewHTTPPublisher(endpoint, *token, probeID) if err != nil { - return nil, err + return "", nil, err } - return xfer.NewBackgroundPublisher(publisher), nil + return id, xfer.NewBackgroundPublisher(publisher), nil } - publishers := xfer.NewMultiPublisher(publisherFactory) + publishers := xfer.NewMultiPublisher(factory) defer publishers.Stop() - resolver := newStaticResolver(targets, publishers.Add) + resolver := newStaticResolver(targets, publishers.Set) defer resolver.Stop() addrs, err := net.InterfaceAddrs() @@ -133,10 +133,7 @@ func main() { } if *weaveRouterAddr != "" { - weave, err := overlay.NewWeave(hostID, *weaveRouterAddr) - if err != nil { - log.Fatalf("failed to start Weave tagger: %v", err) - } + weave := overlay.NewWeave(hostID, *weaveRouterAddr) tickers = append(tickers, weave) taggers = append(taggers, weave) reporters = append(reporters, weave) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index c509eea2c..ce1330523 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -5,14 +5,13 @@ import ( "encoding/json" "fmt" "log" - "net" "net/http" - "net/url" "regexp" "strings" "sync" "github.com/weaveworks/scope/common/exec" + "github.com/weaveworks/scope/common/sanitize" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) @@ -68,15 +67,11 @@ type weaveStatus struct { // NewWeave returns a new Weave tagger based on the Weave router at // address. The address should be an IP or FQDN, no port. -func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { - s, err := sanitize("http://", 6784, "/report")(weaveRouterAddress) - if err != nil { - return nil, err - } +func NewWeave(hostID, weaveRouterAddress string) *Weave { return &Weave{ - url: s, + url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), hostID: hostID, - }, nil + } } // Tick implements Ticker @@ -202,25 +197,3 @@ func (w *Weave) Report() (report.Report, error) { } return r, nil } - -func sanitize(scheme string, port int, path string) func(string) (string, error) { - return func(s string) (string, error) { - if s == "" { - return "", fmt.Errorf("no host") - } - if !strings.HasPrefix(s, "http") { - s = scheme + s - } - u, err := url.Parse(s) - if err != nil { - return "", err - } - if _, _, err = net.SplitHostPort(u.Host); err != nil { - u.Host += fmt.Sprintf(":%d", port) - } - if u.Path != path { - u.Path = path - } - return u.String(), nil - } -} diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index bb5f18f6e..ab29ca781 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -25,11 +25,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter)) defer s.Close() - w, err := overlay.NewWeave(mockHostID, s.URL) - if err != nil { - t.Fatal(err) - } - + w := overlay.NewWeave(mockHostID, s.URL) w.Tick() { diff --git a/probe/resolver.go b/probe/resolver.go index 8b8c7e97e..58119497a 100644 --- a/probe/resolver.go +++ b/probe/resolver.go @@ -16,92 +16,96 @@ var ( ) type staticResolver struct { - quit chan struct{} - add func(string) - peers []peer + set func(string, []string) + targets []target + quit chan struct{} } -type peer struct { - hostname string - port string -} +type target struct{ host, port string } -// NewResolver starts a new resolver that periodically -// tries to resolve peers and the calls add() with all the -// resolved IPs. It explictiy supports hostnames which -// resolve to multiple IPs; it will repeatedly call -// add with the same IP, expecting the target to dedupe. -func newStaticResolver(peers []string, add func(string)) staticResolver { +func (t target) String() string { return net.JoinHostPort(t.host, t.port) } + +// newStaticResolver periodically resolves the targets, and calls the set +// function with all the resolved IPs. It explictiy supports targets which +// resolve to multiple IPs. +func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver { r := staticResolver{ - quit: make(chan struct{}), - add: add, - peers: prepareNames(peers), + targets: prepare(targets), + set: set, + quit: make(chan struct{}), } go r.loop() return r } -func prepareNames(strs []string) []peer { - var results []peer - for _, s := range strs { - var ( - hostname string - port string - ) - - if strings.Contains(s, ":") { - var err error - hostname, port, err = net.SplitHostPort(s) - if err != nil { - log.Printf("invalid address %s: %v", s, err) - continue - } - } else { - hostname, port = s, strconv.Itoa(xfer.AppPort) - } - - results = append(results, peer{hostname, port}) - } - return results -} - func (r staticResolver) loop() { - r.resolveHosts() + r.resolve() t := tick(time.Minute) for { select { case <-t: - r.resolveHosts() - + r.resolve() case <-r.quit: return } } } -func (r staticResolver) resolveHosts() { - for _, peer := range r.peers { - var addrs []net.IP - if addr := net.ParseIP(peer.hostname); addr != nil { - addrs = []net.IP{addr} - } else { - var err error - addrs, err = lookupIP(peer.hostname) - if err != nil { - continue - } - } - - for _, addr := range addrs { - // For now, ignore IPv6 - if addr.To4() == nil { - continue - } - r.add(net.JoinHostPort(addr.String(), peer.port)) - } - } -} - func (r staticResolver) Stop() { close(r.quit) } + +func prepare(strs []string) []target { + var targets []target + for _, s := range strs { + var host, port string + if strings.Contains(s, ":") { + var err error + host, port, err = net.SplitHostPort(s) + if err != nil { + log.Printf("invalid address %s: %v", s, err) + continue + } + } else { + host, port = s, strconv.Itoa(xfer.AppPort) + } + targets = append(targets, target{host, port}) + } + return targets +} + +func (r staticResolver) resolve() { + for t, endpoints := range resolveMany(r.targets) { + r.set(t.String(), endpoints) + } +} + +func resolveMany(targets []target) map[target][]string { + result := map[target][]string{} + for _, t := range targets { + result[t] = resolveOne(t) + } + return result +} + +func resolveOne(t target) []string { + var addrs []net.IP + if addr := net.ParseIP(t.host); addr != nil { + addrs = []net.IP{addr} + } else { + var err error + addrs, err = lookupIP(t.host) + if err != nil { + return []string{} + } + } + endpoints := make([]string, 0, len(addrs)) + for _, addr := range addrs { + // For now, ignore IPv6 + if addr.To4() == nil { + continue + } + endpoints = append(endpoints, net.JoinHostPort(addr.String(), t.port)) + } + return endpoints +} diff --git a/probe/resolver_test.go b/probe/resolver_test.go index d1deada58..4463e8f22 100644 --- a/probe/resolver_test.go +++ b/probe/resolver_test.go @@ -39,47 +39,53 @@ func TestResolver(t *testing.T) { port := ":80" ip1 := "192.168.0.1" ip2 := "192.168.0.10" - adds := make(chan string) - add := func(s string) { adds <- s } + sets := make(chan string) + set := func(target string, endpoints []string) { + for _, endpoint := range endpoints { + sets <- endpoint + } + } - r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, add) + r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set) - assertAdd := func(want string) { + assertAdd := func(want ...string) { + remaining := map[string]struct{}{} + for _, s := range want { + remaining[s] = struct{}{} + } _, _, line, _ := runtime.Caller(1) - select { - case have := <-adds: - if want != have { - t.Errorf("line %d: want %q, have %q", line, want, have) + for len(remaining) > 0 { + select { + case s := <-sets: + if _, ok := remaining[s]; ok { + t.Logf("line %d: got %q OK", line, s) + delete(remaining, s) + } else { + t.Errorf("line %d: got unexpected %q", line, s) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("line %d: didn't get the adds in time", line) } - case <-time.After(100 * time.Millisecond): - t.Fatalf("line %d: didn't get add in time", line) } } // Initial resolve should just give us IPs - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) // Trigger another resolve with a tick; again, // just want ips. c <- time.Now() - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) ip3 := "1.2.3.4" updateIPs("symbolic.name", makeIPs(ip3)) - c <- time.Now() // trigger a resolve - assertAdd(ip3 + port) // we want 1 add - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + c <- time.Now() // trigger a resolve + assertAdd(ip3+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) ip4 := "10.10.10.10" updateIPs("symbolic.name", makeIPs(ip3, ip4)) - c <- time.Now() // trigger another resolve, this time with 2 adds - assertAdd(ip3 + port) // first add - assertAdd(ip4 + port) // second add - assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + c <- time.Now() // trigger another resolve, this time with 2 adds + assertAdd(ip3+port, ip4+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) done := make(chan struct{}) go func() { r.Stop(); close(done) }() diff --git a/xfer/background_publisher.go b/xfer/background_publisher.go new file mode 100644 index 000000000..cd74d23d3 --- /dev/null +++ b/xfer/background_publisher.go @@ -0,0 +1,70 @@ +package xfer + +import ( + "io" + "log" + "time" +) + +const ( + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second +) + +// BackgroundPublisher is a publisher which does the publish asynchronously. +// It will only do one publish at once; if there is an ongoing publish, +// concurrent publishes are dropped. +type BackgroundPublisher struct { + publisher Publisher + readers chan io.Reader + quit chan struct{} +} + +// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher +func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { + bp := &BackgroundPublisher{ + publisher: p, + readers: make(chan io.Reader), + quit: make(chan struct{}), + } + go bp.loop() + return bp +} + +func (bp *BackgroundPublisher) loop() { + backoff := initialBackoff + + for r := range bp.readers { + err := bp.publisher.Publish(r) + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err) + select { + case <-time.After(backoff): + case <-bp.quit: + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +// Publish implements Publisher +func (bp *BackgroundPublisher) Publish(r io.Reader) error { + select { + case bp.readers <- r: + default: + } + return nil +} + +// Stop implements Publisher +func (bp *BackgroundPublisher) Stop() { + close(bp.readers) + close(bp.quit) + bp.publisher.Stop() +} diff --git a/xfer/background_publisher_test.go b/xfer/background_publisher_test.go new file mode 100644 index 000000000..60a1a6283 --- /dev/null +++ b/xfer/background_publisher_test.go @@ -0,0 +1,7 @@ +package xfer_test + +import "testing" + +func TestBackgroundPublisher(t *testing.T) { + t.Skip("TODO") +} diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go new file mode 100644 index 000000000..8ff6e2f13 --- /dev/null +++ b/xfer/http_publisher.go @@ -0,0 +1,85 @@ +package xfer + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/weaveworks/scope/common/sanitize" +) + +// HTTPPublisher publishes buffers by POST to a fixed endpoint. +type HTTPPublisher struct { + url string + token string + probeID string +} + +var fastClient = http.Client{ + Timeout: 5 * time.Second, +} + +// NewHTTPPublisher returns an HTTPPublisher ready for use. +func NewHTTPPublisher(target, token, probeID string) (string, *HTTPPublisher, error) { + targetAPI := sanitize.URL("http://", 0, "/api")(target) + resp, err := fastClient.Get(targetAPI) + if err != nil { + return "", nil, err + } + defer resp.Body.Close() + var apiResponse struct { + ID string `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { + return "", nil, err + } + return apiResponse.ID, &HTTPPublisher{ + url: sanitize.URL("http://", 0, "/api/report")(target), + token: token, + probeID: probeID, + }, nil +} + +func (p HTTPPublisher) String() string { + return p.url +} + +// Publish publishes the report to the URL. +func (p HTTPPublisher) Publish(r io.Reader) error { + req, err := http.NewRequest("POST", p.url, r) + if err != nil { + return err + } + req.Header.Set("Authorization", AuthorizationHeader(p.token)) + req.Header.Set(ScopeProbeIDHeader, p.probeID) + req.Header.Set("Content-Encoding", "gzip") + // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf(resp.Status) + } + return nil +} + +// Stop implements Publisher +func (p HTTPPublisher) Stop() {} + +// AuthorizationHeader returns a value suitable for an HTTP Authorization +// header, based on the passed token string. +func AuthorizationHeader(token string) string { + return fmt.Sprintf("Scope-Probe token=%s", token) +} + +// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The +// ID is currently set to the probe's hostname. It's designed to deduplicate +// reports from the same probe to the same receiver, in case the probe is +// configured to publish to multiple receivers that resolve to the same app. +const ScopeProbeIDHeader = "X-Scope-Probe-ID" diff --git a/xfer/publisher_test.go b/xfer/http_publisher_test.go similarity index 63% rename from xfer/publisher_test.go rename to xfer/http_publisher_test.go index 0bef0861e..d023f4469 100644 --- a/xfer/publisher_test.go +++ b/xfer/http_publisher_test.go @@ -1,9 +1,9 @@ package xfer_test import ( - "bytes" "compress/gzip" "encoding/gob" + "encoding/json" "net/http" "net/http/httptest" "reflect" @@ -12,7 +12,6 @@ import ( "time" "github.com/gorilla/handlers" - "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" "github.com/weaveworks/scope/xfer" @@ -27,6 +26,11 @@ func TestHTTPPublisher(t *testing.T) { ) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/api" { + _ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"}) + return + } + if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have { t.Errorf("want %q, have %q", want, have) } @@ -61,7 +65,7 @@ func TestHTTPPublisher(t *testing.T) { s := httptest.NewServer(handlers.CompressHandler(handler)) defer s.Close() - p, err := xfer.NewHTTPPublisher(s.URL, token, id) + _, p, err := xfer.NewHTTPPublisher(s.URL, token, id) if err != nil { t.Fatal(err) } @@ -76,32 +80,3 @@ func TestHTTPPublisher(t *testing.T) { t.Error("timeout") } } - -func TestMultiPublisher(t *testing.T) { - var ( - p = &mockPublisher{} - factory = func(string) (xfer.Publisher, error) { return p, nil } - multiPublisher = xfer.NewMultiPublisher(factory) - ) - - multiPublisher.Add("first") - if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil { - t.Error(err) - } - if want, have := 1, p.count; want != have { - t.Errorf("want %d, have %d", want, have) - } - - multiPublisher.Add("second") // but factory returns same mockPublisher - if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil { - t.Error(err) - } - if want, have := 3, p.count; want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -type mockPublisher struct{ count int } - -func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} diff --git a/xfer/multi_publisher.go b/xfer/multi_publisher.go new file mode 100644 index 000000000..532525600 --- /dev/null +++ b/xfer/multi_publisher.go @@ -0,0 +1,143 @@ +package xfer + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "log" + "strings" + "sync" +) + +// MultiPublisher implements publisher over a collection of heterogeneous +// targets. See documentation of each method to understand the semantics. +type MultiPublisher struct { + mtx sync.Mutex + factory func(endpoint string) (string, Publisher, error) + sema semaphore + list []tuple +} + +// NewMultiPublisher returns a new MultiPublisher ready for use. +func NewMultiPublisher(factory func(endpoint string) (string, Publisher, error)) *MultiPublisher { + return &MultiPublisher{ + factory: factory, + sema: newSemaphore(maxConcurrentGET), + } +} + +type tuple struct { + publisher Publisher + target string // DNS name + endpoint string // IP addr + id string // unique ID from app + err error // if factory failed +} + +const maxConcurrentGET = 10 + +// Set declares that the target (DNS name) resolves to the provided endpoints +// (IPs), and that we want to publish to each of those endpoints. Set replaces +// any existing publishers to the given target. Set invokes the factory method +// to convert each endpoint to a publisher, and to get the remote receiver's +// unique ID. +func (p *MultiPublisher) Set(target string, endpoints []string) { + // Convert endpoints to publishers. + c := make(chan tuple, len(endpoints)) + for _, endpoint := range endpoints { + go func(endpoint string) { + p.sema.p() + defer p.sema.v() + id, publisher, err := p.factory(endpoint) + c <- tuple{publisher, target, endpoint, id, err} + }(endpoint) + } + list := make([]tuple, 0, len(p.list)+len(endpoints)) + for i := 0; i < cap(c); i++ { + t := <-c + if t.err != nil { + log.Printf("multi-publisher set: %s (%s): %v", t.target, t.endpoint, t.err) + continue + } + list = append(list, t) + } + + // Copy all other tuples over to the new list. + p.mtx.Lock() + defer p.mtx.Unlock() + p.list = p.appendFilter(list, func(t tuple) bool { return t.target != target }) +} + +// Delete removes all endpoints that match the given target. +func (p *MultiPublisher) Delete(target string) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target }) +} + +// Publish implements Publisher by publishing the reader to all of the +// underlying publishers sequentially. To do that, it needs to drain the +// reader, and recreate new readers for each publisher. Note that it will +// publish to one endpoint for each unique ID. Failed publishes don't count. +func (p *MultiPublisher) Publish(r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + var ( + ids = map[string]struct{}{} + errs = []string{} + ) + + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, t := range p.list { + if _, ok := ids[t.id]; ok { + continue + } + if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil { + errs = append(errs, err.Error()) + continue + } + ids[t.id] = struct{}{} // sent already + } + if len(errs) > 0 { + return errors.New(strings.Join(errs, "; ")) + } + return nil +} + +// Stop invokes stop on all underlying publishers and removes them. +func (p *MultiPublisher) Stop() { + p.mtx.Lock() + defer p.mtx.Unlock() + for _, t := range p.list { + t.publisher.Stop() + } + p.list = []tuple{} +} + +func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple { + for _, t := range p.list { + if !f(t) { + continue + } + list = append(list, t) + } + return list +} + +type semaphore chan struct{} + +func newSemaphore(n int) semaphore { + c := make(chan struct{}, n) + for i := 0; i < n; i++ { + c <- struct{}{} + } + return semaphore(c) +} +func (s semaphore) p() { <-s } +func (s semaphore) v() { s <- struct{}{} } diff --git a/xfer/multi_publisher_internal_test.go b/xfer/multi_publisher_internal_test.go new file mode 100644 index 000000000..4400b1b2e --- /dev/null +++ b/xfer/multi_publisher_internal_test.go @@ -0,0 +1,40 @@ +package xfer + +import ( + "testing" + "time" +) + +func TestSemaphore(t *testing.T) { + n := 3 + s := newSemaphore(n) + + // First n should be fine + for i := 0; i < n; i++ { + ok := make(chan struct{}) + go func() { s.p(); close(ok) }() + select { + case <-ok: + case <-time.After(10 * time.Millisecond): + t.Errorf("p (%d) failed", i+1) + } + } + + // This should block + ok := make(chan struct{}) + go func() { s.p(); close(ok) }() + select { + case <-ok: + t.Errorf("%dth p OK, but should block", n+1) + case <-time.After(10 * time.Millisecond): + //t.Logf("%dth p blocks, as expected", n+1) + } + + s.v() + + select { + case <-ok: + case <-time.After(10 * time.Millisecond): + t.Errorf("%dth p didn't resolve in time", n+1) + } +} diff --git a/xfer/multi_publisher_test.go b/xfer/multi_publisher_test.go new file mode 100644 index 000000000..2664340b7 --- /dev/null +++ b/xfer/multi_publisher_test.go @@ -0,0 +1,53 @@ +package xfer_test + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/weaveworks/scope/xfer" +) + +func TestMultiPublisher(t *testing.T) { + var ( + a1 = &mockPublisher{} // target a, endpoint 1 + a2 = &mockPublisher{} // target a, endpoint 2 (duplicate) + b2 = &mockPublisher{} // target b, endpoint 2 (duplicate) + b3 = &mockPublisher{} // target b, endpoint 3 + ) + + sum := func() int { return a1.count + a2.count + b2.count + b3.count } + + mp := xfer.NewMultiPublisher(func(endpoint string) (string, xfer.Publisher, error) { + switch endpoint { + case "a1": + return "1", a1, nil + case "a2": + return "2", a2, nil + case "b2": + return "2", b2, nil + case "b3": + return "3", b3, nil + default: + return "", nil, fmt.Errorf("invalid endpoint %s", endpoint) + } + }) + + mp.Set("a", []string{"a1", "a2"}) + mp.Set("b", []string{"b2", "b3"}) + + for i := 1; i < 10; i++ { + if err := mp.Publish(&bytes.Buffer{}); err != nil { + t.Error(err) + } + if want, have := 3*i, sum(); want != have { + t.Errorf("want %d, have %d", want, have) + } + } +} + +type mockPublisher struct{ count int } + +func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/publisher.go b/xfer/publisher.go index 759e48a6e..7ac56a93e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,215 +1,10 @@ package xfer -import ( - "bytes" - "fmt" - "log" - "net/http" - "net/url" - "strings" - "sync" - "time" -) +import "io" -const ( - initialBackoff = 1 * time.Second - maxBackoff = 60 * time.Second -) - -// Publisher is something which can send a buffered set of data somewhere, -// probably to a collector. +// Publisher is something which can send a stream of data somewhere, probably +// to a remote collector. type Publisher interface { - Publish(*bytes.Buffer) error + Publish(io.Reader) error Stop() } - -// HTTPPublisher publishes reports by POST to a fixed endpoint. -type HTTPPublisher struct { - url string - token string - id string -} - -// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The -// ID is currently set to the probe's hostname. It's designed to deduplicate -// reports from the same probe to the same receiver, in case the probe is -// configured to publish to multiple receivers that resolve to the same app. -const ScopeProbeIDHeader = "X-Scope-Probe-ID" - -// NewHTTPPublisher returns an HTTPPublisher ready for use. -func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) { - if !strings.HasPrefix(target, "http") { - target = "http://" + target - } - u, err := url.Parse(target) - if err != nil { - return nil, err - } - if u.Path == "" { - u.Path = "/api/report" - } - return &HTTPPublisher{ - url: u.String(), - token: token, - id: id, - }, nil -} - -func (p HTTPPublisher) String() string { - return p.url -} - -// Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(buf *bytes.Buffer) error { - req, err := http.NewRequest("POST", p.url, buf) - if err != nil { - return err - } - - req.Header.Set("Authorization", AuthorizationHeader(p.token)) - req.Header.Set(ScopeProbeIDHeader, p.id) - req.Header.Set("Content-Encoding", "gzip") - // req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf(resp.Status) - } - return nil -} - -// Stop implements Publisher -func (p HTTPPublisher) Stop() {} - -// AuthorizationHeader returns a value suitable for an HTTP Authorization -// header, based on the passed token string. -func AuthorizationHeader(token string) string { - return fmt.Sprintf("Scope-Probe token=%s", token) -} - -// BackgroundPublisher is a publisher which does the publish asynchronously. -// It will only do one publish at once; if there is an ongoing publish, -// concurrent publishes are dropped. -type BackgroundPublisher struct { - publisher Publisher - reports chan *bytes.Buffer - quit chan struct{} -} - -// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher -func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { - result := &BackgroundPublisher{ - publisher: p, - reports: make(chan *bytes.Buffer), - quit: make(chan struct{}), - } - go result.loop() - return result -} - -func (b *BackgroundPublisher) loop() { - backoff := initialBackoff - - for r := range b.reports { - err := b.publisher.Publish(r) - if err == nil { - backoff = initialBackoff - continue - } - - log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) - select { - case <-time.After(backoff): - case <-b.quit: - } - backoff = backoff * 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } -} - -// Publish implements Publisher -func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error { - select { - case b.reports <- buf: - default: - } - return nil -} - -// Stop implements Publisher -func (b *BackgroundPublisher) Stop() { - close(b.reports) - close(b.quit) - b.publisher.Stop() -} - -// MultiPublisher implements Publisher over a set of publishers. -type MultiPublisher struct { - mtx sync.RWMutex - factory func(string) (Publisher, error) - m map[string]Publisher -} - -// NewMultiPublisher returns a new MultiPublisher ready for use. The factory -// should be e.g. NewHTTPPublisher, except you need to curry it over the -// probe token. -func NewMultiPublisher(factory func(string) (Publisher, error)) *MultiPublisher { - return &MultiPublisher{ - factory: factory, - m: map[string]Publisher{}, - } -} - -// Add allows additional targets to be added dynamically. It will dedupe -// identical targets. TODO we have no good mechanism to remove. -func (p *MultiPublisher) Add(target string) { - p.mtx.Lock() - defer p.mtx.Unlock() - - if _, ok := p.m[target]; ok { - return - } - - publisher, err := p.factory(target) - if err != nil { - log.Printf("multi-publisher: %v", err) - return - } - - p.m[target] = publisher -} - -// Publish implements Publisher by emitting the report to all publishers. -func (p *MultiPublisher) Publish(buf *bytes.Buffer) error { - p.mtx.RLock() - defer p.mtx.RUnlock() - - var errs []string - for _, publisher := range p.m { - if err := publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil { - errs = append(errs, err.Error()) - } - } - - if len(errs) > 0 { - return fmt.Errorf(strings.Join(errs, "; ")) - } - return nil -} - -// Stop implements Publisher -func (p *MultiPublisher) Stop() { - p.mtx.RLock() - defer p.mtx.RUnlock() - - for _, publisher := range p.m { - publisher.Stop() - } -}