mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Merge pull request #1209 from weaveworks/multitenant-tests
Tests for Consul Pipe Router.
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -12,6 +12,7 @@ releases
|
||||
# Architecture specific extensions/prefixes
|
||||
*.[568vq]
|
||||
[568vq].out
|
||||
.DS_Store
|
||||
|
||||
*.cgo1.go
|
||||
*.cgo2.c
|
||||
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
|
||||
"github.com/weaveworks/scope/common/mtime"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -21,8 +19,7 @@ const (
|
||||
type ConsulClient interface {
|
||||
Get(key string, out interface{}) error
|
||||
CAS(key string, out interface{}, f CASCallback) error
|
||||
Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error
|
||||
WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool)
|
||||
WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool)
|
||||
}
|
||||
|
||||
// CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil.
|
||||
@@ -37,7 +34,7 @@ func NewConsulClient(addr string) (ConsulClient, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return (*consulClient)(client), nil
|
||||
return &consulClient{client.KV()}, nil
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -50,12 +47,19 @@ var (
|
||||
ErrNotFound = fmt.Errorf("Not found")
|
||||
)
|
||||
|
||||
type consulClient consul.Client
|
||||
type kv interface {
|
||||
CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error)
|
||||
Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error)
|
||||
List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error)
|
||||
}
|
||||
|
||||
type consulClient struct {
|
||||
kv
|
||||
}
|
||||
|
||||
// Get and deserialise a JSON value from consul.
|
||||
func (c *consulClient) Get(key string, out interface{}) error {
|
||||
kv := (*consul.Client)(c).KV()
|
||||
kvp, _, err := kv.Get(key, queryOptions)
|
||||
kvp, _, err := c.kv.Get(key, queryOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -73,13 +77,12 @@ func (c *consulClient) Get(key string, out interface{}) error {
|
||||
func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
|
||||
var (
|
||||
index = uint64(0)
|
||||
kv = (*consul.Client)(c).KV()
|
||||
retries = 10
|
||||
retry = true
|
||||
intermediate interface{}
|
||||
)
|
||||
for i := 0; i < retries; i++ {
|
||||
kvp, _, err := kv.Get(key, queryOptions)
|
||||
kvp, _, err := c.kv.Get(key, queryOptions)
|
||||
if err != nil {
|
||||
log.Errorf("Error getting %s: %v", key, err)
|
||||
continue
|
||||
@@ -111,7 +114,7 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
|
||||
log.Errorf("Error serialising value for %s: %v", key, err)
|
||||
continue
|
||||
}
|
||||
ok, _, err := kv.CAS(&consul.KVPair{
|
||||
ok, _, err := c.kv.CAS(&consul.KVPair{
|
||||
Key: key,
|
||||
Value: value.Bytes(),
|
||||
ModifyIndex: index,
|
||||
@@ -129,52 +132,16 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
|
||||
return fmt.Errorf("Failed to CAS %s", key)
|
||||
}
|
||||
|
||||
// Watch a given key value and trigger a callback when it changes.
|
||||
// if callback returns false or error, exit (with the error).
|
||||
func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error {
|
||||
var (
|
||||
index = uint64(0)
|
||||
kv = (*consul.Client)(c).KV()
|
||||
)
|
||||
for deadline.After(mtime.Now()) {
|
||||
// Do a (blocking) long poll waiting for the entry to get updated. index here
|
||||
// is really a version number; this call will wait for the key to be updated
|
||||
// past said version. As we always start from version 0, we're guaranteed
|
||||
// not to miss any updates - in fact we will always call the callback with
|
||||
// the current value of the key immediately.
|
||||
kvp, meta, err := kv.Get(key, &consul.QueryOptions{
|
||||
RequireConsistent: true,
|
||||
WaitIndex: index,
|
||||
WaitTime: longPollDuration,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error getting %s: %v", key, err)
|
||||
}
|
||||
if kvp == nil {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil {
|
||||
return err
|
||||
}
|
||||
if ok, err := f(out); !ok {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
index = meta.LastIndex
|
||||
}
|
||||
return fmt.Errorf("Timed out waiting on %s", key)
|
||||
}
|
||||
|
||||
func (c *consulClient) WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) {
|
||||
var (
|
||||
index = uint64(0)
|
||||
kv = (*consul.Client)(c).KV()
|
||||
)
|
||||
func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) {
|
||||
index := uint64(0)
|
||||
for {
|
||||
kvps, meta, err := kv.List(prefix, &consul.QueryOptions{
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{
|
||||
RequireConsistent: true,
|
||||
WaitIndex: index,
|
||||
WaitTime: longPollDuration,
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/common/mtime"
|
||||
"github.com/weaveworks/scope/common/network"
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
)
|
||||
|
||||
@@ -23,8 +22,6 @@ const (
|
||||
gcInterval = 30 * time.Second // we check all the pipes every 30s
|
||||
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
|
||||
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
|
||||
|
||||
privateAPIPort = 4444
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -84,6 +81,7 @@ type consulPipeRouter struct {
|
||||
activePipes map[string]xfer.Pipe
|
||||
bridges map[string]*bridgeConnection
|
||||
actorChan chan func()
|
||||
pipeWaiters map[string][]chan xfer.Pipe
|
||||
|
||||
// Used by Stop()
|
||||
quit chan struct{}
|
||||
@@ -91,11 +89,7 @@ type consulPipeRouter struct {
|
||||
}
|
||||
|
||||
// NewConsulPipeRouter returns a new consul based router
|
||||
func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserIDer) (app.PipeRouter, error) {
|
||||
advertise, err := network.GetFirstAddressOf(inf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter {
|
||||
pipeRouter := &consulPipeRouter{
|
||||
prefix: prefix,
|
||||
advertise: advertise,
|
||||
@@ -105,13 +99,15 @@ func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserI
|
||||
activePipes: map[string]xfer.Pipe{},
|
||||
bridges: map[string]*bridgeConnection{},
|
||||
actorChan: make(chan func()),
|
||||
quit: make(chan struct{}),
|
||||
pipeWaiters: map[string][]chan xfer.Pipe{},
|
||||
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
pipeRouter.wait.Add(2)
|
||||
go pipeRouter.watchAll()
|
||||
go pipeRouter.privateAPI()
|
||||
go pipeRouter.actor()
|
||||
return pipeRouter, nil
|
||||
go pipeRouter.privateAPI()
|
||||
return pipeRouter
|
||||
}
|
||||
|
||||
func (pr *consulPipeRouter) Stop() {
|
||||
@@ -138,28 +134,25 @@ func (pr *consulPipeRouter) actor() {
|
||||
// trigger an event in this loop.
|
||||
func (pr *consulPipeRouter) watchAll() {
|
||||
defer pr.wait.Done()
|
||||
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, func(key string, value interface{}) bool {
|
||||
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool {
|
||||
cp := *value.(*consulPipe)
|
||||
select {
|
||||
case pr.actorChan <- func() { pr.handlePipeUpdate(key, cp) }:
|
||||
return true
|
||||
case <-pr.quit:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
|
||||
pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) }
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
|
||||
log.Infof("Got update to pipe %s", key)
|
||||
|
||||
func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
|
||||
// 1. If this pipe is closed, or we're not one of the ends, we
|
||||
// should ensure our local pipe (and bridge) is closed.
|
||||
if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
|
||||
log.Infof("Pipe %s not in use on this node.", key)
|
||||
pipe, ok := pr.activePipes[key]
|
||||
delete(pr.activePipes, key)
|
||||
if ok {
|
||||
log.Infof("Deleting pipe %s", key)
|
||||
pipe.Close()
|
||||
}
|
||||
|
||||
@@ -178,8 +171,13 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
|
||||
// 2. If this pipe if for us, we should have a pipe for it.
|
||||
pipe, ok := pr.activePipes[key]
|
||||
if !ok {
|
||||
log.Infof("Creating pipe %s", key)
|
||||
pipe = xfer.NewPipe()
|
||||
pr.activePipes[key] = pipe
|
||||
for _, pw := range pr.pipeWaiters[key] {
|
||||
pw <- pipe
|
||||
}
|
||||
delete(pr.pipeWaiters, key)
|
||||
}
|
||||
|
||||
// 3. Ensure there is a bridging connection for this pipe.
|
||||
@@ -193,7 +191,6 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
|
||||
// If we shouldn't be bridging but are, or we should be bridging but are pointing
|
||||
// at the wrong place, stop the current bridge.
|
||||
if (!shouldBridge && ok) || (shouldBridge && ok && bridge.addr != cp.addrFor(app.ProbeEnd)) {
|
||||
log.Infof("Stopping bridge connection for %s", key)
|
||||
delete(pr.bridges, key)
|
||||
bridge.stop()
|
||||
ok = false
|
||||
@@ -201,46 +198,69 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
|
||||
|
||||
// If we should be bridging and are not, start a new bridge
|
||||
if shouldBridge && !ok {
|
||||
log.Infof("Starting bridge connection for %s", key)
|
||||
bridge = newBridgeConnection(key, cp.addrFor(app.ProbeEnd), pipe)
|
||||
pr.bridges[key] = bridge
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *consulPipeRouter) getPipe(key string) xfer.Pipe {
|
||||
pc := make(chan xfer.Pipe)
|
||||
select {
|
||||
case pr.actorChan <- func() { pc <- pr.activePipes[key] }:
|
||||
return <-pc
|
||||
case <-pr.quit:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *consulPipeRouter) waitForPipe(key string) xfer.Pipe {
|
||||
pc := make(chan xfer.Pipe)
|
||||
select {
|
||||
case pr.actorChan <- func() {
|
||||
pipe, ok := pr.activePipes[key]
|
||||
if ok {
|
||||
pc <- pipe
|
||||
} else {
|
||||
pr.pipeWaiters[key] = append(pr.pipeWaiters[key], pc)
|
||||
}
|
||||
}:
|
||||
return <-pc
|
||||
case <-pr.quit:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *consulPipeRouter) privateAPI() {
|
||||
router := mux.NewRouter()
|
||||
router.Methods("GET").
|
||||
MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")).
|
||||
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
key = mux.Vars(r)["key"]
|
||||
pc = make(chan xfer.Pipe)
|
||||
)
|
||||
pr.actorChan <- func() {
|
||||
pc <- pr.activePipes[key]
|
||||
}
|
||||
pipe := <-pc
|
||||
key := mux.Vars(r)["key"]
|
||||
log.Infof("%s: Server bridge connection started", key)
|
||||
defer log.Infof("%s: Server bridge connection stopped", key)
|
||||
|
||||
pipe := pr.getPipe(key)
|
||||
if pipe == nil {
|
||||
http.NotFound(w, r)
|
||||
log.Errorf("%s: Server bridge connection; Unknown pipe!", key)
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := xfer.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Error upgrading pipe %s websocket: %v", key, err)
|
||||
log.Errorf("%s: Server bridge connection; Error upgrading to websocket: %v", key, err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
end, _ := pipe.Ends()
|
||||
if err := pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
|
||||
log.Printf("Error copying to pipe %s websocket: %v", key, err)
|
||||
log.Errorf("%s: Server bridge connection; Error copying pipe to websocket: %v", key, err)
|
||||
}
|
||||
})
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", pr.advertise, privateAPIPort)
|
||||
log.Infof("Serving private API on endpoint %s.", addr)
|
||||
log.Infof("Private API terminated: %v", http.ListenAndServe(addr, router))
|
||||
log.Infof("Serving private API on endpoint %s.", pr.advertise)
|
||||
log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router))
|
||||
}
|
||||
|
||||
func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
|
||||
@@ -293,18 +313,7 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// next see if we already have a active pipe
|
||||
pc := make(chan xfer.Pipe)
|
||||
pr.actorChan <- func() {
|
||||
pipe, ok := pr.activePipes[key]
|
||||
if !ok {
|
||||
pipe = xfer.NewPipe()
|
||||
pr.activePipes[key] = pipe
|
||||
}
|
||||
pc <- pipe
|
||||
}
|
||||
pipe := <-pc
|
||||
|
||||
pipe := pr.waitForPipe(key)
|
||||
myEnd, _ := pipe.Ends()
|
||||
if e == app.ProbeEnd {
|
||||
_, myEnd = pipe.Ends()
|
||||
@@ -370,6 +379,7 @@ type bridgeConnection struct {
|
||||
}
|
||||
|
||||
func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection {
|
||||
log.Infof("%s: Starting client bridge connection", key)
|
||||
result := &bridgeConnection{
|
||||
key: key,
|
||||
addr: addr,
|
||||
@@ -381,22 +391,25 @@ func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection {
|
||||
}
|
||||
|
||||
func (bc *bridgeConnection) stop() {
|
||||
log.Infof("%s: Stopping client bridge connection", bc.key)
|
||||
bc.mtx.Lock()
|
||||
bc.stopped = true
|
||||
if bc.conn != nil {
|
||||
bc.conn.Close()
|
||||
end, _ := bc.pipe.Ends()
|
||||
end.Write(nil) // this will cause the other end of wake up and exit
|
||||
}
|
||||
bc.mtx.Unlock()
|
||||
bc.wait.Wait()
|
||||
}
|
||||
|
||||
func (bc *bridgeConnection) loop() {
|
||||
log.Infof("Making bridge connection for pipe %s to %s", bc.key, bc.addr)
|
||||
log.Infof("%s: Client bridge connection started", bc.key)
|
||||
defer bc.wait.Done()
|
||||
defer log.Infof("Stopping bridge connection for pipe %s to %s", bc.key, bc.addr)
|
||||
defer log.Infof("%s: Client bridge connection stopped", bc.key)
|
||||
|
||||
_, end := bc.pipe.Ends()
|
||||
url := fmt.Sprintf("ws://%s:%d/private/api/pipe/%s", bc.addr, privateAPIPort, url.QueryEscape(bc.key))
|
||||
url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key))
|
||||
|
||||
for {
|
||||
bc.mtx.Lock()
|
||||
@@ -410,7 +423,7 @@ func (bc *bridgeConnection) loop() {
|
||||
// connect to other pipes instance
|
||||
conn, _, err := xfer.DialWS(wsDialer, url, http.Header{})
|
||||
if err != nil {
|
||||
log.Errorf("Error connecting to %s: %v", url, err)
|
||||
log.Errorf("%s: Client bridge connection; Error connecting to %s: %v", bc.key, url, err)
|
||||
time.Sleep(time.Second) // TODO backoff
|
||||
continue
|
||||
}
|
||||
@@ -425,7 +438,8 @@ func (bc *bridgeConnection) loop() {
|
||||
bc.mtx.Unlock()
|
||||
|
||||
if err := bc.pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
|
||||
log.Printf("Error copying to pipe %s websocket: %v", bc.key, err)
|
||||
log.Errorf("%s: Client bridge connection; Error copying pipe to websocket: %v", bc.key, err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
206
app/multitenant/consul_pipe_router_internal_test.go
Normal file
206
app/multitenant/consul_pipe_router_internal_test.go
Normal file
@@ -0,0 +1,206 @@
|
||||
package multitenant
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
"github.com/weaveworks/scope/probe/appclient"
|
||||
)
|
||||
|
||||
type adapter struct {
|
||||
c appclient.AppClient
|
||||
}
|
||||
|
||||
func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
|
||||
a.c.PipeConnection(pipeID, pipe)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a adapter) PipeClose(_, pipeID string) error {
|
||||
return a.c.PipeClose(pipeID)
|
||||
}
|
||||
|
||||
type pipeconn struct {
|
||||
id string
|
||||
uiPR, probePR app.PipeRouter
|
||||
uiPipe, probePipe xfer.Pipe
|
||||
uiIO, probeIO io.ReadWriter
|
||||
}
|
||||
|
||||
func (p *pipeconn) test(t *testing.T) {
|
||||
msg := []byte("hello " + p.id)
|
||||
wait := sync.WaitGroup{}
|
||||
wait.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
|
||||
// write something to the probe end
|
||||
_, err := p.probeIO.Write(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
|
||||
// read it back off the other end
|
||||
buf := make([]byte, len(msg))
|
||||
n, err := p.uiIO.Read(buf)
|
||||
if n != len(buf) {
|
||||
t.Fatalf("only read %d", n)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(buf, msg) {
|
||||
t.Fatalf("Got: %v, Expected: %v", buf, msg)
|
||||
}
|
||||
}()
|
||||
|
||||
wait.Wait()
|
||||
}
|
||||
|
||||
type pipeTest struct {
|
||||
prs []app.PipeRouter
|
||||
pipes []*pipeconn
|
||||
}
|
||||
|
||||
func (pt *pipeTest) newPipe(t *testing.T) {
|
||||
// make a new pipe id
|
||||
id := fmt.Sprintf("pipe-%d", rand.Int63())
|
||||
log.Printf(">>>> newPipe %s", id)
|
||||
|
||||
// pick a random PR to connect app to
|
||||
uiIndex := rand.Intn(len(pt.prs))
|
||||
uiPR := pt.prs[uiIndex]
|
||||
uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// pick a random PR to connect probe to
|
||||
probeIndex := rand.Intn(len(pt.prs))
|
||||
for probeIndex == uiIndex {
|
||||
probeIndex = rand.Intn(len(pt.prs))
|
||||
}
|
||||
probePR := pt.prs[probeIndex]
|
||||
probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pipe := &pipeconn{
|
||||
id: id,
|
||||
uiPR: uiPR,
|
||||
uiPipe: uiPipe,
|
||||
uiIO: uiIO,
|
||||
probePR: probePR,
|
||||
probePipe: probePipe,
|
||||
probeIO: probeIO,
|
||||
}
|
||||
pipe.test(t)
|
||||
pt.pipes = append(pt.pipes, pipe)
|
||||
}
|
||||
|
||||
func (pt *pipeTest) deletePipe(t *testing.T) {
|
||||
// pick a random pipe
|
||||
i := rand.Intn(len(pt.pipes))
|
||||
pipe := pt.pipes[i]
|
||||
log.Printf(">>>> deletePipe %s", pipe.id)
|
||||
|
||||
if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// remove from list
|
||||
pt.pipes = pt.pipes[:i+copy(pt.pipes[i:], pt.pipes[i+1:])]
|
||||
}
|
||||
|
||||
func (pt *pipeTest) reconnectPipe(t *testing.T) {
|
||||
// pick a random pipe
|
||||
pipe := pt.pipes[rand.Intn(len(pt.pipes))]
|
||||
log.Printf(">>>> reconnectPipe %s", pipe.id)
|
||||
|
||||
// pick a random PR to connect to
|
||||
newPR := pt.prs[rand.Intn(len(pt.prs))]
|
||||
|
||||
// pick a random end
|
||||
if rand.Float32() < 0.5 {
|
||||
if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
uiPipe, uiIO, err := newPR.Get(context.Background(), pipe.id, app.UIEnd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pipe.uiPR, pipe.uiPipe, pipe.uiIO = newPR, uiPipe, uiIO
|
||||
} else {
|
||||
if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
probePipe, probeIO, err := newPR.Get(context.Background(), pipe.id, app.ProbeEnd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pipe.probePR, pipe.probePipe, pipe.probeIO = newPR, probePipe, probeIO
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipeRouter(t *testing.T) {
|
||||
var (
|
||||
consul = newMockConsulClient()
|
||||
replicas = 2
|
||||
iterations = 50
|
||||
pt = pipeTest{}
|
||||
)
|
||||
|
||||
for i := 0; i < replicas; i++ {
|
||||
pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer)
|
||||
defer pr.Stop()
|
||||
pt.prs = append(pt.prs, pr)
|
||||
}
|
||||
|
||||
for i := 0; i < iterations; i++ {
|
||||
log.Printf("Iteration %d", i)
|
||||
pt.newPipe(t)
|
||||
pt.deletePipe(t)
|
||||
}
|
||||
}
|
||||
|
||||
//func TestPipeHard(t *testing.T) {
|
||||
// if len(pipes) <= 0 {
|
||||
// newPipe()
|
||||
// continue
|
||||
// } else if len(pipes) >= 2 {
|
||||
// deletePipe()
|
||||
// continue
|
||||
// }
|
||||
// r := rand.Float32()
|
||||
// switch {
|
||||
// case 0.0 < r && r <= 0.3:
|
||||
// newPipe()
|
||||
// case 0.3 < r && r <= 0.6:
|
||||
// deletePipe()
|
||||
// case 0.6 < r && r <= 1.0:
|
||||
// reconnectPipe()
|
||||
// }
|
||||
//}
|
||||
98
app/multitenant/mock_consul_client_internal_test.go
Normal file
98
app/multitenant/mock_consul_client_internal_test.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package multitenant
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
type mockKV struct {
|
||||
mtx sync.Mutex
|
||||
cond *sync.Cond
|
||||
kvps map[string]*consul.KVPair
|
||||
next uint64 // the next update will have this 'index in the the log'
|
||||
}
|
||||
|
||||
func newMockConsulClient() ConsulClient {
|
||||
m := mockKV{
|
||||
kvps: map[string]*consul.KVPair{},
|
||||
}
|
||||
m.cond = sync.NewCond(&m.mtx)
|
||||
go m.loop()
|
||||
return &consulClient{&m}
|
||||
}
|
||||
|
||||
func copyKVPair(in *consul.KVPair) *consul.KVPair {
|
||||
value := make([]byte, len(in.Value))
|
||||
copy(value, in.Value)
|
||||
return &consul.KVPair{
|
||||
Key: in.Key,
|
||||
CreateIndex: in.CreateIndex,
|
||||
ModifyIndex: in.ModifyIndex,
|
||||
LockIndex: in.LockIndex,
|
||||
Flags: in.Flags,
|
||||
Value: value,
|
||||
Session: in.Session,
|
||||
}
|
||||
}
|
||||
|
||||
// periodic loop to wake people up, so they can honour timeouts
|
||||
func (m *mockKV) loop() {
|
||||
for range time.Tick(1 * time.Second) {
|
||||
m.mtx.Lock()
|
||||
m.cond.Broadcast()
|
||||
m.mtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
existing, ok := m.kvps[p.Key]
|
||||
if ok && existing.ModifyIndex != p.ModifyIndex {
|
||||
return false, nil, nil
|
||||
}
|
||||
if ok {
|
||||
existing.Value = p.Value
|
||||
} else {
|
||||
m.kvps[p.Key] = copyKVPair(p)
|
||||
}
|
||||
m.kvps[p.Key].ModifyIndex++
|
||||
m.kvps[p.Key].LockIndex = m.next
|
||||
m.next++
|
||||
m.cond.Broadcast()
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
value, ok := m.kvps[key]
|
||||
if !ok {
|
||||
return nil, nil, nil
|
||||
}
|
||||
for q.WaitIndex >= value.ModifyIndex {
|
||||
m.cond.Wait()
|
||||
}
|
||||
return copyKVPair(value), nil, nil
|
||||
}
|
||||
|
||||
func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
deadline := time.Now().Add(q.WaitTime)
|
||||
for m.next <= q.WaitIndex && time.Now().Before(deadline) {
|
||||
m.cond.Wait()
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil
|
||||
}
|
||||
result := consul.KVPairs{}
|
||||
for _, kvp := range m.kvps {
|
||||
if kvp.LockIndex >= q.WaitIndex {
|
||||
result = append(result, copyKVPair(kvp))
|
||||
}
|
||||
}
|
||||
return result, &consul.QueryMeta{LastIndex: m.next}, nil
|
||||
}
|
||||
@@ -22,6 +22,7 @@ type pipe struct {
|
||||
mtx sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
port, starboard io.ReadWriter
|
||||
closers []io.Closer
|
||||
quit chan struct{}
|
||||
closed bool
|
||||
onClose func()
|
||||
@@ -53,6 +54,9 @@ func NewPipe() Pipe {
|
||||
}{
|
||||
r2, w1,
|
||||
},
|
||||
closers: []io.Closer{
|
||||
r1, r2, w1, w2,
|
||||
},
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
@@ -67,6 +71,9 @@ func (p *pipe) Close() error {
|
||||
if !p.closed {
|
||||
p.closed = true
|
||||
close(p.quit)
|
||||
for _, c := range p.closers {
|
||||
c.Close()
|
||||
}
|
||||
onClose = p.onClose
|
||||
}
|
||||
p.mtx.Unlock()
|
||||
@@ -116,6 +123,10 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
|
||||
return
|
||||
}
|
||||
|
||||
if p.Closed() {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := end.Write(buf); err != nil {
|
||||
errors <- err
|
||||
return
|
||||
@@ -133,6 +144,10 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
|
||||
return
|
||||
}
|
||||
|
||||
if p.Closed() {
|
||||
return
|
||||
}
|
||||
|
||||
if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
|
||||
errors <- err
|
||||
return
|
||||
|
||||
@@ -90,7 +90,7 @@ func (p *pingingWebsocket) ping() {
|
||||
defer p.writeLock.Unlock()
|
||||
if err := p.conn.WriteControl(websocket.PingMessage, nil, mtime.Now().Add(writeWait)); err != nil {
|
||||
log.Errorf("websocket ping error: %v", err)
|
||||
p.Close()
|
||||
p.conn.Close()
|
||||
return
|
||||
}
|
||||
p.pinger.Reset(pingPeriod)
|
||||
@@ -158,6 +158,8 @@ func (p *pingingWebsocket) ReadJSON(v interface{}) error {
|
||||
|
||||
// Close closes the connection
|
||||
func (p *pingingWebsocket) Close() error {
|
||||
p.writeLock.Lock()
|
||||
defer p.writeLock.Unlock()
|
||||
p.pinger.Stop()
|
||||
return p.conn.Close()
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/app/multitenant"
|
||||
"github.com/weaveworks/scope/common/middleware"
|
||||
"github.com/weaveworks/scope/common/network"
|
||||
"github.com/weaveworks/scope/common/weave"
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
@@ -129,7 +130,12 @@ func pipeRouterFactory(userIDer multitenant.UserIDer, pipeRouterURL, consulInf s
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), consulInf, userIDer)
|
||||
advertise, err := network.GetFirstAddressOf(consulInf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addr := fmt.Sprintf("%s:4444", advertise)
|
||||
return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), addr, userIDer), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Invalid pipe router '%s'", pipeRouterURL)
|
||||
|
||||
Reference in New Issue
Block a user