diff --git a/.gitignore b/.gitignore index 3744f2783..e34814e67 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ releases # Architecture specific extensions/prefixes *.[568vq] [568vq].out +.DS_Store *.cgo1.go *.cgo2.c diff --git a/app/multitenant/consul_client.go b/app/multitenant/consul_client.go index 9bb4370b0..693273f0b 100644 --- a/app/multitenant/consul_client.go +++ b/app/multitenant/consul_client.go @@ -8,8 +8,6 @@ import ( log "github.com/Sirupsen/logrus" consul "github.com/hashicorp/consul/api" - - "github.com/weaveworks/scope/common/mtime" ) const ( @@ -21,8 +19,7 @@ const ( type ConsulClient interface { Get(key string, out interface{}) error CAS(key string, out interface{}, f CASCallback) error - Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error - WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) + WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) } // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. @@ -37,7 +34,7 @@ func NewConsulClient(addr string) (ConsulClient, error) { if err != nil { return nil, err } - return (*consulClient)(client), nil + return &consulClient{client.KV()}, nil } var ( @@ -50,12 +47,19 @@ var ( ErrNotFound = fmt.Errorf("Not found") ) -type consulClient consul.Client +type kv interface { + CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) + Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) + List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) +} + +type consulClient struct { + kv +} // Get and deserialise a JSON value from consul. func (c *consulClient) Get(key string, out interface{}) error { - kv := (*consul.Client)(c).KV() - kvp, _, err := kv.Get(key, queryOptions) + kvp, _, err := c.kv.Get(key, queryOptions) if err != nil { return err } @@ -73,13 +77,12 @@ func (c *consulClient) Get(key string, out interface{}) error { func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { var ( index = uint64(0) - kv = (*consul.Client)(c).KV() retries = 10 retry = true intermediate interface{} ) for i := 0; i < retries; i++ { - kvp, _, err := kv.Get(key, queryOptions) + kvp, _, err := c.kv.Get(key, queryOptions) if err != nil { log.Errorf("Error getting %s: %v", key, err) continue @@ -111,7 +114,7 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { log.Errorf("Error serialising value for %s: %v", key, err) continue } - ok, _, err := kv.CAS(&consul.KVPair{ + ok, _, err := c.kv.CAS(&consul.KVPair{ Key: key, Value: value.Bytes(), ModifyIndex: index, @@ -129,52 +132,16 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { return fmt.Errorf("Failed to CAS %s", key) } -// Watch a given key value and trigger a callback when it changes. -// if callback returns false or error, exit (with the error). -func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error { - var ( - index = uint64(0) - kv = (*consul.Client)(c).KV() - ) - for deadline.After(mtime.Now()) { - // Do a (blocking) long poll waiting for the entry to get updated. index here - // is really a version number; this call will wait for the key to be updated - // past said version. As we always start from version 0, we're guaranteed - // not to miss any updates - in fact we will always call the callback with - // the current value of the key immediately. - kvp, meta, err := kv.Get(key, &consul.QueryOptions{ - RequireConsistent: true, - WaitIndex: index, - WaitTime: longPollDuration, - }) - if err != nil { - return fmt.Errorf("Error getting %s: %v", key, err) - } - if kvp == nil { - return ErrNotFound - } - - if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil { - return err - } - if ok, err := f(out); !ok { - return nil - } else if err != nil { - return err - } - - index = meta.LastIndex - } - return fmt.Errorf("Timed out waiting on %s", key) -} - -func (c *consulClient) WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) { - var ( - index = uint64(0) - kv = (*consul.Client)(c).KV() - ) +func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) { + index := uint64(0) for { - kvps, meta, err := kv.List(prefix, &consul.QueryOptions{ + select { + case <-done: + return + default: + } + + kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{ RequireConsistent: true, WaitIndex: index, WaitTime: longPollDuration, diff --git a/app/multitenant/consul_pipe_router.go b/app/multitenant/consul_pipe_router.go index e8592577a..52a150a79 100644 --- a/app/multitenant/consul_pipe_router.go +++ b/app/multitenant/consul_pipe_router.go @@ -15,7 +15,6 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/common/mtime" - "github.com/weaveworks/scope/common/network" "github.com/weaveworks/scope/common/xfer" ) @@ -23,8 +22,6 @@ const ( gcInterval = 30 * time.Second // we check all the pipes every 30s pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten - - privateAPIPort = 4444 ) var ( @@ -84,6 +81,7 @@ type consulPipeRouter struct { activePipes map[string]xfer.Pipe bridges map[string]*bridgeConnection actorChan chan func() + pipeWaiters map[string][]chan xfer.Pipe // Used by Stop() quit chan struct{} @@ -91,11 +89,7 @@ type consulPipeRouter struct { } // NewConsulPipeRouter returns a new consul based router -func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserIDer) (app.PipeRouter, error) { - advertise, err := network.GetFirstAddressOf(inf) - if err != nil { - return nil, err - } +func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter { pipeRouter := &consulPipeRouter{ prefix: prefix, advertise: advertise, @@ -105,13 +99,15 @@ func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserI activePipes: map[string]xfer.Pipe{}, bridges: map[string]*bridgeConnection{}, actorChan: make(chan func()), - quit: make(chan struct{}), + pipeWaiters: map[string][]chan xfer.Pipe{}, + + quit: make(chan struct{}), } pipeRouter.wait.Add(2) go pipeRouter.watchAll() - go pipeRouter.privateAPI() go pipeRouter.actor() - return pipeRouter, nil + go pipeRouter.privateAPI() + return pipeRouter } func (pr *consulPipeRouter) Stop() { @@ -138,28 +134,25 @@ func (pr *consulPipeRouter) actor() { // trigger an event in this loop. func (pr *consulPipeRouter) watchAll() { defer pr.wait.Done() - pr.client.WatchPrefix(pr.prefix, &consulPipe{}, func(key string, value interface{}) bool { + pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool { + cp := *value.(*consulPipe) select { + case pr.actorChan <- func() { pr.handlePipeUpdate(key, cp) }: + return true case <-pr.quit: return false - default: } - - pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) } - return true }) } -func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { - log.Infof("Got update to pipe %s", key) - +func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) { // 1. If this pipe is closed, or we're not one of the ends, we // should ensure our local pipe (and bridge) is closed. if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) { - log.Infof("Pipe %s not in use on this node.", key) pipe, ok := pr.activePipes[key] delete(pr.activePipes, key) if ok { + log.Infof("Deleting pipe %s", key) pipe.Close() } @@ -178,8 +171,13 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { // 2. If this pipe if for us, we should have a pipe for it. pipe, ok := pr.activePipes[key] if !ok { + log.Infof("Creating pipe %s", key) pipe = xfer.NewPipe() pr.activePipes[key] = pipe + for _, pw := range pr.pipeWaiters[key] { + pw <- pipe + } + delete(pr.pipeWaiters, key) } // 3. Ensure there is a bridging connection for this pipe. @@ -193,7 +191,6 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { // If we shouldn't be bridging but are, or we should be bridging but are pointing // at the wrong place, stop the current bridge. if (!shouldBridge && ok) || (shouldBridge && ok && bridge.addr != cp.addrFor(app.ProbeEnd)) { - log.Infof("Stopping bridge connection for %s", key) delete(pr.bridges, key) bridge.stop() ok = false @@ -201,46 +198,69 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { // If we should be bridging and are not, start a new bridge if shouldBridge && !ok { - log.Infof("Starting bridge connection for %s", key) bridge = newBridgeConnection(key, cp.addrFor(app.ProbeEnd), pipe) pr.bridges[key] = bridge } } +func (pr *consulPipeRouter) getPipe(key string) xfer.Pipe { + pc := make(chan xfer.Pipe) + select { + case pr.actorChan <- func() { pc <- pr.activePipes[key] }: + return <-pc + case <-pr.quit: + return nil + } +} + +func (pr *consulPipeRouter) waitForPipe(key string) xfer.Pipe { + pc := make(chan xfer.Pipe) + select { + case pr.actorChan <- func() { + pipe, ok := pr.activePipes[key] + if ok { + pc <- pipe + } else { + pr.pipeWaiters[key] = append(pr.pipeWaiters[key], pc) + } + }: + return <-pc + case <-pr.quit: + return nil + } +} + func (pr *consulPipeRouter) privateAPI() { router := mux.NewRouter() router.Methods("GET"). MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")). HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var ( - key = mux.Vars(r)["key"] - pc = make(chan xfer.Pipe) - ) - pr.actorChan <- func() { - pc <- pr.activePipes[key] - } - pipe := <-pc + key := mux.Vars(r)["key"] + log.Infof("%s: Server bridge connection started", key) + defer log.Infof("%s: Server bridge connection stopped", key) + + pipe := pr.getPipe(key) if pipe == nil { - http.NotFound(w, r) + log.Errorf("%s: Server bridge connection; Unknown pipe!", key) + w.WriteHeader(http.StatusNotFound) return } conn, err := xfer.Upgrade(w, r, nil) if err != nil { - log.Errorf("Error upgrading pipe %s websocket: %v", key, err) + log.Errorf("%s: Server bridge connection; Error upgrading to websocket: %v", key, err) return } defer conn.Close() end, _ := pipe.Ends() if err := pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) { - log.Printf("Error copying to pipe %s websocket: %v", key, err) + log.Errorf("%s: Server bridge connection; Error copying pipe to websocket: %v", key, err) } }) - addr := fmt.Sprintf("%s:%d", pr.advertise, privateAPIPort) - log.Infof("Serving private API on endpoint %s.", addr) - log.Infof("Private API terminated: %v", http.ListenAndServe(addr, router)) + log.Infof("Serving private API on endpoint %s.", pr.advertise) + log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router)) } func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) { @@ -293,18 +313,7 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer return nil, nil, err } - // next see if we already have a active pipe - pc := make(chan xfer.Pipe) - pr.actorChan <- func() { - pipe, ok := pr.activePipes[key] - if !ok { - pipe = xfer.NewPipe() - pr.activePipes[key] = pipe - } - pc <- pipe - } - pipe := <-pc - + pipe := pr.waitForPipe(key) myEnd, _ := pipe.Ends() if e == app.ProbeEnd { _, myEnd = pipe.Ends() @@ -370,6 +379,7 @@ type bridgeConnection struct { } func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection { + log.Infof("%s: Starting client bridge connection", key) result := &bridgeConnection{ key: key, addr: addr, @@ -381,22 +391,25 @@ func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection { } func (bc *bridgeConnection) stop() { + log.Infof("%s: Stopping client bridge connection", bc.key) bc.mtx.Lock() bc.stopped = true if bc.conn != nil { bc.conn.Close() + end, _ := bc.pipe.Ends() + end.Write(nil) // this will cause the other end of wake up and exit } bc.mtx.Unlock() bc.wait.Wait() } func (bc *bridgeConnection) loop() { - log.Infof("Making bridge connection for pipe %s to %s", bc.key, bc.addr) + log.Infof("%s: Client bridge connection started", bc.key) defer bc.wait.Done() - defer log.Infof("Stopping bridge connection for pipe %s to %s", bc.key, bc.addr) + defer log.Infof("%s: Client bridge connection stopped", bc.key) _, end := bc.pipe.Ends() - url := fmt.Sprintf("ws://%s:%d/private/api/pipe/%s", bc.addr, privateAPIPort, url.QueryEscape(bc.key)) + url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key)) for { bc.mtx.Lock() @@ -410,7 +423,7 @@ func (bc *bridgeConnection) loop() { // connect to other pipes instance conn, _, err := xfer.DialWS(wsDialer, url, http.Header{}) if err != nil { - log.Errorf("Error connecting to %s: %v", url, err) + log.Errorf("%s: Client bridge connection; Error connecting to %s: %v", bc.key, url, err) time.Sleep(time.Second) // TODO backoff continue } @@ -425,7 +438,8 @@ func (bc *bridgeConnection) loop() { bc.mtx.Unlock() if err := bc.pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) { - log.Printf("Error copying to pipe %s websocket: %v", bc.key, err) + log.Errorf("%s: Client bridge connection; Error copying pipe to websocket: %v", bc.key, err) } + conn.Close() } } diff --git a/app/multitenant/consul_pipe_router_internal_test.go b/app/multitenant/consul_pipe_router_internal_test.go new file mode 100644 index 000000000..5fe4efbb3 --- /dev/null +++ b/app/multitenant/consul_pipe_router_internal_test.go @@ -0,0 +1,206 @@ +package multitenant + +import ( + "bytes" + "fmt" + "io" + "log" + "math/rand" + "sync" + "testing" + + "golang.org/x/net/context" + + "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/probe/appclient" +) + +type adapter struct { + c appclient.AppClient +} + +func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error { + a.c.PipeConnection(pipeID, pipe) + return nil +} + +func (a adapter) PipeClose(_, pipeID string) error { + return a.c.PipeClose(pipeID) +} + +type pipeconn struct { + id string + uiPR, probePR app.PipeRouter + uiPipe, probePipe xfer.Pipe + uiIO, probeIO io.ReadWriter +} + +func (p *pipeconn) test(t *testing.T) { + msg := []byte("hello " + p.id) + wait := sync.WaitGroup{} + wait.Add(2) + + go func() { + defer wait.Done() + + // write something to the probe end + _, err := p.probeIO.Write(msg) + if err != nil { + t.Fatal(err) + } + }() + + go func() { + defer wait.Done() + + // read it back off the other end + buf := make([]byte, len(msg)) + n, err := p.uiIO.Read(buf) + if n != len(buf) { + t.Fatalf("only read %d", n) + } + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, msg) { + t.Fatalf("Got: %v, Expected: %v", buf, msg) + } + }() + + wait.Wait() +} + +type pipeTest struct { + prs []app.PipeRouter + pipes []*pipeconn +} + +func (pt *pipeTest) newPipe(t *testing.T) { + // make a new pipe id + id := fmt.Sprintf("pipe-%d", rand.Int63()) + log.Printf(">>>> newPipe %s", id) + + // pick a random PR to connect app to + uiIndex := rand.Intn(len(pt.prs)) + uiPR := pt.prs[uiIndex] + uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd) + if err != nil { + t.Fatal(err) + } + + // pick a random PR to connect probe to + probeIndex := rand.Intn(len(pt.prs)) + for probeIndex == uiIndex { + probeIndex = rand.Intn(len(pt.prs)) + } + probePR := pt.prs[probeIndex] + probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd) + if err != nil { + t.Fatal(err) + } + + pipe := &pipeconn{ + id: id, + uiPR: uiPR, + uiPipe: uiPipe, + uiIO: uiIO, + probePR: probePR, + probePipe: probePipe, + probeIO: probeIO, + } + pipe.test(t) + pt.pipes = append(pt.pipes, pipe) +} + +func (pt *pipeTest) deletePipe(t *testing.T) { + // pick a random pipe + i := rand.Intn(len(pt.pipes)) + pipe := pt.pipes[i] + log.Printf(">>>> deletePipe %s", pipe.id) + + if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil { + t.Fatal(err) + } + + if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil { + t.Fatal(err) + } + + // remove from list + pt.pipes = pt.pipes[:i+copy(pt.pipes[i:], pt.pipes[i+1:])] +} + +func (pt *pipeTest) reconnectPipe(t *testing.T) { + // pick a random pipe + pipe := pt.pipes[rand.Intn(len(pt.pipes))] + log.Printf(">>>> reconnectPipe %s", pipe.id) + + // pick a random PR to connect to + newPR := pt.prs[rand.Intn(len(pt.prs))] + + // pick a random end + if rand.Float32() < 0.5 { + if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil { + t.Fatal(err) + } + + uiPipe, uiIO, err := newPR.Get(context.Background(), pipe.id, app.UIEnd) + if err != nil { + t.Fatal(err) + } + + pipe.uiPR, pipe.uiPipe, pipe.uiIO = newPR, uiPipe, uiIO + } else { + if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil { + t.Fatal(err) + } + + probePipe, probeIO, err := newPR.Get(context.Background(), pipe.id, app.ProbeEnd) + if err != nil { + t.Fatal(err) + } + + pipe.probePR, pipe.probePipe, pipe.probeIO = newPR, probePipe, probeIO + } +} + +func TestPipeRouter(t *testing.T) { + var ( + consul = newMockConsulClient() + replicas = 2 + iterations = 50 + pt = pipeTest{} + ) + + for i := 0; i < replicas; i++ { + pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer) + defer pr.Stop() + pt.prs = append(pt.prs, pr) + } + + for i := 0; i < iterations; i++ { + log.Printf("Iteration %d", i) + pt.newPipe(t) + pt.deletePipe(t) + } +} + +//func TestPipeHard(t *testing.T) { +// if len(pipes) <= 0 { +// newPipe() +// continue +// } else if len(pipes) >= 2 { +// deletePipe() +// continue +// } +// r := rand.Float32() +// switch { +// case 0.0 < r && r <= 0.3: +// newPipe() +// case 0.3 < r && r <= 0.6: +// deletePipe() +// case 0.6 < r && r <= 1.0: +// reconnectPipe() +// } +//} diff --git a/app/multitenant/mock_consul_client_internal_test.go b/app/multitenant/mock_consul_client_internal_test.go new file mode 100644 index 000000000..237f76aa0 --- /dev/null +++ b/app/multitenant/mock_consul_client_internal_test.go @@ -0,0 +1,98 @@ +package multitenant + +import ( + "sync" + "time" + + consul "github.com/hashicorp/consul/api" +) + +type mockKV struct { + mtx sync.Mutex + cond *sync.Cond + kvps map[string]*consul.KVPair + next uint64 // the next update will have this 'index in the the log' +} + +func newMockConsulClient() ConsulClient { + m := mockKV{ + kvps: map[string]*consul.KVPair{}, + } + m.cond = sync.NewCond(&m.mtx) + go m.loop() + return &consulClient{&m} +} + +func copyKVPair(in *consul.KVPair) *consul.KVPair { + value := make([]byte, len(in.Value)) + copy(value, in.Value) + return &consul.KVPair{ + Key: in.Key, + CreateIndex: in.CreateIndex, + ModifyIndex: in.ModifyIndex, + LockIndex: in.LockIndex, + Flags: in.Flags, + Value: value, + Session: in.Session, + } +} + +// periodic loop to wake people up, so they can honour timeouts +func (m *mockKV) loop() { + for range time.Tick(1 * time.Second) { + m.mtx.Lock() + m.cond.Broadcast() + m.mtx.Unlock() + } +} + +func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + existing, ok := m.kvps[p.Key] + if ok && existing.ModifyIndex != p.ModifyIndex { + return false, nil, nil + } + if ok { + existing.Value = p.Value + } else { + m.kvps[p.Key] = copyKVPair(p) + } + m.kvps[p.Key].ModifyIndex++ + m.kvps[p.Key].LockIndex = m.next + m.next++ + m.cond.Broadcast() + return true, nil, nil +} + +func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + value, ok := m.kvps[key] + if !ok { + return nil, nil, nil + } + for q.WaitIndex >= value.ModifyIndex { + m.cond.Wait() + } + return copyKVPair(value), nil, nil +} + +func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + deadline := time.Now().Add(q.WaitTime) + for m.next <= q.WaitIndex && time.Now().Before(deadline) { + m.cond.Wait() + } + if time.Now().After(deadline) { + return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil + } + result := consul.KVPairs{} + for _, kvp := range m.kvps { + if kvp.LockIndex >= q.WaitIndex { + result = append(result, copyKVPair(kvp)) + } + } + return result, &consul.QueryMeta{LastIndex: m.next}, nil +} diff --git a/common/xfer/pipes.go b/common/xfer/pipes.go index 895b2bb33..be1f96a84 100644 --- a/common/xfer/pipes.go +++ b/common/xfer/pipes.go @@ -22,6 +22,7 @@ type pipe struct { mtx sync.Mutex wg sync.WaitGroup port, starboard io.ReadWriter + closers []io.Closer quit chan struct{} closed bool onClose func() @@ -53,6 +54,9 @@ func NewPipe() Pipe { }{ r2, w1, }, + closers: []io.Closer{ + r1, r2, w1, w2, + }, quit: make(chan struct{}), } } @@ -67,6 +71,9 @@ func (p *pipe) Close() error { if !p.closed { p.closed = true close(p.quit) + for _, c := range p.closers { + c.Close() + } onClose = p.onClose } p.mtx.Unlock() @@ -116,6 +123,10 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error { return } + if p.Closed() { + return + } + if _, err := end.Write(buf); err != nil { errors <- err return @@ -133,6 +144,10 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error { return } + if p.Closed() { + return + } + if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil { errors <- err return diff --git a/common/xfer/websocket.go b/common/xfer/websocket.go index 019262d2e..4778d9c85 100644 --- a/common/xfer/websocket.go +++ b/common/xfer/websocket.go @@ -90,7 +90,7 @@ func (p *pingingWebsocket) ping() { defer p.writeLock.Unlock() if err := p.conn.WriteControl(websocket.PingMessage, nil, mtime.Now().Add(writeWait)); err != nil { log.Errorf("websocket ping error: %v", err) - p.Close() + p.conn.Close() return } p.pinger.Reset(pingPeriod) @@ -158,6 +158,8 @@ func (p *pingingWebsocket) ReadJSON(v interface{}) error { // Close closes the connection func (p *pingingWebsocket) Close() error { + p.writeLock.Lock() + defer p.writeLock.Unlock() p.pinger.Stop() return p.conn.Close() } diff --git a/prog/app.go b/prog/app.go index 31acf3f64..6fa06fb13 100644 --- a/prog/app.go +++ b/prog/app.go @@ -23,6 +23,7 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/app/multitenant" "github.com/weaveworks/scope/common/middleware" + "github.com/weaveworks/scope/common/network" "github.com/weaveworks/scope/common/weave" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe/docker" @@ -129,7 +130,12 @@ func pipeRouterFactory(userIDer multitenant.UserIDer, pipeRouterURL, consulInf s if err != nil { return nil, err } - return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), consulInf, userIDer) + advertise, err := network.GetFirstAddressOf(consulInf) + if err != nil { + return nil, err + } + addr := fmt.Sprintf("%s:4444", advertise) + return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), addr, userIDer), nil } return nil, fmt.Errorf("Invalid pipe router '%s'", pipeRouterURL)