Files
weave-scope/app/pipe_router.go
Tom Wilkie 5f7f74bf1b Refactor app for multitenancy
- Add interfaces to allow for alternative implementations for Collector, ControlRouter
  and PipeRouter.
- Pass contexts on http handlers to these interfaces.  Although not used by the current
  (local, in-memory) implementations, the idea is this will be used to pass headers to
  implementations which support multitenancy (by, for instance, putting an authenticating
  reverse proxy in form of the app, and then inspecting the headers of the request for
  a used id).
2016-02-22 14:54:04 +00:00

181 lines
3.5 KiB
Go

package app
import (
"io"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/xfer"
)
const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
)
// End is an enum for either end of the pipe.
type End int
// Valid values of type End
const (
UIEnd = iota
ProbeEnd
)
// PipeRouter stores pipes and allows you to connect to either end of them.
type PipeRouter interface {
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, bool)
Release(context.Context, string, End)
Delete(context.Context, string)
Stop()
}
// PipeRouter connects incoming and outgoing pipes.
type localPipeRouter struct {
sync.Mutex
wait sync.WaitGroup
quit chan struct{}
pipes map[string]*pipe
}
// for each end of the pipe, we keep a reference count & lastUsedTIme,
// such that we can timeout pipes when either end is inactive.
type pipe struct {
xfer.Pipe
tombstoneTime time.Time
ui, probe end
}
type end struct {
refCount int
lastUsedTime time.Time
}
func (p *pipe) end(end End) (*end, io.ReadWriter) {
ui, probe := p.Ends()
if end == UIEnd {
return &p.ui, ui
}
return &p.probe, probe
}
// NewLocalPipeRouter returns a new local (in-memory) pipe router.
func NewLocalPipeRouter() PipeRouter {
pipeRouter := &localPipeRouter{
quit: make(chan struct{}),
pipes: map[string]*pipe{},
}
pipeRouter.wait.Add(1)
go pipeRouter.gcLoop()
return pipeRouter
}
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, bool) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
log.Infof("Creating pipe id %s", id)
p = &pipe{
ui: end{lastUsedTime: mtime.Now()},
probe: end{lastUsedTime: mtime.Now()},
Pipe: xfer.NewPipe(),
}
pr.pipes[id] = p
}
if p.Closed() {
return nil, nil, false
}
end, endIO := p.end(e)
end.refCount++
return p, endIO, true
}
func (pr *localPipeRouter) Release(_ context.Context, id string, e End) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
// uh oh
return
}
end, _ := p.end(e)
end.refCount--
if end.refCount > 0 {
return
}
if !p.Closed() {
end.lastUsedTime = mtime.Now()
}
}
func (pr *localPipeRouter) Delete(_ context.Context, id string) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return
}
p.Close()
p.tombstoneTime = mtime.Now()
}
func (pr *localPipeRouter) Stop() {
close(pr.quit)
pr.wait.Wait()
}
func (pr *localPipeRouter) gcLoop() {
defer pr.wait.Done()
ticker := time.Tick(gcInterval)
for {
select {
case <-pr.quit:
return
case <-ticker:
}
pr.timeout()
pr.garbageCollect()
}
}
func (pr *localPipeRouter) timeout() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for id, pipe := range pr.pipes {
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
continue
}
if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
log.Infof("Timing out pipe %s", id)
pipe.Close()
pipe.tombstoneTime = now
}
}
}
func (pr *localPipeRouter) garbageCollect() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for pipeID, pipe := range pr.pipes {
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
delete(pr.pipes, pipeID)
}
}
}