From 43bfecbba4be7d62561a87f4e0f3d2ab36a04013 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Wed, 12 Aug 2015 17:12:20 +0200 Subject: [PATCH] Add X-Scope-Probe-ID header to POSTs --- experimental/demoprobe/main.go | 2 +- experimental/fixprobe/main.go | 2 +- probe/main.go | 11 +++++++---- xfer/publisher.go | 11 ++++++++++- xfer/publisher_test.go | 15 ++++++++++++++- 5 files changed, 33 insertions(+), 8 deletions(-) diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index fbb933cf9..acb203ffb 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -23,7 +23,7 @@ func main() { ) flag.Parse() - publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe") + publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe") if err != nil { log.Fatal(err) } diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index 6715e4a3c..f0d0a1812 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -34,7 +34,7 @@ func main() { } f.Close() - publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe") + publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe") if err != nil { log.Fatal(err) } diff --git a/probe/main.go b/probe/main.go index 0383384f0..499badfa3 100644 --- a/probe/main.go +++ b/probe/main.go @@ -48,7 +48,12 @@ func main() { ) flag.Parse() - log.Printf("probe starting, version %s", version) + var ( + hostName = hostname() + hostID = hostName // TODO: we should sanitize the hostname + probeID = hostName // TODO: does this need to be a random string instead? + ) + log.Printf("probe starting, version %s, ID %s", version, probeID) if len(flag.Args()) > 0 { targets = flag.Args() @@ -74,7 +79,7 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token) } + publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) } publishers := xfer.NewMultiPublisher(publisherFactory) resolver := newStaticResolver(targets, publishers.Add) defer resolver.Stop() @@ -92,8 +97,6 @@ func main() { } var ( - hostName = hostname() - hostID = hostName // TODO: we should sanitize the hostname taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} processCache *process.CachingWalker diff --git a/xfer/publisher.go b/xfer/publisher.go index 6b457b314..acedbccaf 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -22,10 +22,17 @@ type Publisher interface { type HTTPPublisher struct { url string token string + id string } +// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The +// ID is currently set to the probe's hostname. It's designed to deduplicate +// reports from the same probe to the same receiver, in case the probe is +// configured to publish to multiple receivers that resolve to the same app. +const ScopeProbeIDHeader = "X-Scope-Probe-ID" + // NewHTTPPublisher returns an HTTPPublisher ready for use. -func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) { +func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) { if !strings.HasPrefix(target, "http") { target = "http://" + target } @@ -39,6 +46,7 @@ func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) { return &HTTPPublisher{ url: u.String(), token: token, + id: id, }, nil } @@ -53,6 +61,7 @@ func (p HTTPPublisher) Publish(rpt report.Report) error { return err } req.Header.Set("Authorization", AuthorizationHeader(p.token)) + req.Header.Set(ScopeProbeIDHeader, p.id) resp, err := http.DefaultClient.Do(req) if err != nil { return err diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 2618bf34a..6849bfd93 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "reflect" "testing" + "time" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" @@ -15,13 +16,18 @@ import ( func TestHTTPPublisher(t *testing.T) { var ( token = "abcdefg" + id = "1234567" rpt = report.MakeReport() + done = make(chan struct{}) ) s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have { t.Errorf("want %q, have %q", want, have) } + if want, have := id, r.Header.Get(xfer.ScopeProbeIDHeader); want != have { + t.Errorf("want %q, have %q", want, have) + } var have report.Report if err := gob.NewDecoder(r.Body).Decode(&have); err != nil { t.Error(err) @@ -32,16 +38,23 @@ func TestHTTPPublisher(t *testing.T) { return } w.WriteHeader(http.StatusOK) + close(done) })) defer s.Close() - p, err := xfer.NewHTTPPublisher(s.URL, token) + p, err := xfer.NewHTTPPublisher(s.URL, token, id) if err != nil { t.Fatal(err) } if err := p.Publish(rpt); err != nil { t.Error(err) } + + select { + case <-done: + case <-time.After(time.Millisecond): + t.Error("timeout") + } } func TestMultiPublisher(t *testing.T) {