Merge http publish and app client.

This commit is contained in:
Tom Wilkie
2015-12-04 12:17:21 +00:00
parent d921b528d8
commit 4f4d986571
11 changed files with 214 additions and 463 deletions

View File

@@ -101,17 +101,6 @@ func main() {
}
log.Printf("publishing to: %s", strings.Join(targets, ", "))
factory := func(hostname, endpoint string) (string, xfer.Publisher, error) {
id, publisher, err := xfer.NewHTTPPublisher(hostname, endpoint, *token, probeID, *insecure)
if err != nil {
return "", nil, err
}
return id, xfer.NewBackgroundPublisher(publisher), nil
}
publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()
clients := xfer.NewMultiAppClient(xfer.ProbeConfig{
Token: *token,
ProbeID: probeID,
@@ -119,14 +108,14 @@ func main() {
}, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient)
defer clients.Stop()
resolver := xfer.NewStaticResolver(targets, publishers.Set, clients.Set)
resolver := xfer.NewStaticResolver(targets, clients.Set)
defer resolver.Stop()
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
defer endpointReporter.Stop()
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
p := probe.New(*spyInterval, *publishInterval, publishers)
p := probe.New(*spyInterval, *publishInterval, clients)
p.AddTicker(processCache)
p.AddReporter(
endpointReporter,

View File

@@ -2,6 +2,8 @@ package xfer
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/rpc"
@@ -13,6 +15,11 @@ import (
"github.com/weaveworks/scope/common/sanitize"
)
const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)
// Details are some generic details that can be fetched from /api
type Details struct {
ID string `json:"id"`
@@ -23,6 +30,7 @@ type Details struct {
type AppClient interface {
Details() (Details, error)
ControlConnection(handler ControlHandler)
Publish(r io.Reader) error
Stop()
}
@@ -34,6 +42,11 @@ type appClient struct {
insecure bool
client http.Client
// For publish
publishLoop sync.Once
readers chan io.Reader
// For controls
controlServerCodecMtx sync.Mutex
controlServerCodec rpc.ServerCodec
}
@@ -45,20 +58,24 @@ func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) {
return nil, err
}
return &appClient{
appClient := &appClient{
ProbeConfig: pc,
quit: make(chan struct{}),
readers: make(chan io.Reader),
target: target,
client: http.Client{
Transport: httpTransport,
},
}, nil
}
return appClient, nil
}
// Stop stops the appClient.
func (c *appClient) Stop() {
c.controlServerCodecMtx.Lock()
defer c.controlServerCodecMtx.Unlock()
close(c.readers)
close(c.quit)
if c.controlServerCodec != nil {
c.controlServerCodec.Close()
@@ -81,6 +98,32 @@ func (c *appClient) Details() (Details, error) {
return result, json.NewDecoder(resp.Body).Decode(&result)
}
func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) {
backoff := initialBackoff
for {
again, err := f()
if !again {
return
}
if err == nil {
backoff = initialBackoff
continue
}
log.Printf("Error doing %s for %s, backing off %s: %v", msg, c.target, backoff, err)
select {
case <-time.After(backoff):
case <-c.quit:
return
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
func (c *appClient) controlConnection(handler ControlHandler) error {
dialer := websocket.Dialer{}
headers := http.Header{}
@@ -122,30 +165,58 @@ func (c *appClient) controlConnection(handler ControlHandler) error {
return nil
}
func (c *appClient) controlConnectionLoop(handler ControlHandler) {
defer log.Printf("Control connection to %s exiting", c.target)
backoff := initialBackoff
for {
err := c.controlConnection(handler)
if err == nil {
backoff = initialBackoff
continue
}
log.Printf("Error doing controls for %s, backing off %s: %v", c.target, backoff, err)
select {
case <-time.After(backoff):
case <-c.quit:
return
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
func (c *appClient) ControlConnection(handler ControlHandler) {
go c.controlConnectionLoop(handler)
go func() {
log.Printf("Control connection to %s starting", c.target)
defer log.Printf("Control connection to %s exiting", c.target)
c.doWithBackoff("controls", func() (bool, error) {
return true, c.controlConnection(handler)
})
}()
}
func (c *appClient) publish(r io.Reader) error {
url := sanitize.URL("", 0, "/api/report")(c.target)
req, err := c.ProbeConfig.authorizedRequest("POST", url, r)
if err != nil {
return err
}
req.Header.Set("Content-Encoding", "gzip")
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf(resp.Status)
}
return nil
}
func (c *appClient) startPublishing() {
go func() {
log.Printf("Publish loop for %s starting", c.target)
defer log.Printf("Publish loop for %s exiting", c.target)
c.doWithBackoff("publish", func() (bool, error) {
r := <-c.readers
if r == nil {
return false, nil
}
return true, c.publish(r)
})
}()
}
// Publish implements Publisher
func (c *appClient) Publish(r io.Reader) error {
// Lazily start the background publishing loop.
c.publishLoop.Do(c.startPublishing)
select {
case c.readers <- r:
default:
}
return nil
}

View File

@@ -1,10 +1,10 @@
package xfer_test
package xfer
import (
"compress/gzip"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
@@ -16,10 +16,17 @@ import (
"github.com/gorilla/handlers"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/xfer"
)
func TestHTTPPublisher(t *testing.T) {
type publisherFunc func(io.Reader) error
func (p publisherFunc) Publish(r io.Reader) error {
return p(r)
}
func (publisherFunc) Stop() {}
func TestAppClientPublish(t *testing.T) {
var (
token = "abcdefg"
id = "1234567"
@@ -32,15 +39,10 @@ func TestHTTPPublisher(t *testing.T) {
t.Errorf("want %q, have %q", want, have)
}
if want, have := id, r.Header.Get(xfer.ScopeProbeIDHeader); want != have {
if want, have := id, r.Header.Get(ScopeProbeIDHeader); want != have {
t.Errorf("want %q, have %q", want, have)
}
if r.URL.Path == "/api" {
_ = json.NewEncoder(w).Encode(map[string]string{"id": "irrelevant"})
return
}
var have report.Report
reader := r.Body
@@ -73,18 +75,26 @@ func TestHTTPPublisher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, p, err := xfer.NewHTTPPublisher(u.Host, s.URL, token, id, false)
pc := ProbeConfig{
Token: token,
ProbeID: id,
Insecure: false,
}
p, err := NewAppClient(pc, u.Host, s.URL)
if err != nil {
t.Fatal(err)
}
rp := xfer.NewReportPublisher(p)
defer p.Stop()
rp := NewReportPublisher(publisherFunc(p.(*appClient).publish))
if err := rp.Publish(rpt); err != nil {
t.Error(err)
}
select {
case <-done:
case <-time.After(time.Millisecond):
case <-time.After(100 * time.Millisecond):
t.Error("timeout")
}
}

View File

@@ -1,70 +0,0 @@
package xfer
import (
"io"
"log"
"time"
)
const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)
// BackgroundPublisher is a publisher which does the publish asynchronously.
// It will only do one publish at once; if there is an ongoing publish,
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
readers chan io.Reader
quit chan struct{}
}
// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
bp := &BackgroundPublisher{
publisher: p,
readers: make(chan io.Reader),
quit: make(chan struct{}),
}
go bp.loop()
return bp
}
func (bp *BackgroundPublisher) loop() {
backoff := initialBackoff
for r := range bp.readers {
err := bp.publisher.Publish(r)
if err == nil {
backoff = initialBackoff
continue
}
log.Printf("Error publishing to %s, backing off %s: %v", bp.publisher, backoff, err)
select {
case <-time.After(backoff):
case <-bp.quit:
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
// Publish implements Publisher
func (bp *BackgroundPublisher) Publish(r io.Reader) error {
select {
case bp.readers <- r:
default:
}
return nil
}
// Stop implements Publisher
func (bp *BackgroundPublisher) Stop() {
close(bp.readers)
close(bp.quit)
bp.publisher.Stop()
}

View File

@@ -1,29 +0,0 @@
package xfer_test
import (
"bytes"
"runtime"
"testing"
"time"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/xfer"
)
func TestBackgroundPublisher(t *testing.T) {
mp := mockPublisher{}
backgroundPublisher := xfer.NewBackgroundPublisher(&mp)
defer backgroundPublisher.Stop()
runtime.Gosched()
for i := 1; i <= 10; i++ {
err := backgroundPublisher.Publish(&bytes.Buffer{})
if err != nil {
t.Fatalf("%v", err)
}
test.Poll(t, 100*time.Millisecond, i, func() interface{} {
return mp.count
})
}
}

View File

@@ -1,95 +0,0 @@
package xfer
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/weaveworks/scope/common/sanitize"
)
// HTTPPublisher publishes buffers by POST to a fixed endpoint.
type HTTPPublisher struct {
ProbeConfig
url string
client *http.Client
}
// NewHTTPPublisher returns an HTTPPublisher ready for use.
func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (string, *HTTPPublisher, error) {
pc := ProbeConfig{
Token: token,
ProbeID: probeID,
Insecure: insecure,
}
httpTransport, err := pc.getHTTPTransport(hostname)
if err != nil {
return "", nil, err
}
p := &HTTPPublisher{
ProbeConfig: pc,
url: sanitize.URL("", 0, "/api/report")(target),
client: &http.Client{
Transport: httpTransport,
},
}
client := &http.Client{
Timeout: 5 * time.Second,
Transport: httpTransport,
}
req, err := pc.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil)
if err != nil {
return "", nil, err
}
resp, err := client.Do(req)
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
var apiResponse struct {
ID string `json:"id"`
}
if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
return "", nil, err
}
return apiResponse.ID, p, nil
}
func (p HTTPPublisher) String() string {
return p.url
}
// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(r io.Reader) error {
req, err := p.ProbeConfig.authorizedRequest("POST", p.url, r)
if err != nil {
return err
}
req.Header.Set("Content-Encoding", "gzip")
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
resp, err := p.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf(resp.Status)
}
return nil
}
// Stop implements Publisher
func (p *HTTPPublisher) Stop() {
// We replace the HTTPPublishers pretty regularly, so we need to ensure the
// underlying connections get closed, or we end up with lots of idle
// goroutines on the server (see #604)
p.client.Transport.(*http.Transport).CloseIdleConnections()
}

View File

@@ -1,12 +1,19 @@
package xfer
import (
"bytes"
"errors"
"io"
"io/ioutil"
"log"
"strings"
"sync"
"github.com/weaveworks/scope/report"
)
const maxConcurrentGET = 10
// ClientFactory is a thing thats makes AppClients
type ClientFactory func(ProbeConfig, string, string) (AppClient, error)
@@ -33,6 +40,7 @@ type clientTuple struct {
type MultiAppClient interface {
Set(hostname string, endpoints []string)
Stop()
Publish(io.Reader) error
}
// NewMultiAppClient creates a new MultiAppClient.
@@ -116,3 +124,39 @@ func (c *multiClient) Stop() {
c.clients = map[string]AppClient{}
close(c.quit)
}
// Publish implements Publisher by publishing the reader to all of the
// underlying publishers sequentially. To do that, it needs to drain the
// reader, and recreate new readers for each publisher. Note that it will
// publish to one endpoint for each unique ID. Failed publishes don't count.
func (c *multiClient) Publish(r io.Reader) error {
buf, err := ioutil.ReadAll(r)
if err != nil {
return err
}
c.mtx.Lock()
defer c.mtx.Unlock()
errs := []string{}
for _, c := range c.clients {
if err := c.Publish(bytes.NewReader(buf)); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
return errors.New(strings.Join(errs, "; "))
}
return nil
}
type semaphore chan struct{}
func newSemaphore(n int) semaphore {
c := make(chan struct{}, n)
for i := 0; i < n; i++ {
c <- struct{}{}
}
return semaphore(c)
}
func (s semaphore) acquire() { <-s }
func (s semaphore) release() { s <- struct{}{} }

View File

@@ -1,6 +1,8 @@
package xfer_test
import (
"bytes"
"io"
"runtime"
"testing"
@@ -11,6 +13,7 @@ type mockClient struct {
id string
count int
stopped int
publish int
}
func (c *mockClient) Details() (xfer.Details, error) {
@@ -25,26 +28,33 @@ func (c *mockClient) Stop() {
c.stopped++
}
func (c *mockClient) Publish(io.Reader) error {
c.publish++
return nil
}
var (
a1 = &mockClient{id: "1"} // hostname a, app id 1
a2 = &mockClient{id: "2"} // hostname a, app id 2
b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate)
b3 = &mockClient{id: "3"} // hostname b, app id 3
factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) {
switch target {
case "a1":
return a1, nil
case "a2":
return a2, nil
case "b2":
return b2, nil
case "b3":
return b3, nil
}
panic(target)
}
)
func TestMultiClient(t *testing.T) {
var (
a1 = &mockClient{id: "1"} // hostname a, app id 1
a2 = &mockClient{id: "2"} // hostname a, app id 2
b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate)
b3 = &mockClient{id: "3"} // hostname b, app id 3
factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) {
switch target {
case "a1":
return a1, nil
case "a2":
return a2, nil
case "b2":
return b2, nil
case "b3":
return b3, nil
}
t.Fatal(target)
return a1, nil
}
controlHandler = xfer.ControlHandlerFunc(func(_ xfer.Request) xfer.Response {
return xfer.Response{}
})
@@ -76,3 +86,22 @@ func TestMultiClient(t *testing.T) {
mp.Set("b", []string{})
expect(b3.stopped, 1)
}
func TestMultiClientPublish(t *testing.T) {
mp := xfer.NewMultiAppClient(xfer.ProbeConfig{}, nil, factory)
defer mp.Stop()
sum := func() int { return a1.publish + a2.publish + b2.publish + b3.publish }
mp.Set("a", []string{"a1", "a2"})
mp.Set("b", []string{"b2", "b3"})
for i := 1; i < 10; i++ {
if err := mp.Publish(&bytes.Buffer{}); err != nil {
t.Error(err)
}
if want, have := 3*i, sum(); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
}

View File

@@ -1,144 +0,0 @@
package xfer
import (
"bytes"
"errors"
"io"
"io/ioutil"
"log"
"strings"
"sync"
)
// MultiPublisher implements publisher over a collection of heterogeneous
// targets. See documentation of each method to understand the semantics.
type MultiPublisher struct {
mtx sync.Mutex
factory func(hostname, endpoint string) (string, Publisher, error)
sema semaphore
list []tuple
}
// NewMultiPublisher returns a new MultiPublisher ready for use.
func NewMultiPublisher(factory func(hostname, endpoint string) (string, Publisher, error)) *MultiPublisher {
return &MultiPublisher{
factory: factory,
sema: newSemaphore(maxConcurrentGET),
}
}
type tuple struct {
publisher Publisher
target string // DNS name
endpoint string // IP addr
id string // unique ID from app
err error // if factory failed
}
const maxConcurrentGET = 10
// Set declares that the target (DNS name) resolves to the provided endpoints
// (IPs), and that we want to publish to each of those endpoints. Set replaces
// any existing publishers to the given target. Set invokes the factory method
// to convert each endpoint to a publisher, and to get the remote receiver's
// unique ID.
func (p *MultiPublisher) Set(target string, endpoints []string) {
// Convert endpoints to publishers.
c := make(chan tuple, len(endpoints))
for _, endpoint := range endpoints {
go func(endpoint string) {
p.sema.acquire()
defer p.sema.release()
id, publisher, err := p.factory(target, endpoint)
c <- tuple{publisher, target, endpoint, id, err}
}(endpoint)
}
list := make([]tuple, 0, len(p.list)+len(endpoints))
for i := 0; i < cap(c); i++ {
t := <-c
if t.err != nil {
log.Printf("multi-publisher set: %s (%s): %v", t.target, t.endpoint, t.err)
continue
}
list = append(list, t)
}
// Copy all other tuples over to the new list.
p.mtx.Lock()
defer p.mtx.Unlock()
p.list = p.appendFilter(list, func(t tuple) bool { return t.target != target })
}
// Delete removes all endpoints that match the given target.
func (p *MultiPublisher) Delete(target string) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.list = p.appendFilter([]tuple{}, func(t tuple) bool { return t.target != target })
}
// Publish implements Publisher by publishing the reader to all of the
// underlying publishers sequentially. To do that, it needs to drain the
// reader, and recreate new readers for each publisher. Note that it will
// publish to one endpoint for each unique ID. Failed publishes don't count.
func (p *MultiPublisher) Publish(r io.Reader) error {
buf, err := ioutil.ReadAll(r)
if err != nil {
return err
}
var (
ids = map[string]struct{}{}
errs = []string{}
)
p.mtx.Lock()
defer p.mtx.Unlock()
for _, t := range p.list {
if _, ok := ids[t.id]; ok {
continue
}
if err := t.publisher.Publish(bytes.NewReader(buf)); err != nil {
errs = append(errs, err.Error())
continue
}
ids[t.id] = struct{}{} // sent already
}
if len(errs) > 0 {
return errors.New(strings.Join(errs, "; "))
}
return nil
}
// Stop invokes stop on all underlying publishers and removes them.
func (p *MultiPublisher) Stop() {
p.mtx.Lock()
defer p.mtx.Unlock()
for _, t := range p.list {
t.publisher.Stop()
}
p.list = []tuple{}
}
func (p *MultiPublisher) appendFilter(list []tuple, f func(tuple) bool) []tuple {
for _, t := range p.list {
if !f(t) {
t.publisher.Stop()
continue
}
list = append(list, t)
}
return list
}
type semaphore chan struct{}
func newSemaphore(n int) semaphore {
c := make(chan struct{}, n)
for i := 0; i < n; i++ {
c <- struct{}{}
}
return semaphore(c)
}
func (s semaphore) acquire() { <-s }
func (s semaphore) release() { s <- struct{}{} }

View File

@@ -1,54 +0,0 @@
package xfer_test
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/weaveworks/scope/xfer"
)
func TestMultiPublisher(t *testing.T) {
var (
a1 = &mockPublisher{} // target a, endpoint 1
a2 = &mockPublisher{} // target a, endpoint 2 (duplicate)
b2 = &mockPublisher{} // target b, endpoint 2 (duplicate)
b3 = &mockPublisher{} // target b, endpoint 3
)
sum := func() int { return a1.count + a2.count + b2.count + b3.count }
mp := xfer.NewMultiPublisher(func(hostname, endpoint string) (string, xfer.Publisher, error) {
switch endpoint {
case "a1":
return "1", a1, nil
case "a2":
return "2", a2, nil
case "b2":
return "2", b2, nil
case "b3":
return "3", b3, nil
default:
return "", nil, fmt.Errorf("invalid endpoint %s", endpoint)
}
})
defer mp.Stop()
mp.Set("a", []string{"a1", "a2"})
mp.Set("b", []string{"b2", "b3"})
for i := 1; i < 10; i++ {
if err := mp.Publish(&bytes.Buffer{}); err != nil {
t.Error(err)
}
if want, have := 3*i, sum(); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
}
type mockPublisher struct{ count int }
func (p *mockPublisher) Publish(io.Reader) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}