Files
weave-scope/app/multitenant/consul_pipe_router.go
Bryan Boreham 323aa46d1c fix (pipes): check websocket errors inside CopyToWebsocket()
Previously we were treating EOF on the reader as no-error, meaning
that operations like Kubernetes Describe would retry endlessly when
finished.
2020-05-06 10:04:40 +00:00

455 lines
12 KiB
Go

package multitenant
import (
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
"context"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/xfer"
)
const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
)
var (
wsDialer = &websocket.Dialer{}
)
// TODO deal with garbage collection
type consulPipe struct {
CreatedAt, DeletedAt time.Time
UIAddr, ProbeAddr string // Addrs where each end is connected
UIRef, ProbeRef int // Ref counts
}
func (c *consulPipe) setAddrFor(e app.End, addr string) {
if e == app.UIEnd {
c.UIAddr = addr
} else {
c.ProbeAddr = addr
}
}
func (c *consulPipe) addrFor(e app.End) string {
if e == app.UIEnd {
return c.UIAddr
}
return c.ProbeAddr
}
func (c *consulPipe) eitherEndFor(addr string) bool {
return c.addrFor(app.UIEnd) == addr || c.addrFor(app.ProbeEnd) == addr
}
func (c *consulPipe) acquire(e app.End) int {
if e == app.UIEnd {
c.UIRef++
return c.UIRef
}
c.ProbeRef++
return c.ProbeRef
}
func (c *consulPipe) release(e app.End) int {
if e == app.UIEnd {
c.UIRef--
return c.UIRef
}
c.ProbeRef--
return c.ProbeRef
}
type consulPipeRouter struct {
prefix string
advertise string // Address of this pipe router to advertise in consul
client ConsulClient
userIDer UserIDer
activePipes map[string]xfer.Pipe
bridges map[string]*bridgeConnection
actorChan chan func()
pipeWaiters map[string][]chan xfer.Pipe
// Used by Stop()
quit chan struct{}
wait sync.WaitGroup
}
// NewConsulPipeRouter returns a new consul based router
func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter {
pipeRouter := &consulPipeRouter{
prefix: prefix,
advertise: advertise,
client: client,
userIDer: userIDer,
activePipes: map[string]xfer.Pipe{},
bridges: map[string]*bridgeConnection{},
actorChan: make(chan func()),
pipeWaiters: map[string][]chan xfer.Pipe{},
quit: make(chan struct{}),
}
pipeRouter.wait.Add(2)
go pipeRouter.watchAll()
go pipeRouter.actor()
go pipeRouter.privateAPI()
return pipeRouter
}
func (pr *consulPipeRouter) Stop() {
close(pr.quit)
pr.wait.Wait()
}
func (pr *consulPipeRouter) actor() {
defer pr.wait.Done()
for {
select {
case f := <-pr.actorChan:
f()
case <-pr.quit:
return
}
}
}
// watchAll listens to all pipe updates from consul.
// This is effectively a distributed, consistent actor routine.
// All state changes for this pipe router happen in this loop,
// and all the methods are implemented as CAS's on consul, to
// trigger an event in this loop.
func (pr *consulPipeRouter) watchAll() {
defer pr.wait.Done()
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool {
cp := *value.(*consulPipe)
select {
case pr.actorChan <- func() { pr.handlePipeUpdate(key, cp) }:
return true
case <-pr.quit:
return false
}
})
}
func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
// 1. If this pipe is closed, or we're not one of the ends, we
// should ensure our local pipe (and bridge) is closed.
if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
pipe, ok := pr.activePipes[key]
delete(pr.activePipes, key)
if ok {
log.Infof("Deleting pipe %s", key)
pipe.Close()
}
bridge, ok := pr.bridges[key]
delete(pr.bridges, key)
if ok {
bridge.stop()
}
return
}
if !cp.eitherEndFor(pr.advertise) {
return
}
// 2. If this pipe if for us, we should have a pipe for it.
pipe, ok := pr.activePipes[key]
if !ok {
log.Infof("Creating pipe %s", key)
pipe = xfer.NewPipe()
pr.activePipes[key] = pipe
for _, pw := range pr.pipeWaiters[key] {
pw <- pipe
}
delete(pr.pipeWaiters, key)
}
// 3. Ensure there is a bridging connection for this pipe.
// Semantics are the owner of the UIEnd connects to the owner of the ProbeEnd
shouldBridge := cp.DeletedAt.IsZero() &&
cp.addrFor(app.UIEnd) != cp.addrFor(app.ProbeEnd) &&
cp.addrFor(app.UIEnd) == pr.advertise &&
cp.addrFor(app.ProbeEnd) != ""
bridge, ok := pr.bridges[key]
// If we shouldn't be bridging but are, or we should be bridging but are pointing
// at the wrong place, stop the current bridge.
if (!shouldBridge && ok) || (shouldBridge && ok && bridge.addr != cp.addrFor(app.ProbeEnd)) {
delete(pr.bridges, key)
bridge.stop()
ok = false
}
// If we should be bridging and are not, start a new bridge
if shouldBridge && !ok {
bridge = newBridgeConnection(key, cp.addrFor(app.ProbeEnd), pipe)
pr.bridges[key] = bridge
}
}
func (pr *consulPipeRouter) getPipe(key string) xfer.Pipe {
pc := make(chan xfer.Pipe)
select {
case pr.actorChan <- func() { pc <- pr.activePipes[key] }:
return <-pc
case <-pr.quit:
return nil
}
}
func (pr *consulPipeRouter) waitForPipe(key string) xfer.Pipe {
pc := make(chan xfer.Pipe)
select {
case pr.actorChan <- func() {
pipe, ok := pr.activePipes[key]
if ok {
pc <- pipe
} else {
pr.pipeWaiters[key] = append(pr.pipeWaiters[key], pc)
}
}:
return <-pc
case <-pr.quit:
return nil
}
}
func (pr *consulPipeRouter) privateAPI() {
router := mux.NewRouter()
router.Methods("GET").
MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")).
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
log.Infof("%s: Server bridge connection started", key)
defer log.Infof("%s: Server bridge connection stopped", key)
pipe := pr.getPipe(key)
if pipe == nil {
log.Errorf("%s: Server bridge connection; Unknown pipe!", key)
w.WriteHeader(http.StatusNotFound)
return
}
conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
log.Errorf("%s: Server bridge connection; Error upgrading to websocket: %v", key, err)
return
}
defer conn.Close()
end, _ := pipe.Ends()
if _, err := pipe.CopyToWebsocket(end, conn); err != nil {
log.Errorf("%s: Server bridge connection; Error copying pipe to websocket: %v", key, err)
}
})
handler := middleware.Tracer{RouteMatcher: router}.Wrap(router)
log.Infof("Serving private API on endpoint %s.", pr.advertise)
log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, handler))
}
func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
userID, err := pr.userIDer(ctx)
if err != nil {
return false, err
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
consulPipe := consulPipe{}
err = pr.client.Get(ctx, key, &consulPipe)
if err == ErrNotFound {
return false, nil
} else if err != nil {
return false, err
}
return consulPipe.DeletedAt.IsZero(), nil
}
func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer.Pipe, io.ReadWriter, error) {
userID, err := pr.userIDer(ctx)
if err != nil {
return nil, nil, err
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Get %s:%s", key, e)
span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Get", opentracing.Tag{"key", key})
defer span.Finish()
// Try to ensure the given end of the given pipe
// is 'owned' by this pipe service replica in consul.
err = pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
var pipe *consulPipe
if in == nil {
pipe = &consulPipe{
CreatedAt: mtime.Now(),
}
} else {
pipe = in.(*consulPipe)
}
if !pipe.DeletedAt.IsZero() {
return nil, false, fmt.Errorf("Pipe %s has been deleted", key)
}
end := pipe.addrFor(e)
if end != "" && end != pr.advertise {
return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end)
}
pipe.setAddrFor(e, pr.advertise)
pipe.acquire(e)
return pipe, false, nil
})
if err != nil {
return nil, nil, err
}
pipe := pr.waitForPipe(key)
myEnd, _ := pipe.Ends()
if e == app.ProbeEnd {
_, myEnd = pipe.Ends()
}
return pipe, myEnd, nil
}
func (pr *consulPipeRouter) Release(ctx context.Context, id string, e app.End) error {
userID, err := pr.userIDer(ctx)
if err != nil {
return err
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Release %s:%s", key, e)
span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Release", opentracing.Tag{"key", key})
defer span.Finish()
// atomically clear my end of the pipe in consul
return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
if in == nil {
return nil, false, fmt.Errorf("pipe %s not found", id)
}
p := in.(*consulPipe)
if p.addrFor(e) != pr.advertise {
return nil, false, fmt.Errorf("pipe %s not owned by us", id)
}
refs := p.release(e)
if refs == 0 {
p.setAddrFor(e, "")
}
return p, true, nil
})
}
func (pr *consulPipeRouter) Delete(ctx context.Context, id string) error {
userID, err := pr.userIDer(ctx)
if err != nil {
return err
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Delete %s", key)
span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Delete", opentracing.Tag{"key", key})
defer span.Finish()
return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
if in == nil {
return nil, false, fmt.Errorf("Pipe %s not found", id)
}
p := in.(*consulPipe)
p.DeletedAt = mtime.Now()
return p, false, nil
})
}
// A bridgeConnection represents a connection between two pipe router replicas.
// They are created & destroyed in response to events from consul, which in turn
// are triggered when UIs or Probes connect to various pipe routers.
type bridgeConnection struct {
key string
addr string // address to connect to
pipe xfer.Pipe
mtx sync.Mutex
conn xfer.Websocket
stopped bool
wait sync.WaitGroup
}
func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection {
log.Infof("%s: Starting client bridge connection", key)
result := &bridgeConnection{
key: key,
addr: addr,
pipe: pipe,
}
result.wait.Add(1)
go result.loop()
return result
}
func (bc *bridgeConnection) stop() {
log.Infof("%s: Stopping client bridge connection", bc.key)
bc.mtx.Lock()
bc.stopped = true
if bc.conn != nil {
bc.conn.Close()
end, _ := bc.pipe.Ends()
end.Write(nil) // this will cause the other end of wake up and exit
}
bc.mtx.Unlock()
bc.wait.Wait()
}
func (bc *bridgeConnection) loop() {
log.Infof("%s: Client bridge connection started", bc.key)
defer bc.wait.Done()
defer log.Infof("%s: Client bridge connection stopped", bc.key)
_, end := bc.pipe.Ends()
url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key))
for {
bc.mtx.Lock()
bc.conn = nil
if bc.stopped {
bc.mtx.Unlock()
return
}
bc.mtx.Unlock()
// connect to other pipes instance
conn, _, err := xfer.DialWS(wsDialer, url, http.Header{})
if err != nil {
log.Errorf("%s: Client bridge connection; Error connecting to %s: %v", bc.key, url, err)
time.Sleep(time.Second) // TODO backoff
continue
}
bc.mtx.Lock()
if bc.stopped {
bc.mtx.Unlock()
conn.Close()
return
}
bc.conn = conn
bc.mtx.Unlock()
if _, err := bc.pipe.CopyToWebsocket(end, conn); err != nil {
log.Errorf("%s: Client bridge connection; Error copying pipe to websocket: %v", bc.key, err)
}
conn.Close()
}
}