diff --git a/app/multitenant/consul_pipe_router.go b/app/multitenant/consul_pipe_router.go index 7452d14ab..2d94925d3 100644 --- a/app/multitenant/consul_pipe_router.go +++ b/app/multitenant/consul_pipe_router.go @@ -108,10 +108,9 @@ type consulPipeRouter struct { client *consul.Client userIDer UserIDer - mtx sync.Mutex - cond *sync.Cond - pipes map[string]xfer.Pipe // Active pipes - bridges map[string]*bridgeConnection + pipes map[string]xfer.Pipe // Active pipes + bridges map[string]*bridgeConnection + actorChan chan func() // Used by Stop() quit chan struct{} @@ -137,14 +136,15 @@ func NewConsulPipeRouter(addr, prefix, inf string, userIDer UserIDer) (app.PipeR client: client, userIDer: userIDer, - pipes: map[string]xfer.Pipe{}, - bridges: map[string]*bridgeConnection{}, - quit: make(chan struct{}), + pipes: map[string]xfer.Pipe{}, + bridges: map[string]*bridgeConnection{}, + actorChan: make(chan func()), + quit: make(chan struct{}), } - pipeRouter.cond = sync.NewCond(&pipeRouter.mtx) - pipeRouter.wait.Add(1) + pipeRouter.wait.Add(2) go pipeRouter.watchAll() go pipeRouter.privateAPI() + go pipeRouter.actor() return pipeRouter, nil } @@ -153,6 +153,18 @@ func (pr *consulPipeRouter) Stop() { pr.wait.Wait() } +func (pr *consulPipeRouter) actor() { + defer pr.wait.Done() + for { + select { + case f := <-pr.actorChan: + f() + case <-pr.quit: + return + } + } +} + // watchAll listens to all pipe updates from consul. // This is effectively a distributed, consistent actor routine. // All state changes for this pipe router happen in this loop, @@ -194,7 +206,7 @@ func (pr *consulPipeRouter) watchAll() { continue } - pr.handlePipeUpdate(kvp.Key, cp) + pr.actorChan <- func() { pr.handlePipeUpdate(kvp.Key, cp) } } } } @@ -206,13 +218,8 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) { // should ensure our local pipe (and bridge) is closed. if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) { log.Infof("Pipe %s not in use on this node.", key) - - pr.mtx.Lock() pipe, ok := pr.pipes[key] delete(pr.pipes, key) - pr.mtx.Unlock() - - // These could block, so don't hold the locks. if ok { pipe.Close() } @@ -230,17 +237,14 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) { } // 2. If this pipe if for us, we should have a pipe for it. - pr.mtx.Lock() pipe, ok := pr.pipes[key] if !ok { pipe = xfer.NewPipe() pr.pipes[key] = pipe } - pr.mtx.Unlock() // 3. Ensure there is a bridging connection for this pipe. // Semantics are the owner of the UIEnd connects to the owner of the ProbeEnd - // NB no need to hold lock for pr.bridged, this is the only place we use it. shouldBridge := cp.DeletedAt.IsZero() && cp.end(app.UIEnd) != cp.end(app.ProbeEnd) && cp.end(app.UIEnd) == pr.advertise && @@ -269,11 +273,15 @@ func (pr *consulPipeRouter) privateAPI() { router.Methods("GET"). MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")). HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - key := mux.Vars(r)["key"] - pr.mtx.Lock() - pipe, ok := pr.pipes[key] - pr.mtx.Unlock() - if !ok { + var ( + key = mux.Vars(r)["key"] + pc = make(chan xfer.Pipe) + ) + pr.actorChan <- func() { + pc <- pr.pipes[key] + } + pipe := <-pc + if pipe == nil { http.NotFound(w, r) return } @@ -410,42 +418,6 @@ func (pr *consulPipeRouter) watch(key string, deadline time.Time, f func(*consul return nil, fmt.Errorf("Timed out waiting on %s", key) } -// negotiate tries to ensure the given end of the given pipe -// is 'owned' by this pipe service replica in consul. -func (pr *consulPipeRouter) negotiate(key string, e app.End) (*consulPipe, error) { - // First try and establish a record with us owning one end - _, err := pr.cas(key, func(p *consulPipe) (*consulPipe, bool, error) { - if p == nil { - p = &consulPipe{ - CreatedAt: mtime.Now(), - } - } - if !p.DeletedAt.IsZero() { - return nil, false, fmt.Errorf("Pipe %s has been deleted", key) - } - end := p.end(e) - if end != "" && end != pr.advertise { - return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end) - } - p.setEnd(e, pr.advertise) - p.incr(e) - return p, false, nil - }) - if err != nil { - return nil, err - } - - // Next wait for other end to connect - // at this point we 'own' one end - pipe, err := pr.watch(key, mtime.Now().Add(10*longPollDuration), func(p *consulPipe) (bool, error) { - return p.otherEnd(e) == "", nil - }) - if err != nil { - return nil, err - } - return pipe, nil -} - func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) { userID, err := pr.userIDer(ctx) if err != nil { @@ -467,20 +439,40 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id) log.Infof("Get %s:%s", key, e) - // first check we own the pipe in consul - _, err = pr.negotiate(key, e) + // Try to ensure the given end of the given pipe + // is 'owned' by this pipe service replica in consul. + _, err = pr.cas(key, func(p *consulPipe) (*consulPipe, bool, error) { + if p == nil { + p = &consulPipe{ + CreatedAt: mtime.Now(), + } + } + if !p.DeletedAt.IsZero() { + return nil, false, fmt.Errorf("Pipe %s has been deleted", key) + } + end := p.end(e) + if end != "" && end != pr.advertise { + return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end) + } + p.setEnd(e, pr.advertise) + p.incr(e) + return p, false, nil + }) if err != nil { return nil, nil, err } // next see if we already have a active pipe - pr.mtx.Lock() - pipe, ok := pr.pipes[key] - if !ok { - pipe = xfer.NewPipe() - pr.pipes[key] = pipe + pc := make(chan xfer.Pipe) + pr.actorChan <- func() { + pipe, ok := pr.pipes[key] + if !ok { + pipe = xfer.NewPipe() + pr.pipes[key] = pipe + } + pc <- pipe } - pr.mtx.Unlock() + pipe := <-pc myEnd, _ := pipe.Ends() if e == app.ProbeEnd {