Actor model for pipe router

This commit is contained in:
Tom Wilkie
2016-03-17 14:47:39 +00:00
parent 9885706402
commit 0a8fd8a3a6

View File

@@ -108,10 +108,9 @@ type consulPipeRouter struct {
client *consul.Client
userIDer UserIDer
mtx sync.Mutex
cond *sync.Cond
pipes map[string]xfer.Pipe // Active pipes
bridges map[string]*bridgeConnection
pipes map[string]xfer.Pipe // Active pipes
bridges map[string]*bridgeConnection
actorChan chan func()
// Used by Stop()
quit chan struct{}
@@ -137,14 +136,15 @@ func NewConsulPipeRouter(addr, prefix, inf string, userIDer UserIDer) (app.PipeR
client: client,
userIDer: userIDer,
pipes: map[string]xfer.Pipe{},
bridges: map[string]*bridgeConnection{},
quit: make(chan struct{}),
pipes: map[string]xfer.Pipe{},
bridges: map[string]*bridgeConnection{},
actorChan: make(chan func()),
quit: make(chan struct{}),
}
pipeRouter.cond = sync.NewCond(&pipeRouter.mtx)
pipeRouter.wait.Add(1)
pipeRouter.wait.Add(2)
go pipeRouter.watchAll()
go pipeRouter.privateAPI()
go pipeRouter.actor()
return pipeRouter, nil
}
@@ -153,6 +153,18 @@ func (pr *consulPipeRouter) Stop() {
pr.wait.Wait()
}
func (pr *consulPipeRouter) actor() {
defer pr.wait.Done()
for {
select {
case f := <-pr.actorChan:
f()
case <-pr.quit:
return
}
}
}
// watchAll listens to all pipe updates from consul.
// This is effectively a distributed, consistent actor routine.
// All state changes for this pipe router happen in this loop,
@@ -194,7 +206,7 @@ func (pr *consulPipeRouter) watchAll() {
continue
}
pr.handlePipeUpdate(kvp.Key, cp)
pr.actorChan <- func() { pr.handlePipeUpdate(kvp.Key, cp) }
}
}
}
@@ -206,13 +218,8 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
// should ensure our local pipe (and bridge) is closed.
if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
log.Infof("Pipe %s not in use on this node.", key)
pr.mtx.Lock()
pipe, ok := pr.pipes[key]
delete(pr.pipes, key)
pr.mtx.Unlock()
// These could block, so don't hold the locks.
if ok {
pipe.Close()
}
@@ -230,17 +237,14 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
}
// 2. If this pipe if for us, we should have a pipe for it.
pr.mtx.Lock()
pipe, ok := pr.pipes[key]
if !ok {
pipe = xfer.NewPipe()
pr.pipes[key] = pipe
}
pr.mtx.Unlock()
// 3. Ensure there is a bridging connection for this pipe.
// Semantics are the owner of the UIEnd connects to the owner of the ProbeEnd
// NB no need to hold lock for pr.bridged, this is the only place we use it.
shouldBridge := cp.DeletedAt.IsZero() &&
cp.end(app.UIEnd) != cp.end(app.ProbeEnd) &&
cp.end(app.UIEnd) == pr.advertise &&
@@ -269,11 +273,15 @@ func (pr *consulPipeRouter) privateAPI() {
router.Methods("GET").
MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")).
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
pr.mtx.Lock()
pipe, ok := pr.pipes[key]
pr.mtx.Unlock()
if !ok {
var (
key = mux.Vars(r)["key"]
pc = make(chan xfer.Pipe)
)
pr.actorChan <- func() {
pc <- pr.pipes[key]
}
pipe := <-pc
if pipe == nil {
http.NotFound(w, r)
return
}
@@ -410,42 +418,6 @@ func (pr *consulPipeRouter) watch(key string, deadline time.Time, f func(*consul
return nil, fmt.Errorf("Timed out waiting on %s", key)
}
// negotiate tries to ensure the given end of the given pipe
// is 'owned' by this pipe service replica in consul.
func (pr *consulPipeRouter) negotiate(key string, e app.End) (*consulPipe, error) {
// First try and establish a record with us owning one end
_, err := pr.cas(key, func(p *consulPipe) (*consulPipe, bool, error) {
if p == nil {
p = &consulPipe{
CreatedAt: mtime.Now(),
}
}
if !p.DeletedAt.IsZero() {
return nil, false, fmt.Errorf("Pipe %s has been deleted", key)
}
end := p.end(e)
if end != "" && end != pr.advertise {
return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end)
}
p.setEnd(e, pr.advertise)
p.incr(e)
return p, false, nil
})
if err != nil {
return nil, err
}
// Next wait for other end to connect
// at this point we 'own' one end
pipe, err := pr.watch(key, mtime.Now().Add(10*longPollDuration), func(p *consulPipe) (bool, error) {
return p.otherEnd(e) == "", nil
})
if err != nil {
return nil, err
}
return pipe, nil
}
func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
userID, err := pr.userIDer(ctx)
if err != nil {
@@ -467,20 +439,40 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Get %s:%s", key, e)
// first check we own the pipe in consul
_, err = pr.negotiate(key, e)
// Try to ensure the given end of the given pipe
// is 'owned' by this pipe service replica in consul.
_, err = pr.cas(key, func(p *consulPipe) (*consulPipe, bool, error) {
if p == nil {
p = &consulPipe{
CreatedAt: mtime.Now(),
}
}
if !p.DeletedAt.IsZero() {
return nil, false, fmt.Errorf("Pipe %s has been deleted", key)
}
end := p.end(e)
if end != "" && end != pr.advertise {
return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end)
}
p.setEnd(e, pr.advertise)
p.incr(e)
return p, false, nil
})
if err != nil {
return nil, nil, err
}
// next see if we already have a active pipe
pr.mtx.Lock()
pipe, ok := pr.pipes[key]
if !ok {
pipe = xfer.NewPipe()
pr.pipes[key] = pipe
pc := make(chan xfer.Pipe)
pr.actorChan <- func() {
pipe, ok := pr.pipes[key]
if !ok {
pipe = xfer.NewPipe()
pr.pipes[key] = pipe
}
pc <- pipe
}
pr.mtx.Unlock()
pipe := <-pc
myEnd, _ := pipe.Ends()
if e == app.ProbeEnd {