Merge pull request #485 from weaveworks/463-dedupe-probe-posts

Dedupe probe POSTs
This commit is contained in:
Peter Bourgon
2015-09-25 10:10:21 +02:00
19 changed files with 606 additions and 383 deletions

View File

@@ -16,8 +16,13 @@ import (
"github.com/weaveworks/scope/xfer"
)
// Set during buildtime.
var version = "dev"
var (
// Set at buildtime.
version = "dev"
// Set at runtime.
uniqueID = "0"
)
func main() {
var (
@@ -33,8 +38,8 @@ func main() {
}
rand.Seed(time.Now().UnixNano())
id := strconv.FormatInt(rand.Int63(), 16)
log.Printf("app starting, version %s, ID %s", version, id)
uniqueID = strconv.FormatInt(rand.Int63(), 16)
log.Printf("app starting, version %s, ID %s", version, uniqueID)
c := xfer.NewCollector(*window)
http.Handle("/", Router(c))

View File

@@ -121,11 +121,12 @@ func captureTopology(rep xfer.Reporter, f func(xfer.Reporter, topologyView, http
// APIDetails are some generic details that can be fetched from /api
type APIDetails struct {
ID string `json:"id"`
Version string `json:"version"`
}
func apiHandler(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APIDetails{Version: version})
respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version})
}
// Topology option labels should tell the current state. The first item must

View File

@@ -0,0 +1,39 @@
package sanitize
import (
"fmt"
"log"
"net"
"net/url"
"strings"
)
// URL returns a function that sanitizes a URL string. It allows
// underspecified strings to be converted to usable URLs by filling in a
// default scheme, port, and path wherever the input omits them.
func URL(scheme string, port int, path string) func(string) string {
	if scheme == "" {
		scheme = "http://"
	}
	return func(raw string) string {
		if raw == "" {
			return raw // can't do much here
		}
		if !strings.HasPrefix(raw, "http") {
			raw = scheme + raw
		}
		u, err := url.Parse(raw)
		if err != nil {
			log.Printf("%q: %v", raw, err)
			return raw // oh well
		}
		if port > 0 {
			// Only append the default port when none is present.
			if _, _, splitErr := net.SplitHostPort(u.Host); splitErr != nil {
				u.Host = fmt.Sprintf("%s:%d", u.Host, port)
			}
		}
		if path != "" && u.Path != path {
			u.Path = path
		}
		return u.String()
	}
}

View File

@@ -0,0 +1,34 @@
package sanitize_test
import (
"testing"
"github.com/weaveworks/scope/common/sanitize"
)
// TestSanitizeURL exercises sanitize.URL over a table of default
// scheme/port/path combinations and inputs.
func TestSanitizeURL(t *testing.T) {
	for _, tc := range []struct {
		scheme string
		port   int
		path   string
		input  string
		want   string
	}{
		{"", 0, "", "", ""},
		{"", 0, "", "foo", "http://foo"},
		{"", 80, "", "foo", "http://foo:80"},
		{"", 0, "some/path", "foo", "http://foo/some/path"},
		{"", 0, "/some/path", "foo", "http://foo/some/path"},
		{"https://", 0, "", "foo", "https://foo"},
		{"https://", 80, "", "foo", "https://foo:80"},
		{"https://", 0, "some/path", "foo", "https://foo/some/path"},
		{"https://", 0, "", "http://foo", "http://foo"}, // specified scheme beats default...
		{"", 9999, "", "foo:80", "http://foo:80"},       // specified port beats default...
		{"", 0, "/bar", "foo/baz", "http://foo/bar"},    // ...but default path beats specified!
	} {
		have := sanitize.URL(tc.scheme, tc.port, tc.path)(tc.input)
		if have != tc.want {
			t.Errorf("sanitize.URL(%q, %d, %q)(%q): want %q, have %q", tc.scheme, tc.port, tc.path, tc.input, tc.want, have)
		}
	}
}

View File

@@ -23,7 +23,7 @@ func main() {
)
flag.Parse()
publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
_, publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe", "demoprobe")
if err != nil {
log.Fatal(err)
}

View File

@@ -34,7 +34,7 @@ func main() {
}
f.Close()
publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
_, publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe", "fixprobe")
if err != nil {
log.Fatal(err)
}

View File

@@ -80,16 +80,16 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}
publisherFactory := func(target string) (xfer.Publisher, error) {
publisher, err := xfer.NewHTTPPublisher(target, *token, probeID)
factory := func(endpoint string) (string, xfer.Publisher, error) {
id, publisher, err := xfer.NewHTTPPublisher(endpoint, *token, probeID)
if err != nil {
return nil, err
return "", nil, err
}
return xfer.NewBackgroundPublisher(publisher), nil
return id, xfer.NewBackgroundPublisher(publisher), nil
}
publishers := xfer.NewMultiPublisher(publisherFactory)
publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()
resolver := newStaticResolver(targets, publishers.Add)
resolver := newStaticResolver(targets, publishers.Set)
defer resolver.Stop()
addrs, err := net.InterfaceAddrs()
@@ -133,10 +133,7 @@ func main() {
}
if *weaveRouterAddr != "" {
weave, err := overlay.NewWeave(hostID, *weaveRouterAddr)
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
tickers = append(tickers, weave)
taggers = append(taggers, weave)
reporters = append(reporters, weave)

View File

@@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/common/sanitize"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
@@ -68,15 +67,11 @@ type weaveStatus struct {
// NewWeave returns a new Weave tagger based on the Weave router at
// address. The address should be an IP or FQDN, no port.
func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
s, err := sanitize("http://", 6784, "/report")(weaveRouterAddress)
if err != nil {
return nil, err
}
func NewWeave(hostID, weaveRouterAddress string) *Weave {
return &Weave{
url: s,
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
hostID: hostID,
}, nil
}
}
// Tick implements Ticker
@@ -202,25 +197,3 @@ func (w *Weave) Report() (report.Report, error) {
}
return r, nil
}
func sanitize(scheme string, port int, path string) func(string) (string, error) {
return func(s string) (string, error) {
if s == "" {
return "", fmt.Errorf("no host")
}
if !strings.HasPrefix(s, "http") {
s = scheme + s
}
u, err := url.Parse(s)
if err != nil {
return "", err
}
if _, _, err = net.SplitHostPort(u.Host); err != nil {
u.Host += fmt.Sprintf(":%d", port)
}
if u.Path != path {
u.Path = path
}
return u.String(), nil
}
}

View File

@@ -25,11 +25,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter))
defer s.Close()
w, err := overlay.NewWeave(mockHostID, s.URL)
if err != nil {
t.Fatal(err)
}
w := overlay.NewWeave(mockHostID, s.URL)
w.Tick()
{

View File

@@ -16,92 +16,96 @@ var (
)
type staticResolver struct {
quit chan struct{}
add func(string)
peers []peer
set func(string, []string)
targets []target
quit chan struct{}
}
type peer struct {
hostname string
port string
}
type target struct{ host, port string }
// NewResolver starts a new resolver that periodically
// tries to resolve peers and the calls add() with all the
// resolved IPs. It explictiy supports hostnames which
// resolve to multiple IPs; it will repeatedly call
// add with the same IP, expecting the target to dedupe.
func newStaticResolver(peers []string, add func(string)) staticResolver {
func (t target) String() string { return net.JoinHostPort(t.host, t.port) }
// newStaticResolver periodically resolves the targets, and calls the set
// function with all the resolved IPs. It explicitly supports targets which
// resolve to multiple IPs.
func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver {
r := staticResolver{
quit: make(chan struct{}),
add: add,
peers: prepareNames(peers),
targets: prepare(targets),
set: set,
quit: make(chan struct{}),
}
go r.loop()
return r
}
func prepareNames(strs []string) []peer {
var results []peer
for _, s := range strs {
var (
hostname string
port string
)
if strings.Contains(s, ":") {
var err error
hostname, port, err = net.SplitHostPort(s)
if err != nil {
log.Printf("invalid address %s: %v", s, err)
continue
}
} else {
hostname, port = s, strconv.Itoa(xfer.AppPort)
}
results = append(results, peer{hostname, port})
}
return results
}
func (r staticResolver) loop() {
r.resolveHosts()
r.resolve()
t := tick(time.Minute)
for {
select {
case <-t:
r.resolveHosts()
r.resolve()
case <-r.quit:
return
}
}
}
func (r staticResolver) resolveHosts() {
for _, peer := range r.peers {
var addrs []net.IP
if addr := net.ParseIP(peer.hostname); addr != nil {
addrs = []net.IP{addr}
} else {
var err error
addrs, err = lookupIP(peer.hostname)
if err != nil {
continue
}
}
for _, addr := range addrs {
// For now, ignore IPv6
if addr.To4() == nil {
continue
}
r.add(net.JoinHostPort(addr.String(), peer.port))
}
}
}
func (r staticResolver) Stop() {
close(r.quit)
}
func prepare(strs []string) []target {
var targets []target
for _, s := range strs {
var host, port string
if strings.Contains(s, ":") {
var err error
host, port, err = net.SplitHostPort(s)
if err != nil {
log.Printf("invalid address %s: %v", s, err)
continue
}
} else {
host, port = s, strconv.Itoa(xfer.AppPort)
}
targets = append(targets, target{host, port})
}
return targets
}
func (r staticResolver) resolve() {
for t, endpoints := range resolveMany(r.targets) {
r.set(t.String(), endpoints)
}
}
func resolveMany(targets []target) map[target][]string {
result := map[target][]string{}
for _, t := range targets {
result[t] = resolveOne(t)
}
return result
}
func resolveOne(t target) []string {
var addrs []net.IP
if addr := net.ParseIP(t.host); addr != nil {
addrs = []net.IP{addr}
} else {
var err error
addrs, err = lookupIP(t.host)
if err != nil {
return []string{}
}
}
endpoints := make([]string, 0, len(addrs))
for _, addr := range addrs {
// For now, ignore IPv6
if addr.To4() == nil {
continue
}
endpoints = append(endpoints, net.JoinHostPort(addr.String(), t.port))
}
return endpoints
}

View File

@@ -39,47 +39,53 @@ func TestResolver(t *testing.T) {
port := ":80"
ip1 := "192.168.0.1"
ip2 := "192.168.0.10"
adds := make(chan string)
add := func(s string) { adds <- s }
sets := make(chan string)
set := func(target string, endpoints []string) {
for _, endpoint := range endpoints {
sets <- endpoint
}
}
r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, add)
r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set)
assertAdd := func(want string) {
assertAdd := func(want ...string) {
remaining := map[string]struct{}{}
for _, s := range want {
remaining[s] = struct{}{}
}
_, _, line, _ := runtime.Caller(1)
select {
case have := <-adds:
if want != have {
t.Errorf("line %d: want %q, have %q", line, want, have)
for len(remaining) > 0 {
select {
case s := <-sets:
if _, ok := remaining[s]; ok {
t.Logf("line %d: got %q OK", line, s)
delete(remaining, s)
} else {
t.Errorf("line %d: got unexpected %q", line, s)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("line %d: didn't get the adds in time", line)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("line %d: didn't get add in time", line)
}
}
// Initial resolve should just give us IPs
assertAdd(ip1 + port)
assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
// Trigger another resolve with a tick; again,
// just want ips.
c <- time.Now()
assertAdd(ip1 + port)
assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
ip3 := "1.2.3.4"
updateIPs("symbolic.name", makeIPs(ip3))
c <- time.Now() // trigger a resolve
assertAdd(ip3 + port) // we want 1 add
assertAdd(ip1 + port)
assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
c <- time.Now() // trigger a resolve
assertAdd(ip3+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
ip4 := "10.10.10.10"
updateIPs("symbolic.name", makeIPs(ip3, ip4))
c <- time.Now() // trigger another resolve, this time with 2 adds
assertAdd(ip3 + port) // first add
assertAdd(ip4 + port) // second add
assertAdd(ip1 + port)
assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
c <- time.Now() // trigger another resolve, this time with 2 adds
assertAdd(ip3+port, ip4+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
done := make(chan struct{})
go func() { r.Stop(); close(done) }()

View File

@@ -0,0 +1,70 @@
package xfer
import (
"io"
"log"
"time"
)
const (
	// initialBackoff is the delay after the first failed publish; it is
	// doubled on each subsequent failure, up to maxBackoff.
	initialBackoff = 1 * time.Second
	maxBackoff     = 60 * time.Second
)

// BackgroundPublisher is a publisher which does the publish asynchronously.
// It will only do one publish at once; if there is an ongoing publish,
// concurrent publishes are dropped.
type BackgroundPublisher struct {
	publisher Publisher      // the wrapped, synchronous publisher
	readers   chan io.Reader // hands payloads to the background loop; closed by Stop
	quit      chan struct{}  // closed by Stop to cut short a backoff sleep
}
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
	pub := &BackgroundPublisher{
		quit:      make(chan struct{}),
		readers:   make(chan io.Reader),
		publisher: p,
	}
	go pub.loop()
	return pub
}
// loop drains bp.readers, publishing each payload via the wrapped
// publisher. After a failed publish it sleeps with exponential backoff
// (doubling from initialBackoff, capped at maxBackoff); a successful
// publish resets the backoff. The sleep is cut short if bp.quit is
// closed. The loop exits when bp.readers is closed (see Stop).
func (bp *BackgroundPublisher) loop() {
	backoff := initialBackoff
	for r := range bp.readers {
		err := bp.publisher.Publish(r)
		if err == nil {
			backoff = initialBackoff
			continue
		}
		log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err)
		select {
		case <-time.After(backoff):
		case <-bp.quit:
		}
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
// Publish implements Publisher. It hands the reader to the background
// loop without blocking: if a publish is already in flight (or the loop
// is sleeping in backoff), the reader is silently dropped. It always
// returns nil.
func (bp *BackgroundPublisher) Publish(r io.Reader) error {
	select {
	case bp.readers <- r:
	default:
	}
	return nil
}
// Stop implements Publisher. It closes the readers channel (which
// terminates the background loop), closes quit to interrupt any
// in-progress backoff sleep, and stops the wrapped publisher.
// NOTE(review): a Publish racing with Stop could send on the closed
// readers channel and panic — confirm callers never call Publish
// concurrently with or after Stop.
func (bp *BackgroundPublisher) Stop() {
	close(bp.readers)
	close(bp.quit)
	bp.publisher.Stop()
}

View File

@@ -0,0 +1,7 @@
package xfer_test
import "testing"
// TestBackgroundPublisher is a placeholder: BackgroundPublisher has no
// test coverage yet beyond this skipped stub.
func TestBackgroundPublisher(t *testing.T) {
	t.Skip("TODO")
}

85
xfer/http_publisher.go Normal file
View File

@@ -0,0 +1,85 @@
package xfer
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/weaveworks/scope/common/sanitize"
)
// HTTPPublisher publishes buffers by POST to a fixed endpoint.
type HTTPPublisher struct {
	url     string // sanitized report endpoint, e.g. http://host/api/report
	token   string // probe token, sent in the Authorization header
	probeID string // unique probe ID, sent in ScopeProbeIDHeader
}

// fastClient is used for the short discovery GET in NewHTTPPublisher,
// where failing fast beats hanging the probe.
var fastClient = http.Client{
	Timeout: 5 * time.Second,
}
// NewHTTPPublisher returns an HTTPPublisher ready for use, along with
// the unique ID of the receiving app. The ID is fetched from the
// target's /api endpoint, and lets callers dedupe multiple endpoints
// that resolve to the same app.
func NewHTTPPublisher(target, token, probeID string) (string, *HTTPPublisher, error) {
	targetAPI := sanitize.URL("http://", 0, "/api")(target)
	resp, err := fastClient.Get(targetAPI)
	if err != nil {
		return "", nil, err
	}
	defer resp.Body.Close()
	// A non-200 body is unlikely to be the JSON we expect; fail with a
	// useful message rather than a confusing decode error.
	if resp.StatusCode != http.StatusOK {
		return "", nil, fmt.Errorf("%s: %s", targetAPI, resp.Status)
	}
	var apiResponse struct {
		ID string `json:"id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
		return "", nil, err
	}
	return apiResponse.ID, &HTTPPublisher{
		url:     sanitize.URL("http://", 0, "/api/report")(target),
		token:   token,
		probeID: probeID,
	}, nil
}
// String implements fmt.Stringer, identifying the publisher by its
// target URL (used in log messages).
func (p HTTPPublisher) String() string {
	return p.url
}
// Publish publishes the report to the URL. The reader is expected to
// yield a gzipped payload (Content-Encoding is set accordingly). Any
// non-200 response is returned as an error.
func (p HTTPPublisher) Publish(r io.Reader) error {
	req, err := http.NewRequest("POST", p.url, r)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", AuthorizationHeader(p.token))
	req.Header.Set(ScopeProbeIDHeader, p.probeID)
	req.Header.Set("Content-Encoding", "gzip")
	// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// Passing resp.Status as a non-constant format string is a vet
		// error and would mangle any '%' in the status; use it as a value.
		return fmt.Errorf("%s", resp.Status)
	}
	return nil
}
// Stop implements Publisher. HTTPPublisher holds no long-lived
// resources, so this is a no-op.
func (p HTTPPublisher) Stop() {}
// AuthorizationHeader returns a value suitable for an HTTP Authorization
// header, based on the passed token string.
func AuthorizationHeader(token string) string {
	return "Scope-Probe token=" + token
}
// ScopeProbeIDHeader is the header we use to carry the probe's unique
// ID. The ID is currently set to the probe's hostname. It's designed to
// deduplicate reports from the same probe to the same receiver, in case
// the probe is configured to publish to multiple receivers that resolve
// to the same app.
const ScopeProbeIDHeader = "X-Scope-Probe-ID"

View File

@@ -1,9 +1,9 @@
package xfer_test
import (
"bytes"
"compress/gzip"
"encoding/gob"
"encoding/json"
"net/http"
"net/http/httptest"
"reflect"
@@ -12,7 +12,6 @@ import (
"time"
"github.com/gorilla/handlers"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/xfer"
@@ -27,6 +26,11 @@ func TestHTTPPublisher(t *testing.T) {
)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api" {
_ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"})
return
}
if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have {
t.Errorf("want %q, have %q", want, have)
}
@@ -61,7 +65,7 @@ func TestHTTPPublisher(t *testing.T) {
s := httptest.NewServer(handlers.CompressHandler(handler))
defer s.Close()
p, err := xfer.NewHTTPPublisher(s.URL, token, id)
_, p, err := xfer.NewHTTPPublisher(s.URL, token, id)
if err != nil {
t.Fatal(err)
}
@@ -76,32 +80,3 @@ func TestHTTPPublisher(t *testing.T) {
t.Error("timeout")
}
}
func TestMultiPublisher(t *testing.T) {
var (
p = &mockPublisher{}
factory = func(string) (xfer.Publisher, error) { return p, nil }
multiPublisher = xfer.NewMultiPublisher(factory)
)
multiPublisher.Add("first")
if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil {
t.Error(err)
}
if want, have := 1, p.count; want != have {
t.Errorf("want %d, have %d", want, have)
}
multiPublisher.Add("second") // but factory returns same mockPublisher
if err := multiPublisher.Publish(&bytes.Buffer{}); err != nil {
t.Error(err)
}
if want, have := 3, p.count; want != have {
t.Errorf("want %d, have %d", want, have)
}
}
type mockPublisher struct{ count int }
func (p *mockPublisher) Publish(*bytes.Buffer) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}

143
xfer/multi_publisher.go Normal file
View File

@@ -0,0 +1,143 @@
package xfer
import (
"bytes"
"errors"
"io"
"io/ioutil"
"log"
"strings"
"sync"
)
// MultiPublisher implements publisher over a collection of heterogeneous
// targets. See documentation of each method to understand the semantics.
type MultiPublisher struct {
	mtx     sync.Mutex
	factory func(endpoint string) (string, Publisher, error) // builds (appID, publisher) for one endpoint
	sema    semaphore // bounds concurrent factory calls (maxConcurrentGET)
	list    []tuple   // current publishers; guarded by mtx
}
// NewMultiPublisher returns a new MultiPublisher ready for use.
func NewMultiPublisher(factory func(endpoint string) (string, Publisher, error)) *MultiPublisher {
	p := &MultiPublisher{
		sema:    newSemaphore(maxConcurrentGET),
		factory: factory,
	}
	return p
}
// tuple records one resolved publishing destination.
type tuple struct {
	publisher Publisher
	target    string // DNS name
	endpoint  string // IP addr
	id        string // unique ID from app
	err       error  // if factory failed
}

// maxConcurrentGET bounds how many factory calls (each of which performs
// an HTTP GET to discover the app ID) may run at once during Set.
const maxConcurrentGET = 10
// Set declares that the target (DNS name) resolves to the provided endpoints
// (IPs), and that we want to publish to each of those endpoints. Set replaces
// any existing publishers to the given target. Set invokes the factory method
// to convert each endpoint to a publisher, and to get the remote receiver's
// unique ID.
func (p *MultiPublisher) Set(target string, endpoints []string) {
	// Convert endpoints to publishers, bounded by the semaphore. This
	// happens before taking the lock, as each factory call may do
	// network I/O.
	c := make(chan tuple, len(endpoints))
	for _, endpoint := range endpoints {
		go func(endpoint string) {
			p.sema.p()
			defer p.sema.v()
			id, publisher, err := p.factory(endpoint)
			c <- tuple{publisher, target, endpoint, id, err}
		}(endpoint)
	}
	// Size by the new endpoints only: the previous code sized by
	// len(p.list) here, which read p.list without holding the mutex and
	// raced with concurrent Set/Delete/Stop. appendFilter grows as needed.
	list := make([]tuple, 0, len(endpoints))
	for i := 0; i < cap(c); i++ {
		t := <-c
		if t.err != nil {
			log.Printf("multi-publisher set: %s (%s): %v", t.target, t.endpoint, t.err)
			continue
		}
		list = append(list, t)
	}
	// Copy all other tuples over to the new list.
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.list = p.appendFilter(list, func(t tuple) bool { return t.target != target })
}
// Delete removes all endpoints that match the given target.
func (p *MultiPublisher) Delete(target string) {
	keep := func(t tuple) bool { return t.target != target }
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.list = p.appendFilter([]tuple{}, keep)
}
// Publish implements Publisher by publishing the reader to all of the
// underlying publishers sequentially. To do that, it needs to drain the
// reader, and recreate new readers for each publisher. Note that it will
// publish to one endpoint for each unique ID. Failed publishes don't count,
// so the next endpoint sharing that ID is still attempted.
// NOTE(review): the mutex is held across all the publishes, so a slow
// endpoint delays Set/Delete/Stop — confirm the underlying publishers
// are non-blocking (e.g. BackgroundPublisher).
func (p *MultiPublisher) Publish(r io.Reader) error {
	buf, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	var (
		ids  = map[string]struct{}{} // app IDs already published to
		errs = []string{}
	)
	p.mtx.Lock()
	defer p.mtx.Unlock()
	for _, t := range p.list {
		if _, ok := ids[t.id]; ok {
			continue // another endpoint already reached this app
		}
		if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil {
			errs = append(errs, err.Error())
			continue
		}
		ids[t.id] = struct{}{} // sent already
	}
	if len(errs) > 0 {
		return errors.New(strings.Join(errs, "; "))
	}
	return nil
}
// Stop invokes stop on all underlying publishers and removes them.
func (p *MultiPublisher) Stop() {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	for i := range p.list {
		p.list[i].publisher.Stop()
	}
	p.list = []tuple{}
}
// appendFilter appends to list every tuple in p.list for which f returns
// true, and returns the result. Caller must hold p.mtx.
func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple {
	for _, t := range p.list {
		if f(t) {
			list = append(list, t)
		}
	}
	return list
}
// semaphore is a counting semaphore backed by a buffered channel: each
// buffered value represents one available slot.
type semaphore chan struct{}

// newSemaphore returns a semaphore with n slots available.
func newSemaphore(n int) semaphore {
	s := make(semaphore, n)
	for i := 0; i < n; i++ {
		s <- struct{}{}
	}
	return s
}

// p acquires a slot, blocking until one is available.
func (s semaphore) p() { <-s }

// v releases a slot.
func (s semaphore) v() { s <- struct{}{} }

View File

@@ -0,0 +1,40 @@
package xfer
import (
"testing"
"time"
)
// TestSemaphore checks that a semaphore of size n admits exactly n
// acquisitions, that the (n+1)th blocks, and that a release unblocks it.
// The 10ms waits keep the test fast while being generous on an
// uncontended machine.
func TestSemaphore(t *testing.T) {
	n := 3
	s := newSemaphore(n)
	// First n should be fine
	for i := 0; i < n; i++ {
		ok := make(chan struct{})
		go func() { s.p(); close(ok) }()
		select {
		case <-ok:
		case <-time.After(10 * time.Millisecond):
			t.Errorf("p (%d) failed", i+1)
		}
	}
	// This should block
	ok := make(chan struct{})
	go func() { s.p(); close(ok) }()
	select {
	case <-ok:
		t.Errorf("%dth p OK, but should block", n+1)
	case <-time.After(10 * time.Millisecond):
		// expected: the extra p is still blocked
	}
	s.v()
	select {
	case <-ok:
	case <-time.After(10 * time.Millisecond):
		t.Errorf("%dth p didn't resolve in time", n+1)
	}
}

View File

@@ -0,0 +1,53 @@
package xfer_test
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/weaveworks/scope/xfer"
)
// TestMultiPublisher verifies per-app-ID deduplication: endpoints a2 and
// b2 share ID "2", so each Publish should reach exactly three of the
// four mock publishers.
func TestMultiPublisher(t *testing.T) {
	var (
		a1 = &mockPublisher{} // target a, endpoint 1
		a2 = &mockPublisher{} // target a, endpoint 2 (duplicate)
		b2 = &mockPublisher{} // target b, endpoint 2 (duplicate)
		b3 = &mockPublisher{} // target b, endpoint 3
	)
	sum := func() int { return a1.count + a2.count + b2.count + b3.count }
	factory := func(endpoint string) (string, xfer.Publisher, error) {
		switch endpoint {
		case "a1":
			return "1", a1, nil
		case "a2":
			return "2", a2, nil
		case "b2":
			return "2", b2, nil
		case "b3":
			return "3", b3, nil
		}
		return "", nil, fmt.Errorf("invalid endpoint %s", endpoint)
	}
	mp := xfer.NewMultiPublisher(factory)
	mp.Set("a", []string{"a1", "a2"})
	mp.Set("b", []string{"b2", "b3"})
	for i := 1; i < 10; i++ {
		if err := mp.Publish(&bytes.Buffer{}); err != nil {
			t.Error(err)
		}
		if want, have := 3*i, sum(); want != have {
			t.Errorf("want %d, have %d", want, have)
		}
	}
}
// mockPublisher counts publishes and never fails.
type mockPublisher struct{ count int }

// Publish implements xfer.Publisher by incrementing the counter.
func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil }

// Stop implements xfer.Publisher as a no-op.
func (p *mockPublisher) Stop() {}

View File

@@ -1,215 +1,10 @@
package xfer
import (
"bytes"
"fmt"
"log"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
import "io"
const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)
// Publisher is something which can send a buffered set of data somewhere,
// probably to a collector.
// Publisher is something which can send a stream of data somewhere, probably
// to a remote collector.
type Publisher interface {
Publish(*bytes.Buffer) error
Publish(io.Reader) error
Stop()
}
// HTTPPublisher publishes reports by POST to a fixed endpoint.
type HTTPPublisher struct {
url string
token string
id string
}
// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The
// ID is currently set to the probe's hostname. It's designed to deduplicate
// reports from the same probe to the same receiver, in case the probe is
// configured to publish to multiple receivers that resolve to the same app.
const ScopeProbeIDHeader = "X-Scope-Probe-ID"
// NewHTTPPublisher returns an HTTPPublisher ready for use.
func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) {
if !strings.HasPrefix(target, "http") {
target = "http://" + target
}
u, err := url.Parse(target)
if err != nil {
return nil, err
}
if u.Path == "" {
u.Path = "/api/report"
}
return &HTTPPublisher{
url: u.String(),
token: token,
id: id,
}, nil
}
func (p HTTPPublisher) String() string {
return p.url
}
// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(buf *bytes.Buffer) error {
req, err := http.NewRequest("POST", p.url, buf)
if err != nil {
return err
}
req.Header.Set("Authorization", AuthorizationHeader(p.token))
req.Header.Set(ScopeProbeIDHeader, p.id)
req.Header.Set("Content-Encoding", "gzip")
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf(resp.Status)
}
return nil
}
// Stop implements Publisher
func (p HTTPPublisher) Stop() {}
// AuthorizationHeader returns a value suitable for an HTTP Authorization
// header, based on the passed token string.
func AuthorizationHeader(token string) string {
return fmt.Sprintf("Scope-Probe token=%s", token)
}
// BackgroundPublisher is a publisher which does the publish asynchronously.
// It will only do one publish at once; if there is an ongoing publish,
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
reports chan *bytes.Buffer
quit chan struct{}
}
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
result := &BackgroundPublisher{
publisher: p,
reports: make(chan *bytes.Buffer),
quit: make(chan struct{}),
}
go result.loop()
return result
}
func (b *BackgroundPublisher) loop() {
backoff := initialBackoff
for r := range b.reports {
err := b.publisher.Publish(r)
if err == nil {
backoff = initialBackoff
continue
}
log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err)
select {
case <-time.After(backoff):
case <-b.quit:
}
backoff = backoff * 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
// Publish implements Publisher
func (b *BackgroundPublisher) Publish(buf *bytes.Buffer) error {
select {
case b.reports <- buf:
default:
}
return nil
}
// Stop implements Publisher
func (b *BackgroundPublisher) Stop() {
close(b.reports)
close(b.quit)
b.publisher.Stop()
}
// MultiPublisher implements Publisher over a set of publishers.
type MultiPublisher struct {
mtx sync.RWMutex
factory func(string) (Publisher, error)
m map[string]Publisher
}
// NewMultiPublisher returns a new MultiPublisher ready for use. The factory
// should be e.g. NewHTTPPublisher, except you need to curry it over the
// probe token.
func NewMultiPublisher(factory func(string) (Publisher, error)) *MultiPublisher {
return &MultiPublisher{
factory: factory,
m: map[string]Publisher{},
}
}
// Add allows additional targets to be added dynamically. It will dedupe
// identical targets. TODO we have no good mechanism to remove.
func (p *MultiPublisher) Add(target string) {
p.mtx.Lock()
defer p.mtx.Unlock()
if _, ok := p.m[target]; ok {
return
}
publisher, err := p.factory(target)
if err != nil {
log.Printf("multi-publisher: %v", err)
return
}
p.m[target] = publisher
}
// Publish implements Publisher by emitting the report to all publishers.
func (p *MultiPublisher) Publish(buf *bytes.Buffer) error {
p.mtx.RLock()
defer p.mtx.RUnlock()
var errs []string
for _, publisher := range p.m {
if err := publisher.Publish(bytes.NewBuffer(buf.Bytes())); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
return fmt.Errorf(strings.Join(errs, "; "))
}
return nil
}
// Stop implements Publisher
func (p *MultiPublisher) Stop() {
p.mtx.RLock()
defer p.mtx.RUnlock()
for _, publisher := range p.m {
publisher.Stop()
}
}