mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-02 17:50:39 +00:00
Merge pull request #726 from weaveworks/630-merge-appclient
Merge http publisher and app client.
This commit is contained in:
@@ -23,11 +23,15 @@ func main() {
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
_, publisher, err := xfer.NewHTTPPublisher(*publish, *publish, "demoprobe", "demoprobe", false)
|
||||
client, err := xfer.NewAppClient(xfer.ProbeConfig{
|
||||
Token: "demoprobe",
|
||||
ProbeID: "demoprobe",
|
||||
Insecure: false,
|
||||
}, *publish, *publish)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
rp := xfer.NewReportPublisher(publisher)
|
||||
rp := xfer.NewReportPublisher(client)
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
for range time.Tick(*publishInterval) {
|
||||
|
||||
@@ -34,12 +34,16 @@ func main() {
|
||||
}
|
||||
f.Close()
|
||||
|
||||
_, publisher, err := xfer.NewHTTPPublisher(*publish, *publish, "fixprobe", "fixprobe", false)
|
||||
client, err := xfer.NewAppClient(xfer.ProbeConfig{
|
||||
Token: "fixprobe",
|
||||
ProbeID: "fixprobe",
|
||||
Insecure: false,
|
||||
}, *publish, *publish)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
rp := xfer.NewReportPublisher(publisher)
|
||||
rp := xfer.NewReportPublisher(client)
|
||||
for range time.Tick(*publishInterval) {
|
||||
rp.Publish(fixedReport)
|
||||
}
|
||||
|
||||
@@ -101,17 +101,6 @@ func main() {
|
||||
}
|
||||
log.Printf("publishing to: %s", strings.Join(targets, ", "))
|
||||
|
||||
factory := func(hostname, endpoint string) (string, xfer.Publisher, error) {
|
||||
id, publisher, err := xfer.NewHTTPPublisher(hostname, endpoint, *token, probeID, *insecure)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
return id, xfer.NewBackgroundPublisher(publisher), nil
|
||||
}
|
||||
|
||||
publishers := xfer.NewMultiPublisher(factory)
|
||||
defer publishers.Stop()
|
||||
|
||||
clients := xfer.NewMultiAppClient(xfer.ProbeConfig{
|
||||
Token: *token,
|
||||
ProbeID: probeID,
|
||||
@@ -119,14 +108,14 @@ func main() {
|
||||
}, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient)
|
||||
defer clients.Stop()
|
||||
|
||||
resolver := xfer.NewStaticResolver(targets, publishers.Set, clients.Set)
|
||||
resolver := xfer.NewStaticResolver(targets, clients.Set)
|
||||
defer resolver.Stop()
|
||||
|
||||
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
|
||||
defer endpointReporter.Stop()
|
||||
|
||||
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
|
||||
p := probe.New(*spyInterval, *publishInterval, publishers)
|
||||
p := probe.New(*spyInterval, *publishInterval, clients)
|
||||
p.AddTicker(processCache)
|
||||
p.AddReporter(
|
||||
endpointReporter,
|
||||
|
||||
@@ -2,6 +2,8 @@ package xfer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
@@ -13,6 +15,11 @@ import (
|
||||
"github.com/weaveworks/scope/common/sanitize"
|
||||
)
|
||||
|
||||
const (
|
||||
initialBackoff = 1 * time.Second
|
||||
maxBackoff = 60 * time.Second
|
||||
)
|
||||
|
||||
// Details are some generic details that can be fetched from /api
|
||||
type Details struct {
|
||||
ID string `json:"id"`
|
||||
@@ -24,6 +31,7 @@ type Details struct {
|
||||
type AppClient interface {
|
||||
Details() (Details, error)
|
||||
ControlConnection(handler ControlHandler)
|
||||
Publish(r io.Reader) error
|
||||
Stop()
|
||||
}
|
||||
|
||||
@@ -35,6 +43,11 @@ type appClient struct {
|
||||
insecure bool
|
||||
client http.Client
|
||||
|
||||
// For publish
|
||||
publishLoop sync.Once
|
||||
readers chan io.Reader
|
||||
|
||||
// For controls
|
||||
controlServerCodecMtx sync.Mutex
|
||||
controlServerCodec rpc.ServerCodec
|
||||
}
|
||||
@@ -46,20 +59,24 @@ func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &appClient{
|
||||
appClient := &appClient{
|
||||
ProbeConfig: pc,
|
||||
quit: make(chan struct{}),
|
||||
readers: make(chan io.Reader),
|
||||
target: target,
|
||||
client: http.Client{
|
||||
Transport: httpTransport,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return appClient, nil
|
||||
}
|
||||
|
||||
// Stop stops the appClient.
|
||||
func (c *appClient) Stop() {
|
||||
c.controlServerCodecMtx.Lock()
|
||||
defer c.controlServerCodecMtx.Unlock()
|
||||
close(c.readers)
|
||||
close(c.quit)
|
||||
if c.controlServerCodec != nil {
|
||||
c.controlServerCodec.Close()
|
||||
@@ -82,6 +99,32 @@ func (c *appClient) Details() (Details, error) {
|
||||
return result, json.NewDecoder(resp.Body).Decode(&result)
|
||||
}
|
||||
|
||||
func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) {
|
||||
backoff := initialBackoff
|
||||
|
||||
for {
|
||||
done, err := f()
|
||||
if done {
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
backoff = initialBackoff
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Error doing %s for %s, backing off %s: %v", msg, c.target, backoff, err)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-c.quit:
|
||||
return
|
||||
}
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *appClient) controlConnection(handler ControlHandler) error {
|
||||
dialer := websocket.Dialer{}
|
||||
headers := http.Header{}
|
||||
@@ -123,30 +166,58 @@ func (c *appClient) controlConnection(handler ControlHandler) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *appClient) controlConnectionLoop(handler ControlHandler) {
|
||||
defer log.Printf("Control connection to %s exiting", c.target)
|
||||
backoff := initialBackoff
|
||||
|
||||
for {
|
||||
err := c.controlConnection(handler)
|
||||
if err == nil {
|
||||
backoff = initialBackoff
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Error doing controls for %s, backing off %s: %v", c.target, backoff, err)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-c.quit:
|
||||
return
|
||||
}
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *appClient) ControlConnection(handler ControlHandler) {
|
||||
go c.controlConnectionLoop(handler)
|
||||
go func() {
|
||||
log.Printf("Control connection to %s starting", c.target)
|
||||
defer log.Printf("Control connection to %s exiting", c.target)
|
||||
c.doWithBackoff("controls", func() (bool, error) {
|
||||
return false, c.controlConnection(handler)
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *appClient) publish(r io.Reader) error {
|
||||
url := sanitize.URL("", 0, "/api/report")(c.target)
|
||||
req, err := c.ProbeConfig.authorizedRequest("POST", url, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf(resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *appClient) startPublishing() {
|
||||
go func() {
|
||||
log.Printf("Publish loop for %s starting", c.target)
|
||||
defer log.Printf("Publish loop for %s exiting", c.target)
|
||||
c.doWithBackoff("publish", func() (bool, error) {
|
||||
r := <-c.readers
|
||||
if r == nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, c.publish(r)
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
// Publish implements Publisher
|
||||
func (c *appClient) Publish(r io.Reader) error {
|
||||
// Lazily start the background publishing loop.
|
||||
c.publishLoop.Do(c.startPublishing)
|
||||
select {
|
||||
case c.readers <- r:
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
190
xfer/app_client_internal_test.go
Normal file
190
xfer/app_client_internal_test.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
type publisherFunc func(io.Reader) error
|
||||
|
||||
func (p publisherFunc) Publish(r io.Reader) error {
|
||||
return p(r)
|
||||
}
|
||||
|
||||
func (publisherFunc) Stop() {}
|
||||
|
||||
func dummyServer(t *testing.T, expectedToken, expectedID string, expectedReport report.Report, done chan struct{}) *httptest.Server {
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if have := r.Header.Get("Authorization"); fmt.Sprintf("Scope-Probe token=%s", expectedToken) != have {
|
||||
t.Errorf("want %q, have %q", expectedToken, have)
|
||||
}
|
||||
|
||||
if have := r.Header.Get(ScopeProbeIDHeader); expectedID != have {
|
||||
t.Errorf("want %q, have %q", expectedID, have)
|
||||
}
|
||||
|
||||
var have report.Report
|
||||
|
||||
reader := r.Body
|
||||
var err error
|
||||
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
|
||||
reader, err = gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer reader.Close()
|
||||
}
|
||||
|
||||
if err := gob.NewDecoder(reader).Decode(&have); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(expectedReport, have) {
|
||||
t.Error(test.Diff(expectedReport, have))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
close(done)
|
||||
})
|
||||
|
||||
return httptest.NewServer(handlers.CompressHandler(handler))
|
||||
}
|
||||
|
||||
func TestAppClientPublishInternal(t *testing.T) {
|
||||
var (
|
||||
token = "abcdefg"
|
||||
id = "1234567"
|
||||
rpt = report.MakeReport()
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
s := dummyServer(t, token, id, rpt, done)
|
||||
defer s.Close()
|
||||
|
||||
u, err := url.Parse(s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pc := ProbeConfig{
|
||||
Token: token,
|
||||
ProbeID: id,
|
||||
Insecure: false,
|
||||
}
|
||||
|
||||
p, err := NewAppClient(pc, u.Host, s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer p.Stop()
|
||||
rp := NewReportPublisher(publisherFunc(p.(*appClient).publish))
|
||||
if err := rp.Publish(rpt); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppClientDetails(t *testing.T) {
|
||||
var (
|
||||
token = "abcdefg"
|
||||
id = "1234567"
|
||||
rpt = report.MakeReport()
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
s := dummyServer(t, token, id, rpt, done)
|
||||
defer s.Close()
|
||||
|
||||
u, err := url.Parse(s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pc := ProbeConfig{
|
||||
Token: token,
|
||||
ProbeID: id,
|
||||
Insecure: false,
|
||||
}
|
||||
|
||||
p, err := NewAppClient(pc, u.Host, s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer p.Stop()
|
||||
|
||||
// First few reports might be dropped as the client is spinning up.
|
||||
rp := NewReportPublisher(p)
|
||||
for i := 0; i < 3; i++ {
|
||||
if err := rp.Publish(rpt); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppClientPublish(t *testing.T) {
|
||||
var (
|
||||
id = "foobarbaz"
|
||||
version = "imalittleteapot"
|
||||
want = Details{ID: id, Version: version}
|
||||
)
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := json.NewEncoder(w).Encode(want); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
s := httptest.NewServer(handlers.CompressHandler(handler))
|
||||
defer s.Close()
|
||||
|
||||
u, err := url.Parse(s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pc := ProbeConfig{
|
||||
Token: "",
|
||||
ProbeID: "",
|
||||
Insecure: false,
|
||||
}
|
||||
p, err := NewAppClient(pc, u.Host, s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer p.Stop()
|
||||
|
||||
have, err := p.Details()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Error(test.Diff(want, have))
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1,70 +0,0 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
initialBackoff = 1 * time.Second
|
||||
maxBackoff = 60 * time.Second
|
||||
)
|
||||
|
||||
// BackgroundPublisher is a publisher which does the publish asynchronously.
|
||||
// It will only do one publish at once; if there is an ongoing publish,
|
||||
// concurrent publishes are dropped.
|
||||
type BackgroundPublisher struct {
|
||||
publisher Publisher
|
||||
readers chan io.Reader
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
|
||||
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
|
||||
bp := &BackgroundPublisher{
|
||||
publisher: p,
|
||||
readers: make(chan io.Reader),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go bp.loop()
|
||||
return bp
|
||||
}
|
||||
|
||||
func (bp *BackgroundPublisher) loop() {
|
||||
backoff := initialBackoff
|
||||
|
||||
for r := range bp.readers {
|
||||
err := bp.publisher.Publish(r)
|
||||
if err == nil {
|
||||
backoff = initialBackoff
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-bp.quit:
|
||||
}
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Publish implements Publisher
|
||||
func (bp *BackgroundPublisher) Publish(r io.Reader) error {
|
||||
select {
|
||||
case bp.readers <- r:
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (bp *BackgroundPublisher) Stop() {
|
||||
close(bp.readers)
|
||||
close(bp.quit)
|
||||
bp.publisher.Stop()
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package xfer_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/test"
|
||||
"github.com/weaveworks/scope/xfer"
|
||||
)
|
||||
|
||||
func TestBackgroundPublisher(t *testing.T) {
|
||||
mp := mockPublisher{}
|
||||
backgroundPublisher := xfer.NewBackgroundPublisher(&mp)
|
||||
defer backgroundPublisher.Stop()
|
||||
runtime.Gosched()
|
||||
|
||||
for i := 1; i <= 10; i++ {
|
||||
err := backgroundPublisher.Publish(&bytes.Buffer{})
|
||||
if err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
test.Poll(t, 100*time.Millisecond, i, func() interface{} {
|
||||
return mp.count
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,95 +0,0 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/common/sanitize"
|
||||
)
|
||||
|
||||
// HTTPPublisher publishes buffers by POST to a fixed endpoint.
|
||||
type HTTPPublisher struct {
|
||||
ProbeConfig
|
||||
|
||||
url string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// NewHTTPPublisher returns an HTTPPublisher ready for use.
|
||||
func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (string, *HTTPPublisher, error) {
|
||||
pc := ProbeConfig{
|
||||
Token: token,
|
||||
ProbeID: probeID,
|
||||
Insecure: insecure,
|
||||
}
|
||||
|
||||
httpTransport, err := pc.getHTTPTransport(hostname)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
p := &HTTPPublisher{
|
||||
ProbeConfig: pc,
|
||||
url: sanitize.URL("", 0, "/api/report")(target),
|
||||
client: &http.Client{
|
||||
Transport: httpTransport,
|
||||
},
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: 5 * time.Second,
|
||||
Transport: httpTransport,
|
||||
}
|
||||
req, err := pc.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
var apiResponse struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
return apiResponse.ID, p, nil
|
||||
}
|
||||
|
||||
func (p HTTPPublisher) String() string {
|
||||
return p.url
|
||||
}
|
||||
|
||||
// Publish publishes the report to the URL.
|
||||
func (p HTTPPublisher) Publish(r io.Reader) error {
|
||||
req, err := p.ProbeConfig.authorizedRequest("POST", p.url, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
|
||||
|
||||
resp, err := p.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf(resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements Publisher
|
||||
func (p *HTTPPublisher) Stop() {
|
||||
// We replace the HTTPPublishers pretty regularly, so we need to ensure the
|
||||
// underlying connections get closed, or we end up with lots of idle
|
||||
// goroutines on the server (see #604)
|
||||
p.client.Transport.(*http.Transport).CloseIdleConnections()
|
||||
}
|
||||
@@ -1,90 +0,0 @@
|
||||
package xfer_test
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
"github.com/weaveworks/scope/xfer"
|
||||
)
|
||||
|
||||
func TestHTTPPublisher(t *testing.T) {
|
||||
var (
|
||||
token = "abcdefg"
|
||||
id = "1234567"
|
||||
rpt = report.MakeReport()
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if want, have := fmt.Sprintf("Scope-Probe token=%s", token), r.Header.Get("Authorization"); want != have {
|
||||
t.Errorf("want %q, have %q", want, have)
|
||||
}
|
||||
|
||||
if want, have := id, r.Header.Get(xfer.ScopeProbeIDHeader); want != have {
|
||||
t.Errorf("want %q, have %q", want, have)
|
||||
}
|
||||
|
||||
if r.URL.Path == "/api" {
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"})
|
||||
return
|
||||
}
|
||||
|
||||
var have report.Report
|
||||
|
||||
reader := r.Body
|
||||
var err error
|
||||
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
|
||||
reader, err = gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer reader.Close()
|
||||
}
|
||||
|
||||
if err := gob.NewDecoder(reader).Decode(&have); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if want := rpt; !reflect.DeepEqual(want, have) {
|
||||
t.Error(test.Diff(want, have))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
close(done)
|
||||
})
|
||||
|
||||
s := httptest.NewServer(handlers.CompressHandler(handler))
|
||||
defer s.Close()
|
||||
|
||||
u, err := url.Parse(s.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, p, err := xfer.NewHTTPPublisher(u.Host, s.URL, token, id, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rp := xfer.NewReportPublisher(p)
|
||||
if err := rp.Publish(rpt); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Millisecond):
|
||||
t.Error("timeout")
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,19 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
const maxConcurrentGET = 10
|
||||
|
||||
// ClientFactory is a thing thats makes AppClients
|
||||
type ClientFactory func(ProbeConfig, string, string) (AppClient, error)
|
||||
|
||||
@@ -33,6 +40,7 @@ type clientTuple struct {
|
||||
type MultiAppClient interface {
|
||||
Set(hostname string, endpoints []string)
|
||||
Stop()
|
||||
Publish(io.Reader) error
|
||||
}
|
||||
|
||||
// NewMultiAppClient creates a new MultiAppClient.
|
||||
@@ -116,3 +124,39 @@ func (c *multiClient) Stop() {
|
||||
c.clients = map[string]AppClient{}
|
||||
close(c.quit)
|
||||
}
|
||||
|
||||
// Publish implements Publisher by publishing the reader to all of the
|
||||
// underlying publishers sequentially. To do that, it needs to drain the
|
||||
// reader, and recreate new readers for each publisher. Note that it will
|
||||
// publish to one endpoint for each unique ID. Failed publishes don't count.
|
||||
func (c *multiClient) Publish(r io.Reader) error {
|
||||
buf, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
errs := []string{}
|
||||
for _, c := range c.clients {
|
||||
if err := c.Publish(bytes.NewReader(buf)); err != nil {
|
||||
errs = append(errs, err.Error())
|
||||
}
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
return errors.New(strings.Join(errs, "; "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type semaphore chan struct{}
|
||||
|
||||
func newSemaphore(n int) semaphore {
|
||||
c := make(chan struct{}, n)
|
||||
for i := 0; i < n; i++ {
|
||||
c <- struct{}{}
|
||||
}
|
||||
return semaphore(c)
|
||||
}
|
||||
func (s semaphore) acquire() { <-s }
|
||||
func (s semaphore) release() { s <- struct{}{} }
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package xfer_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
@@ -11,6 +13,7 @@ type mockClient struct {
|
||||
id string
|
||||
count int
|
||||
stopped int
|
||||
publish int
|
||||
}
|
||||
|
||||
func (c *mockClient) Details() (xfer.Details, error) {
|
||||
@@ -25,26 +28,33 @@ func (c *mockClient) Stop() {
|
||||
c.stopped++
|
||||
}
|
||||
|
||||
func (c *mockClient) Publish(io.Reader) error {
|
||||
c.publish++
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
a1 = &mockClient{id: "1"} // hostname a, app id 1
|
||||
a2 = &mockClient{id: "2"} // hostname a, app id 2
|
||||
b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate)
|
||||
b3 = &mockClient{id: "3"} // hostname b, app id 3
|
||||
factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) {
|
||||
switch target {
|
||||
case "a1":
|
||||
return a1, nil
|
||||
case "a2":
|
||||
return a2, nil
|
||||
case "b2":
|
||||
return b2, nil
|
||||
case "b3":
|
||||
return b3, nil
|
||||
}
|
||||
panic(target)
|
||||
}
|
||||
)
|
||||
|
||||
func TestMultiClient(t *testing.T) {
|
||||
var (
|
||||
a1 = &mockClient{id: "1"} // hostname a, app id 1
|
||||
a2 = &mockClient{id: "2"} // hostname a, app id 2
|
||||
b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate)
|
||||
b3 = &mockClient{id: "3"} // hostname b, app id 3
|
||||
factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) {
|
||||
switch target {
|
||||
case "a1":
|
||||
return a1, nil
|
||||
case "a2":
|
||||
return a2, nil
|
||||
case "b2":
|
||||
return b2, nil
|
||||
case "b3":
|
||||
return b3, nil
|
||||
}
|
||||
t.Fatal(target)
|
||||
return a1, nil
|
||||
}
|
||||
controlHandler = xfer.ControlHandlerFunc(func(_ xfer.Request) xfer.Response {
|
||||
return xfer.Response{}
|
||||
})
|
||||
@@ -76,3 +86,22 @@ func TestMultiClient(t *testing.T) {
|
||||
mp.Set("b", []string{})
|
||||
expect(b3.stopped, 1)
|
||||
}
|
||||
|
||||
func TestMultiClientPublish(t *testing.T) {
|
||||
mp := xfer.NewMultiAppClient(xfer.ProbeConfig{}, nil, factory)
|
||||
defer mp.Stop()
|
||||
|
||||
sum := func() int { return a1.publish + a2.publish + b2.publish + b3.publish }
|
||||
|
||||
mp.Set("a", []string{"a1", "a2"})
|
||||
mp.Set("b", []string{"b2", "b3"})
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
if err := mp.Publish(&bytes.Buffer{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want, have := 3*i, sum(); want != have {
|
||||
t.Errorf("want %d, have %d", want, have)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,144 +0,0 @@
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// MultiPublisher implements publisher over a collection of heterogeneous
|
||||
// targets. See documentation of each method to understand the semantics.
|
||||
type MultiPublisher struct {
|
||||
mtx sync.Mutex
|
||||
factory func(hostname, endpoint string) (string, Publisher, error)
|
||||
sema semaphore
|
||||
list []tuple
|
||||
}
|
||||
|
||||
// NewMultiPublisher returns a new MultiPublisher ready for use.
|
||||
func NewMultiPublisher(factory func(hostname, endpoint string) (string, Publisher, error)) *MultiPublisher {
|
||||
return &MultiPublisher{
|
||||
factory: factory,
|
||||
sema: newSemaphore(maxConcurrentGET),
|
||||
}
|
||||
}
|
||||
|
||||
type tuple struct {
|
||||
publisher Publisher
|
||||
target string // DNS name
|
||||
endpoint string // IP addr
|
||||
id string // unique ID from app
|
||||
err error // if factory failed
|
||||
}
|
||||
|
||||
const maxConcurrentGET = 10
|
||||
|
||||
// Set declares that the target (DNS name) resolves to the provided endpoints
|
||||
// (IPs), and that we want to publish to each of those endpoints. Set replaces
|
||||
// any existing publishers to the given target. Set invokes the factory method
|
||||
// to convert each endpoint to a publisher, and to get the remote receiver's
|
||||
// unique ID.
|
||||
func (p *MultiPublisher) Set(target string, endpoints []string) {
|
||||
// Convert endpoints to publishers.
|
||||
c := make(chan tuple, len(endpoints))
|
||||
for _, endpoint := range endpoints {
|
||||
go func(endpoint string) {
|
||||
p.sema.acquire()
|
||||
defer p.sema.release()
|
||||
id, publisher, err := p.factory(target, endpoint)
|
||||
c <- tuple{publisher, target, endpoint, id, err}
|
||||
}(endpoint)
|
||||
}
|
||||
list := make([]tuple, 0, len(p.list)+len(endpoints))
|
||||
for i := 0; i < cap(c); i++ {
|
||||
t := <-c
|
||||
if t.err != nil {
|
||||
log.Printf("multi-publisher set: %s (%s): %v", t.target, t.endpoint, t.err)
|
||||
continue
|
||||
}
|
||||
list = append(list, t)
|
||||
}
|
||||
|
||||
// Copy all other tuples over to the new list.
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
p.list = p.appendFilter(list, func(t tuple) bool { return t.target != target })
|
||||
}
|
||||
|
||||
// Delete removes all endpoints that match the given target.
|
||||
func (p *MultiPublisher) Delete(target string) {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target })
|
||||
}
|
||||
|
||||
// Publish implements Publisher by publishing the reader to all of the
|
||||
// underlying publishers sequentially. To do that, it needs to drain the
|
||||
// reader, and recreate new readers for each publisher. Note that it will
|
||||
// publish to one endpoint for each unique ID. Failed publishes don't count.
|
||||
func (p *MultiPublisher) Publish(r io.Reader) error {
|
||||
buf, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
ids = map[string]struct{}{}
|
||||
errs = []string{}
|
||||
)
|
||||
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
|
||||
for _, t := range p.list {
|
||||
if _, ok := ids[t.id]; ok {
|
||||
continue
|
||||
}
|
||||
if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil {
|
||||
errs = append(errs, err.Error())
|
||||
continue
|
||||
}
|
||||
ids[t.id] = struct{}{} // sent already
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
return errors.New(strings.Join(errs, "; "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop invokes stop on all underlying publishers and removes them.
|
||||
func (p *MultiPublisher) Stop() {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
for _, t := range p.list {
|
||||
t.publisher.Stop()
|
||||
}
|
||||
p.list = []tuple{}
|
||||
}
|
||||
|
||||
func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple {
|
||||
for _, t := range p.list {
|
||||
if !f(t) {
|
||||
t.publisher.Stop()
|
||||
continue
|
||||
}
|
||||
list = append(list, t)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
type semaphore chan struct{}
|
||||
|
||||
func newSemaphore(n int) semaphore {
|
||||
c := make(chan struct{}, n)
|
||||
for i := 0; i < n; i++ {
|
||||
c <- struct{}{}
|
||||
}
|
||||
return semaphore(c)
|
||||
}
|
||||
func (s semaphore) acquire() { <-s }
|
||||
func (s semaphore) release() { s <- struct{}{} }
|
||||
@@ -1,54 +0,0 @@
|
||||
package xfer_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/weaveworks/scope/xfer"
|
||||
)
|
||||
|
||||
func TestMultiPublisher(t *testing.T) {
|
||||
var (
|
||||
a1 = &mockPublisher{} // target a, endpoint 1
|
||||
a2 = &mockPublisher{} // target a, endpoint 2 (duplicate)
|
||||
b2 = &mockPublisher{} // target b, endpoint 2 (duplicate)
|
||||
b3 = &mockPublisher{} // target b, endpoint 3
|
||||
)
|
||||
|
||||
sum := func() int { return a1.count + a2.count + b2.count + b3.count }
|
||||
|
||||
mp := xfer.NewMultiPublisher(func(hostname, endpoint string) (string, xfer.Publisher, error) {
|
||||
switch endpoint {
|
||||
case "a1":
|
||||
return "1", a1, nil
|
||||
case "a2":
|
||||
return "2", a2, nil
|
||||
case "b2":
|
||||
return "2", b2, nil
|
||||
case "b3":
|
||||
return "3", b3, nil
|
||||
default:
|
||||
return "", nil, fmt.Errorf("invalid endpoint %s", endpoint)
|
||||
}
|
||||
})
|
||||
defer mp.Stop()
|
||||
|
||||
mp.Set("a", []string{"a1", "a2"})
|
||||
mp.Set("b", []string{"b2", "b3"})
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
if err := mp.Publish(&bytes.Buffer{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if want, have := 3*i, sum(); want != have {
|
||||
t.Errorf("want %d, have %d", want, have)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type mockPublisher struct{ count int }
|
||||
|
||||
func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil }
|
||||
func (p *mockPublisher) Stop() {}
|
||||
Reference in New Issue
Block a user