Merge pull request #351 from weaveworks/probe-id-header

Add X-Scope-Probe-ID header to POSTs
This commit is contained in:
Peter Bourgon
2015-08-13 14:03:38 +02:00
5 changed files with 33 additions and 8 deletions

View File

@@ -23,7 +23,7 @@ func main() {
)
flag.Parse()
publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe")
publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
if err != nil {
log.Fatal(err)
}

View File

@@ -34,7 +34,7 @@ func main() {
}
f.Close()
publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe")
publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
if err != nil {
log.Fatal(err)
}

View File

@@ -48,7 +48,12 @@ func main() {
)
flag.Parse()
log.Printf("probe starting, version %s", version)
var (
hostName = hostname()
hostID = hostName // TODO: we should sanitize the hostname
probeID = hostName // TODO: does this need to be a random string instead?
)
log.Printf("probe starting, version %s, ID %s", version, probeID)
if len(flag.Args()) > 0 {
targets = flag.Args()
@@ -74,7 +79,7 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}
publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token) }
publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) }
publishers := xfer.NewMultiPublisher(publisherFactory)
resolver := newStaticResolver(targets, publishers.Add)
defer resolver.Stop()
@@ -92,8 +97,6 @@ func main() {
}
var (
hostName = hostname()
hostID = hostName // TODO: we should sanitize the hostname
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker

View File

@@ -22,10 +22,17 @@ type Publisher interface {
type HTTPPublisher struct {
url string
token string
id string
}
// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The
// ID is currently set to the probe's hostname. It's designed to deduplicate
// reports from the same probe to the same receiver, in case the probe is
// configured to publish to multiple receivers that resolve to the same app.
const ScopeProbeIDHeader = "X-Scope-Probe-ID"
// NewHTTPPublisher returns an HTTPPublisher ready for use.
func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) {
func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) {
if !strings.HasPrefix(target, "http") {
target = "http://" + target
}
@@ -39,6 +46,7 @@ func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) {
return &HTTPPublisher{
url: u.String(),
token: token,
id: id,
}, nil
}
@@ -53,6 +61,7 @@ func (p HTTPPublisher) Publish(rpt report.Report) error {
return err
}
req.Header.Set("Authorization", AuthorizationHeader(p.token))
req.Header.Set(ScopeProbeIDHeader, p.id)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err

View File

@@ -6,6 +6,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
@@ -15,13 +16,18 @@ import (
func TestHTTPPublisher(t *testing.T) {
var (
token = "abcdefg"
id = "1234567"
rpt = report.MakeReport()
done = make(chan struct{})
)
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have {
t.Errorf("want %q, have %q", want, have)
}
if want, have := id, r.Header.Get(xfer.ScopeProbeIDHeader); want != have {
t.Errorf("want %q, have %q", want, have)
}
var have report.Report
if err := gob.NewDecoder(r.Body).Decode(&have); err != nil {
t.Error(err)
@@ -32,16 +38,23 @@ func TestHTTPPublisher(t *testing.T) {
return
}
w.WriteHeader(http.StatusOK)
close(done)
}))
defer s.Close()
p, err := xfer.NewHTTPPublisher(s.URL, token)
p, err := xfer.NewHTTPPublisher(s.URL, token, id)
if err != nil {
t.Fatal(err)
}
if err := p.Publish(rpt); err != nil {
t.Error(err)
}
select {
case <-done:
case <-time.After(time.Millisecond):
t.Error("timeout")
}
}
func TestMultiPublisher(t *testing.T) {