From 2a00fd2d78bea07c8ac6469924681a9b50e8afe5 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 5 Oct 2016 10:59:56 -0700 Subject: [PATCH] Allow user to specify URLs on the command line, and use that to allow per-target tokens. (#1901) Also: - Parse targets on startup and catch badly formed ones before Scope can start. - If no port is specified, use default port for scheme; if no scheme is specificed, use 4040. - Use username as probe token --- app/controls_test.go | 4 +- app/pipes_internal_test.go | 4 +- extras/fixprobe/main.go | 8 +- probe/appclient/app_client.go | 48 ++++--- probe/appclient/app_client_internal_test.go | 18 ++- probe/appclient/multi_client.go | 19 +-- probe/appclient/multi_client_test.go | 19 +-- probe/appclient/probe_config.go | 8 +- probe/appclient/resolver.go | 142 +++++++++++++------- probe/appclient/resolver_internal_test.go | 135 ++++++++++++++++--- prog/main.go | 29 +++- prog/probe.go | 70 ++++++---- 12 files changed, 365 insertions(+), 139 deletions(-) diff --git a/app/controls_test.go b/app/controls_test.go index 2e1a7c520..bc2508eda 100644 --- a/app/controls_test.go +++ b/app/controls_test.go @@ -4,6 +4,7 @@ import ( "net" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -43,7 +44,8 @@ func TestControl(t *testing.T) { Value: "foo", } }) - client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, controlHandler) + url := url.URL{Scheme: "http", Host: ip + ":" + port} + client, err := appclient.NewAppClient(probeConfig, ip+":"+port, url, controlHandler) if err != nil { t.Fatal(err) } diff --git a/app/pipes_internal_test.go b/app/pipes_internal_test.go index 308919755..20bb6f760 100644 --- a/app/pipes_internal_test.go +++ b/app/pipes_internal_test.go @@ -6,6 +6,7 @@ import ( "net" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -83,7 +84,8 @@ func TestPipeClose(t *testing.T) { probeConfig := appclient.ProbeConfig{ ProbeID: "foo", } - client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, nil) + url := url.URL{Scheme: "http", Host: ip + ":" + port} + client, err := appclient.NewAppClient(probeConfig, ip+":"+port, url, nil) if err != nil { t.Fatal(err) } diff --git a/extras/fixprobe/main.go b/extras/fixprobe/main.go index f490a089f..cb1ceaffb 100644 --- a/extras/fixprobe/main.go +++ b/extras/fixprobe/main.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "net/url" "time" "github.com/ugorji/go/codec" @@ -45,11 +46,16 @@ func main() { } } + url, err := url.Parse(fmt.Sprintf("http://%s", *publish)) + if err != nil { + log.Fatal(err) + } + client, err := appclient.NewAppClient(appclient.ProbeConfig{ Token: *publishToken, ProbeID: *publishID, Insecure: false, - }, *publish, *publish, nil) + }, *publish, *url, nil) if err != nil { log.Fatal(err) } diff --git a/probe/appclient/app_client.go b/probe/appclient/app_client.go index cb240ecde..a99a42fde 100644 --- a/probe/appclient/app_client.go +++ b/probe/appclient/app_client.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "net/rpc" + "net/url" "sync" "time" @@ -14,7 +15,6 @@ import ( "github.com/gorilla/websocket" "github.com/ugorji/go/codec" - "github.com/weaveworks/scope/common/sanitize" "github.com/weaveworks/scope/common/xfer" ) @@ -41,10 +41,11 @@ type appClient struct { quit chan struct{} mtx sync.Mutex - target string client http.Client wsDialer websocket.Dialer appID string + hostname string + target url.URL // Track all the background goroutines, ensure they all stop backgroundWait sync.WaitGroup @@ -61,7 +62,7 @@ type appClient struct { } // NewAppClient makes a new appClient. -func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlHandler) (AppClient, error) { +func NewAppClient(pc ProbeConfig, hostname string, target url.URL, control xfer.ControlHandler) (AppClient, error) { httpTransport, err := pc.getHTTPTransport(hostname) if err != nil { return nil, err @@ -74,6 +75,7 @@ func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlH return &appClient{ ProbeConfig: pc, quit: make(chan struct{}), + hostname: hostname, target: target, client: http.Client{ Transport: httpTransport, @@ -88,6 +90,20 @@ func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlH }, nil } +func (c *appClient) url(path string) string { + return c.target.String() + path +} + +func (c *appClient) wsURL(path string) string { + output := c.target //copy the url + if output.Scheme == "https" { + output.Scheme = "wss" + } else { + output.Scheme = "ws" + } + return output.String() + path +} + func (c *appClient) hasQuit() bool { select { case <-c.quit: @@ -150,7 +166,7 @@ func (c *appClient) Stop() { // Details fetches the details (version, id) of the app. func (c *appClient) Details() (xfer.Details, error) { result := xfer.Details{} - req, err := c.ProbeConfig.authorizedRequest("GET", sanitize.URL("", 0, "/api")(c.target), nil) + req, err := c.ProbeConfig.authorizedRequest("GET", c.url("/api"), nil) if err != nil { return result, err } @@ -184,7 +200,7 @@ func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) { continue } - log.Errorf("Error doing %s for %s, backing off %s: %v", msg, c.target, backoff, err) + log.Errorf("Error doing %s for %s, backing off %s: %v", msg, c.hostname, backoff, err) select { case <-time.After(backoff): case <-c.quit: @@ -200,7 +216,7 @@ func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) { func (c *appClient) controlConnection() (bool, error) { headers := http.Header{} c.ProbeConfig.authorizeHeaders(headers) - url := sanitize.URL("ws://", 0, "/api/control/ws")(c.target) + url := c.wsURL("/api/control/ws") conn, _, err := xfer.DialWS(&c.wsDialer, url, headers) if err != nil { return false, err @@ -232,14 +248,14 @@ func (c *appClient) controlConnection() (bool, error) { func (c *appClient) ControlConnection() { go func() { - log.Infof("Control connection to %s starting", c.target) - defer log.Infof("Control connection to %s exiting", c.target) + log.Infof("Control connection to %s starting", c.hostname) + defer log.Infof("Control connection to %s exiting", c.hostname) c.doWithBackoff("controls", c.controlConnection) }() } func (c *appClient) publish(r io.Reader) error { - url := sanitize.URL("", 0, "/api/report")(c.target) + url := c.url("/api/report") req, err := c.ProbeConfig.authorizedRequest("POST", url, r) if err != nil { return err @@ -266,8 +282,8 @@ func (c *appClient) publish(r io.Reader) error { func (c *appClient) startPublishing() { go func() { - log.Infof("Publish loop for %s starting", c.target) - defer log.Infof("Publish loop for %s exiting", c.target) + log.Infof("Publish loop for %s starting", c.hostname) + defer log.Infof("Publish loop for %s exiting", c.hostname) c.doWithBackoff("publish", func() (bool, error) { r := <-c.readers if r == nil { @@ -285,7 +301,7 @@ func (c *appClient) Publish(r io.Reader) error { select { case c.readers <- r: default: - log.Errorf("Dropping report to %s", c.target) + log.Errorf("Dropping report to %s", c.hostname) } return nil } @@ -293,7 +309,7 @@ func (c *appClient) Publish(r io.Reader) error { func (c *appClient) pipeConnection(id string, pipe xfer.Pipe) (bool, error) { headers := http.Header{} c.ProbeConfig.authorizeHeaders(headers) - url := sanitize.URL("ws://", 0, fmt.Sprintf("/api/pipe/%s/probe", id))(c.target) + url := c.wsURL(fmt.Sprintf("/api/pipe/%s/probe", id)) conn, resp, err := xfer.DialWS(&c.wsDialer, url, headers) if resp != nil && resp.StatusCode == http.StatusNotFound { // Special handling - 404 means the app/user has closed the pipe @@ -319,8 +335,8 @@ func (c *appClient) pipeConnection(id string, pipe xfer.Pipe) (bool, error) { func (c *appClient) PipeConnection(id string, pipe xfer.Pipe) { go func() { - log.Infof("Pipe %s connection to %s starting", id, c.target) - defer log.Infof("Pipe %s connection to %s exiting", id, c.target) + log.Infof("Pipe %s connection to %s starting", id, c.hostname) + defer log.Infof("Pipe %s connection to %s exiting", id, c.hostname) c.doWithBackoff(id, func() (bool, error) { return c.pipeConnection(id, pipe) }) @@ -329,7 +345,7 @@ func (c *appClient) PipeConnection(id string, pipe xfer.Pipe) { // PipeClose closes the given pipe id on the app. func (c *appClient) PipeClose(id string) error { - url := sanitize.URL("", 0, fmt.Sprintf("/api/pipe/%s", id))(c.target) + url := c.url(fmt.Sprintf("/api/pipe/%s", id)) req, err := c.ProbeConfig.authorizedRequest("DELETE", url, nil) if err != nil { return err diff --git a/probe/appclient/app_client_internal_test.go b/probe/appclient/app_client_internal_test.go index d8acc0ed7..e29a5bc75 100644 --- a/probe/appclient/app_client_internal_test.go +++ b/probe/appclient/app_client_internal_test.go @@ -109,7 +109,11 @@ func TestAppClientPublish(t *testing.T) { Insecure: false, } - p, err := NewAppClient(pc, u.Host, s.URL, nil) + url, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + p, err := NewAppClient(pc, u.Host, *url, nil) if err != nil { t.Fatal(err) } @@ -158,7 +162,11 @@ func TestAppClientDetails(t *testing.T) { ProbeID: "", Insecure: false, } - p, err := NewAppClient(pc, u.Host, s.URL, nil) + url, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + p, err := NewAppClient(pc, u.Host, *url, nil) if err != nil { t.Fatal(err) } @@ -203,7 +211,11 @@ func TestStop(t *testing.T) { Insecure: false, } - p, err := NewAppClient(pc, u.Host, s.URL, nil) + url, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + p, err := NewAppClient(pc, u.Host, *url, nil) if err != nil { t.Fatal(err) } diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index 012556b8d..c38c9d3ed 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "net/url" "strings" "sync" @@ -18,7 +19,7 @@ import ( const maxConcurrentGET = 10 // ClientFactory is a thing thats makes AppClients -type ClientFactory func(string, string) (AppClient, error) +type ClientFactory func(string, url.URL) (AppClient, error) type multiClient struct { clientFactory ClientFactory @@ -46,7 +47,7 @@ type Publisher interface { // MultiAppClient maintains a set of upstream apps, and ensures we have an // AppClient for each one. type MultiAppClient interface { - Set(hostname string, endpoints []string) + Set(hostname string, urls []url.URL) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error PipeClose(appID, pipeID string) error Stop() @@ -67,17 +68,17 @@ func NewMultiAppClient(clientFactory ClientFactory, noControls bool) MultiAppCli } // Set the list of endpoints for the given hostname. -func (c *multiClient) Set(hostname string, endpoints []string) { +func (c *multiClient) Set(hostname string, urls []url.URL) { wg := sync.WaitGroup{} - wg.Add(len(endpoints)) - clients := make(chan clientTuple, len(endpoints)) - for _, endpoint := range endpoints { - go func(endpoint string) { + wg.Add(len(urls)) + clients := make(chan clientTuple, len(urls)) + for _, u := range urls { + go func(u url.URL) { c.sema.acquire() defer c.sema.release() defer wg.Done() - client, err := c.clientFactory(hostname, endpoint) + client, err := c.clientFactory(hostname, u) if err != nil { log.Errorf("Error creating new app client: %v", err) return @@ -90,7 +91,7 @@ func (c *multiClient) Set(hostname string, endpoints []string) { } clients <- clientTuple{details, client} - }(endpoint) + }(u) } wg.Wait() diff --git a/probe/appclient/multi_client_test.go b/probe/appclient/multi_client_test.go index 5a24519b5..93795083b 100644 --- a/probe/appclient/multi_client_test.go +++ b/probe/appclient/multi_client_test.go @@ -3,6 +3,7 @@ package appclient_test import ( "bytes" "io" + "net/url" "runtime" "testing" @@ -42,8 +43,8 @@ var ( a2 = &mockClient{id: "2"} // hostname a, app id 2 b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate) b3 = &mockClient{id: "3"} // hostname b, app id 3 - factory = func(hostname, target string) (appclient.AppClient, error) { - switch target { + factory = func(hostname string, url url.URL) (appclient.AppClient, error) { + switch url.Host { case "a1": return a1, nil case "a2": @@ -53,7 +54,7 @@ var ( case "b3": return b3, nil } - panic(target) + panic(url.Host) } ) @@ -71,20 +72,20 @@ func TestMultiClient(t *testing.T) { defer mp.Stop() // Add two hostnames with overlapping apps, check we don't add the same app twice - mp.Set("a", []string{"a1", "a2"}) - mp.Set("b", []string{"b2", "b3"}) + mp.Set("a", []url.URL{{Host: "a1"}, {Host: "a2"}}) + mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}}) expect(a1.count, 1) expect(a2.count+b2.count, 1) expect(b3.count, 1) // Now drop the overlap, check we don't remove the app - mp.Set("b", []string{"b3"}) + mp.Set("b", []url.URL{{Host: "b3"}}) expect(a1.count, 1) expect(a2.count+b2.count, 1) expect(b3.count, 1) // Now check we remove apps - mp.Set("b", []string{}) + mp.Set("b", []url.URL{}) expect(b3.stopped, 1) } @@ -94,8 +95,8 @@ func TestMultiClientPublish(t *testing.T) { sum := func() int { return a1.publish + a2.publish + b2.publish + b3.publish } - mp.Set("a", []string{"a1", "a2"}) - mp.Set("b", []string{"b2", "b3"}) + mp.Set("a", []url.URL{{Host: "a1"}, {Host: "a2"}}) + mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}}) for i := 1; i < 10; i++ { if err := mp.Publish(&bytes.Buffer{}); err != nil { diff --git a/probe/appclient/probe_config.go b/probe/appclient/probe_config.go index 2b8493a62..04d6533c2 100644 --- a/probe/appclient/probe_config.go +++ b/probe/appclient/probe_config.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "fmt" "io" - "net" "net/http" "github.com/certifi/gocertifi" @@ -51,15 +50,10 @@ func (pc ProbeConfig) getHTTPTransport(hostname string) (*http.Transport, error) }, nil } - host, _, err := net.SplitHostPort(hostname) - if err != nil { - return nil, err - } - return &http.Transport{ TLSClientConfig: &tls.Config{ RootCAs: certPool, - ServerName: host, + ServerName: hostname, }, }, nil } diff --git a/probe/appclient/resolver.go b/probe/appclient/resolver.go index f65907254..f965a5a06 100644 --- a/probe/appclient/resolver.go +++ b/probe/appclient/resolver.go @@ -2,6 +2,7 @@ package appclient import ( "net" + "net/url" "strconv" "strings" "time" @@ -16,10 +17,6 @@ const ( dnsPollInterval = 10 * time.Second ) -var ( - tick = fastStartTicker -) - // fastStartTicker is a ticker that 'ramps up' from 1 sec to duration. func fastStartTicker(duration time.Duration) <-chan time.Time { c := make(chan time.Time, 1) @@ -41,41 +38,60 @@ func fastStartTicker(duration time.Duration) <-chan time.Time { return c } -type setter func(string, []string) - // Resolver is a thing that can be stopped... type Resolver interface { Stop() } type staticResolver struct { - setters []setter - targets []target + ResolverConfig + failedResolutions map[string]struct{} quit chan struct{} - lookup LookupIP } // LookupIP type is used for looking up IPs. type LookupIP func(host string) (ips []net.IP, err error) -type target struct{ host, port string } +// Target is a parsed representation of the app location. +type Target struct { + original string // the original url string + url *url.URL // the parsed url + hostname string // the hostname (without port) from the url + port int // the port, or a sensible default +} -func (t target) String() string { return net.JoinHostPort(t.host, t.port) } +func (t Target) String() string { + return net.JoinHostPort(t.hostname, strconv.Itoa(t.port)) +} + +// ResolverConfig is the config for a resolver. +type ResolverConfig struct { + Targets []Target + Set func(string, []url.URL) + + // Optional + Lookup LookupIP + Ticker func(time.Duration) <-chan time.Time +} // NewResolver periodically resolves the targets, and calls the set // function with all the resolved IPs. It explictiy supports targets which // resolve to multiple IPs. It uses the supplied DNS server name. -func NewResolver(targets []string, lookup LookupIP, setters ...setter) Resolver { +func NewResolver(config ResolverConfig) (Resolver, error) { + if config.Lookup == nil { + config.Lookup = net.LookupIP + } + if config.Ticker == nil { + config.Ticker = fastStartTicker + } r := staticResolver{ - targets: prepare(targets), - setters: setters, + ResolverConfig: config, failedResolutions: map[string]struct{}{}, quit: make(chan struct{}), - lookup: lookup, } go r.loop() - return r + return r, nil } // LookupUsing produces a LookupIP function for the given DNS server. @@ -102,7 +118,7 @@ func LookupUsing(dnsServer string) func(host string) (ips []net.IP, err error) { func (r staticResolver) loop() { r.resolve() - t := tick(dnsPollInterval) + t := r.Ticker(dnsPollInterval) for { select { case <-t: @@ -117,58 +133,94 @@ func (r staticResolver) Stop() { close(r.quit) } -func prepare(strs []string) []target { - var targets []target - for _, s := range strs { - var host, port string - if strings.Contains(s, ":") { - var err error - host, port, err = net.SplitHostPort(s) +// ParseTargets deals with missing information in the targets string, defaulting +// the scheme, port etc. +func ParseTargets(urls []string) ([]Target, error) { + var targets []Target + for _, u := range urls { + // naked hostnames (such as "localhost") are interpreted as relative URLs + // so we add a scheme if u doesn't have one. + prefixAdded := false + if !strings.Contains(u, "://") { + prefixAdded = true + if strings.HasSuffix(u, ":443") { + u = "https://" + u + } else { + u = "http://" + u + } + } + parsed, err := url.Parse(u) + if err != nil { + return nil, err + } + + var hostname string + var port int + if strings.Contains(parsed.Host, ":") { + var portStr string + hostname, portStr, err = net.SplitHostPort(parsed.Host) if err != nil { - log.Errorf("invalid address %s: %v", s, err) - continue + return nil, err + } + port, err = strconv.Atoi(portStr) + if err != nil { + return nil, err } } else { - host, port = s, strconv.Itoa(xfer.AppPort) + if prefixAdded { + port = xfer.AppPort + } else if strings.HasPrefix(u, "https://") { + port = 443 + } else { + port = 80 + } + hostname = parsed.Host } - targets = append(targets, target{host, port}) + targets = append(targets, Target{ + original: u, + url: parsed, + hostname: hostname, + port: port, + }) } - return targets + return targets, nil } func (r staticResolver) resolve() { - for t, endpoints := range r.resolveMany(r.targets) { - for _, setter := range r.setters { - setter(t.String(), endpoints) - } + for _, t := range r.Targets { + ips := r.resolveOne(t) + urls := makeURLs(t, ips) + r.Set(t.hostname, urls) } } -func (r staticResolver) resolveMany(targets []target) map[target][]string { - result := map[target][]string{} - for _, t := range targets { - result[t] = r.resolveOne(t) +func makeURLs(t Target, ips []string) []url.URL { + result := []url.URL{} + for _, ip := range ips { + u := *t.url + u.Host = net.JoinHostPort(ip, strconv.Itoa(t.port)) + result = append(result, u) } return result } -func (r staticResolver) resolveOne(t target) []string { +func (r staticResolver) resolveOne(t Target) []string { var addrs []net.IP - if addr := net.ParseIP(t.host); addr != nil { + if addr := net.ParseIP(t.hostname); addr != nil { addrs = []net.IP{addr} } else { var err error - addrs, err = r.lookup(t.host) + addrs, err = r.Lookup(t.hostname) if err != nil { - if _, ok := r.failedResolutions[t.host]; !ok { - log.Warnf("Cannot resolve %s: %v", t.host, err) + if _, ok := r.failedResolutions[t.hostname]; !ok { + log.Warnf("Cannot resolve '%s': %v", t.hostname, err) // Only log the error once - r.failedResolutions[t.host] = struct{}{} + r.failedResolutions[t.hostname] = struct{}{} } return []string{} } // Allow logging errors in future resolutions - delete(r.failedResolutions, t.host) + delete(r.failedResolutions, t.hostname) } endpoints := make([]string, 0, len(addrs)) for _, addr := range addrs { @@ -176,7 +228,7 @@ func (r staticResolver) resolveOne(t target) []string { if addr.To4() == nil { continue } - endpoints = append(endpoints, net.JoinHostPort(addr.String(), t.port)) + endpoints = append(endpoints, addr.String()) } return endpoints } diff --git a/probe/appclient/resolver_internal_test.go b/probe/appclient/resolver_internal_test.go index a3b239737..62a80a8f3 100644 --- a/probe/appclient/resolver_internal_test.go +++ b/probe/appclient/resolver_internal_test.go @@ -3,19 +3,97 @@ package appclient import ( "fmt" "net" + "net/url" "runtime" "sync" "testing" "time" "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/test" ) -func TestResolver(t *testing.T) { - oldTick := tick - defer func() { tick = oldTick }() +func TestResolverCases(t *testing.T) { c := make(chan time.Time) - tick = func(_ time.Duration) <-chan time.Time { return c } + ticker := func(_ time.Duration) <-chan time.Time { return c } + + ips := map[string][]net.IP{ + "foo": {net.IPv4(192, 168, 0, 1)}, + "bar": {net.IPv4(192, 168, 0, 2), net.IPv4(192, 168, 0, 3)}, + } + lookupIP := func(host string) ([]net.IP, error) { + addrs, ok := ips[host] + if !ok { + return nil, fmt.Errorf("Not found") + } + return addrs, nil + } + + testResolver := func(target string, expected []url.URL) { + mtx := sync.Mutex{} + found := map[url.URL]struct{}{} + set := func(target string, urls []url.URL) { + mtx.Lock() + defer mtx.Unlock() + for _, url := range urls { + found[url] = struct{}{} + } + } + + targets, err := ParseTargets([]string{target}) + if err != nil { + t.Fatal(err) + } + + r, err := NewResolver(ResolverConfig{ + Targets: targets, + Lookup: lookupIP, + Set: set, + Ticker: ticker, + }) + if err != nil { + t.Fatal(err) + } + defer r.Stop() + + c <- time.Now() + test.Poll(t, 200*time.Millisecond, expected, func() interface{} { + mtx.Lock() + defer mtx.Unlock() + have := []url.URL{} + for url := range found { + have = append(have, url) + } + return have + }) + } + + for _, tc := range []struct { + in string + expected []url.URL + }{ + {"foo", []url.URL{{Scheme: "http", Host: "192.168.0.1:4040"}}}, + {"foo:80", []url.URL{{Scheme: "http", Host: "192.168.0.1:80"}}}, + {"foo:443", []url.URL{{Scheme: "https", Host: "192.168.0.1:443"}}}, + {"foo:1234", []url.URL{{Scheme: "http", Host: "192.168.0.1:1234"}}}, + {"http://foo", []url.URL{{Scheme: "http", Host: "192.168.0.1:80"}}}, + {"http://foo:80", []url.URL{{Scheme: "http", Host: "192.168.0.1:80"}}}, + {"http://foo:443", []url.URL{{Scheme: "http", Host: "192.168.0.1:443"}}}, + {"http://foo:1234", []url.URL{{Scheme: "http", Host: "192.168.0.1:1234"}}}, + {"https://foo", []url.URL{{Scheme: "https", Host: "192.168.0.1:443"}}}, + {"https://foo:80", []url.URL{{Scheme: "https", Host: "192.168.0.1:80"}}}, + {"https://foo:443", []url.URL{{Scheme: "https", Host: "192.168.0.1:443"}}}, + {"https://foo:1234", []url.URL{{Scheme: "https", Host: "192.168.0.1:1234"}}}, + {"user:pass@foo", []url.URL{{Scheme: "http", Host: "192.168.0.1:4040", User: url.UserPassword("user", "pass")}}}, + {"bar", []url.URL{{Scheme: "http", Host: "192.168.0.2:4040"}, {Scheme: "http", Host: "192.168.0.3:4040"}}}, + } { + testResolver(tc.in, tc.expected) + } +} + +func TestResolver(t *testing.T) { + c := make(chan time.Time) + ticker := func(_ time.Duration) <-chan time.Time { return c } ipsLock := sync.Mutex{} ips := map[string][]net.IP{} @@ -37,17 +115,30 @@ func TestResolver(t *testing.T) { port := ":80" ip1 := "192.168.0.1" ip2 := "192.168.0.10" - sets := make(chan string) - set := func(target string, endpoints []string) { + sets := make(chan url.URL) + set := func(target string, endpoints []url.URL) { for _, endpoint := range endpoints { sets <- endpoint } } - r := NewResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, lookupIP, set) + targets, err := ParseTargets([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}) + if err != nil { + t.Fatal(err) + } - assertAdd := func(want ...string) { - remaining := map[string]struct{}{} + r, err := NewResolver(ResolverConfig{ + Targets: targets, + Lookup: lookupIP, + Set: set, + Ticker: ticker, + }) + if err != nil { + t.Fatal(err) + } + + assertAdd := func(want ...url.URL) { + remaining := map[url.URL]struct{}{} for _, s := range want { remaining[s] = struct{}{} } @@ -56,10 +147,10 @@ func TestResolver(t *testing.T) { select { case s := <-sets: if _, ok := remaining[s]; ok { - t.Logf("line %d: got %q OK", line, s) + t.Logf("line %d: got %v OK", line, s) delete(remaining, s) } else { - t.Errorf("line %d: got unexpected %q", line, s) + t.Errorf("line %d: got unexpected %v", line, s) } case <-time.After(100 * time.Millisecond): t.Fatalf("line %d: didn't get the adds in time", line) @@ -68,22 +159,36 @@ func TestResolver(t *testing.T) { } // Initial resolve should just give us IPs - assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd( + url.URL{Scheme: "http", Host: ip1 + port}, + url.URL{Scheme: "http", Host: fmt.Sprintf("%s:%d", ip2, xfer.AppPort)}, + ) // Trigger another resolve with a tick; again, // just want ips. c <- time.Now() - assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd( + url.URL{Scheme: "http", Host: ip1 + port}, + url.URL{Scheme: "http", Host: fmt.Sprintf("%s:%d", ip2, xfer.AppPort)}, + ) ip3 := "1.2.3.4" updateIPs("symbolic.name", makeIPs(ip3)) c <- time.Now() // trigger a resolve - assertAdd(ip3+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd( + url.URL{Scheme: "http", Host: ip3 + port}, + url.URL{Scheme: "http", Host: ip1 + port}, url.URL{Scheme: "http", Host: fmt.Sprintf("%s:%d", ip2, xfer.AppPort)}, + ) ip4 := "10.10.10.10" updateIPs("symbolic.name", makeIPs(ip3, ip4)) c <- time.Now() // trigger another resolve, this time with 2 adds - assertAdd(ip3+port, ip4+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) + assertAdd( + url.URL{Scheme: "http", Host: ip3 + port}, + url.URL{Scheme: "http", Host: ip4 + port}, + url.URL{Scheme: "http", Host: ip1 + port}, + url.URL{Scheme: "http", Host: fmt.Sprintf("%s:%d", ip2, xfer.AppPort)}, + ) done := make(chan struct{}) go func() { r.Stop(); close(done) }() diff --git a/prog/main.go b/prog/main.go index c60a4b008..ad9b1c656 100644 --- a/prog/main.go +++ b/prog/main.go @@ -14,6 +14,7 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/probe/appclient" "github.com/weaveworks/scope/probe/kubernetes" "github.com/weaveworks/weave/common" ) @@ -278,12 +279,34 @@ func main() { // Special case for #1191, check listen address is well formed _, _, err := net.SplitHostPort(flags.app.listen) if err != nil { - log.Errorf("Invalid value for -app.http.address: %v", err) + log.Fatalf("Invalid value for -app.http.address: %v", err) } if flags.probe.httpListen != "" { _, _, err := net.SplitHostPort(flags.probe.httpListen) if err != nil { - log.Errorf("Invalid value for -app.http.address: %v", err) + log.Fatalf("Invalid value for -app.http.address: %v", err) + } + } + + // Special case probe push address parsing + targets := []appclient.Target{} + if mode == "probe" || dryRun { + args := []string{} + if flags.probe.token != "" { + // service mode + if len(flag.Args()) == 0 { + args = append(args, defaultServiceHost) + } + } else if !flags.probe.noApp { + args = append(args, fmt.Sprintf("localhost:%d", xfer.AppPort)) + } + args = append(args, flag.Args()...) + if !dryRun { + log.Infof("publishing to: %s", strings.Join(args, ", ")) + } + targets, err = appclient.ParseTargets(args) + if err != nil { + log.Fatalf("Invalid targets: %v", err) } } @@ -295,7 +318,7 @@ func main() { case "app": appMain(flags.app) case "probe": - probeMain(flags.probe) + probeMain(flags.probe, targets) case "version": fmt.Println("Weave Scope version", version) case "help": diff --git a/prog/probe.go b/prog/probe.go index d564d8177..88e1d2eb7 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -1,15 +1,13 @@ package main import ( - "flag" - "fmt" "math/rand" "net" "net/http" _ "net/http/pprof" + "net/url" "os" "strconv" - "strings" "time" log "github.com/Sirupsen/logrus" @@ -38,7 +36,7 @@ import ( const ( versionCheckPeriod = 6 * time.Hour - defaultServiceHost = "cloud.weave.works:443" + defaultServiceHost = "https://cloud.weave.works:443" ) var pluginAPIVersion = "1" @@ -65,7 +63,7 @@ func check(flags map[string]string) { } // Main runs the probe -func probeMain(flags probeFlags) { +func probeMain(flags probeFlags, targets []appclient.Target) { setLogLevel(flags.logLevel) setLogFormatter(flags.logPrefix) @@ -94,28 +92,21 @@ func probeMain(flags probeFlags) { } go check(checkpointFlags) - var targets = []string{} - if flags.token != "" { - // service mode - if len(flag.Args()) == 0 { - targets = append(targets, defaultServiceHost) - } - } else if !flags.noApp { - targets = append(targets, fmt.Sprintf("localhost:%d", xfer.AppPort)) - } - targets = append(targets, flag.Args()...) - log.Infof("publishing to: %s", strings.Join(targets, ", ")) - - probeConfig := appclient.ProbeConfig{ - Token: flags.token, - ProbeVersion: version, - ProbeID: probeID, - Insecure: flags.insecure, - } handlerRegistry := controls.NewDefaultHandlerRegistry() - clientFactory := func(hostname, endpoint string) (appclient.AppClient, error) { + clientFactory := func(hostname string, url url.URL) (appclient.AppClient, error) { + token := flags.token + if url.User != nil { + token = url.User.Username() + url.User = nil // erase credentials, as we use a special header + } + probeConfig := appclient.ProbeConfig{ + Token: token, + ProbeVersion: version, + ProbeID: probeID, + Insecure: flags.insecure, + } return appclient.NewAppClient( - probeConfig, hostname, endpoint, + probeConfig, hostname, url, xfer.ControlHandlerFunc(handlerRegistry.HandleControlRequest), ) } @@ -126,7 +117,15 @@ func probeMain(flags probeFlags) { if flags.resolver != "" { dnsLookupFn = appclient.LookupUsing(flags.resolver) } - resolver := appclient.NewResolver(targets, dnsLookupFn, clients.Set) + resolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: targets, + Lookup: dnsLookupFn, + Set: clients.Set, + }) + if err != nil { + log.Fatalf("Failed to create resolver: %v", err) + return + } defer resolver.Stop() p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls) @@ -210,8 +209,21 @@ func probeMain(flags probeFlags) { log.Println("Error getting docker bridge ip:", err) } else { weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") - weaveResolver := appclient.NewResolver([]string{flags.weaveHostname}, weaveDNSLookup, clients.Set) - defer weaveResolver.Stop() + weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) + if err != nil { + log.Errorf("Failed to parse weave targets: %v", err) + } else { + weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: weaveTargets, + Lookup: weaveDNSLookup, + Set: clients.Set, + }) + if err != nil { + log.Errorf("Failed to create weave resolver: %v", err) + } else { + defer weaveResolver.Stop() + } + } } } @@ -235,7 +247,7 @@ func probeMain(flags probeFlags) { if flags.httpListen != "" { go func() { log.Infof("Profiling data being exported to %s", flags.httpListen) - log.Infof("go tool pprof http://%s/debug/pprof/{profile,heap,block}", flags.httpListen) + log.Infof("go tool proof http://%s/debug/pprof/{profile,heap,block}", flags.httpListen) log.Infof("Profiling endpoint %s terminated: %v", flags.httpListen, http.ListenAndServe(flags.httpListen, nil)) }() }