mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-02 17:50:39 +00:00
- Add interfaces to allow for alternative implementations of Collector, ControlRouter and PipeRouter. - Pass contexts from the HTTP handlers to these interfaces. Although not used by the current (local, in-memory) implementations, the idea is that this will be used to pass headers to implementations which support multitenancy (by, for instance, putting an authenticating reverse proxy in front of the app, and then inspecting the headers of the request for a user ID).
181 lines
3.5 KiB
Go
181 lines
3.5 KiB
Go
package app
|
|
|
|
import (
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
"golang.org/x/net/context"
|
|
|
|
"github.com/weaveworks/scope/common/mtime"
|
|
"github.com/weaveworks/scope/common/xfer"
|
|
)
|
|
|
|
// Timing parameters governing when pipes are checked, closed and forgotten.
const (
	// gcInterval: all the pipes are checked every 30s.
	gcInterval = 30 * time.Second

	// pipeTimeout: pipes are closed when a client hasn't been connected
	// for 1 minute.
	pipeTimeout = time.Minute

	// gcTimeout: after another 10 minutes, tombstoned pipes are forgotten.
	gcTimeout = 10 * time.Minute
)
|
|
|
|
// End is an enum for either end of the pipe.
type End int

// Valid values of type End.
//
// The constants are declared with the End type (rather than left as untyped
// integer constants) so that they carry the enum type wherever they are used.
const (
	// UIEnd selects the UI side of a pipe.
	UIEnd End = iota
	// ProbeEnd selects the probe side of a pipe.
	ProbeEnd
)
|
|
|
|
// PipeRouter stores pipes and allows you to connect to either end of them.
|
|
type PipeRouter interface {
|
|
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, bool)
|
|
Release(context.Context, string, End)
|
|
Delete(context.Context, string)
|
|
Stop()
|
|
}
|
|
|
|
// PipeRouter connects incoming and outgoing pipes.
|
|
type localPipeRouter struct {
|
|
sync.Mutex
|
|
wait sync.WaitGroup
|
|
quit chan struct{}
|
|
pipes map[string]*pipe
|
|
}
|
|
|
|
// for each end of the pipe, we keep a reference count & lastUsedTIme,
|
|
// such that we can timeout pipes when either end is inactive.
|
|
type pipe struct {
|
|
xfer.Pipe
|
|
|
|
tombstoneTime time.Time
|
|
|
|
ui, probe end
|
|
}
|
|
|
|
// end is the bookkeeping kept for one end of a pipe.
type end struct {
	// refCount is incremented by Get and decremented by Release.
	refCount int
	// lastUsedTime is set when the pipe is created and refreshed when the
	// last reference to this end of a still-open pipe is released.
	lastUsedTime time.Time
}
|
|
|
|
func (p *pipe) end(end End) (*end, io.ReadWriter) {
|
|
ui, probe := p.Ends()
|
|
if end == UIEnd {
|
|
return &p.ui, ui
|
|
}
|
|
return &p.probe, probe
|
|
}
|
|
|
|
// NewLocalPipeRouter returns a new local (in-memory) pipe router.
|
|
func NewLocalPipeRouter() PipeRouter {
|
|
pipeRouter := &localPipeRouter{
|
|
quit: make(chan struct{}),
|
|
pipes: map[string]*pipe{},
|
|
}
|
|
pipeRouter.wait.Add(1)
|
|
go pipeRouter.gcLoop()
|
|
return pipeRouter
|
|
}
|
|
|
|
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, bool) {
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
p, ok := pr.pipes[id]
|
|
if !ok {
|
|
log.Infof("Creating pipe id %s", id)
|
|
p = &pipe{
|
|
ui: end{lastUsedTime: mtime.Now()},
|
|
probe: end{lastUsedTime: mtime.Now()},
|
|
Pipe: xfer.NewPipe(),
|
|
}
|
|
pr.pipes[id] = p
|
|
}
|
|
if p.Closed() {
|
|
return nil, nil, false
|
|
}
|
|
end, endIO := p.end(e)
|
|
end.refCount++
|
|
return p, endIO, true
|
|
}
|
|
|
|
func (pr *localPipeRouter) Release(_ context.Context, id string, e End) {
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
|
|
p, ok := pr.pipes[id]
|
|
if !ok {
|
|
// uh oh
|
|
return
|
|
}
|
|
|
|
end, _ := p.end(e)
|
|
end.refCount--
|
|
if end.refCount > 0 {
|
|
return
|
|
}
|
|
|
|
if !p.Closed() {
|
|
end.lastUsedTime = mtime.Now()
|
|
}
|
|
}
|
|
|
|
func (pr *localPipeRouter) Delete(_ context.Context, id string) {
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
p, ok := pr.pipes[id]
|
|
if !ok {
|
|
return
|
|
}
|
|
p.Close()
|
|
p.tombstoneTime = mtime.Now()
|
|
}
|
|
|
|
func (pr *localPipeRouter) Stop() {
|
|
close(pr.quit)
|
|
pr.wait.Wait()
|
|
}
|
|
|
|
func (pr *localPipeRouter) gcLoop() {
|
|
defer pr.wait.Done()
|
|
ticker := time.Tick(gcInterval)
|
|
for {
|
|
select {
|
|
case <-pr.quit:
|
|
return
|
|
case <-ticker:
|
|
}
|
|
|
|
pr.timeout()
|
|
pr.garbageCollect()
|
|
}
|
|
}
|
|
|
|
func (pr *localPipeRouter) timeout() {
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
now := mtime.Now()
|
|
for id, pipe := range pr.pipes {
|
|
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
|
|
continue
|
|
}
|
|
|
|
if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
|
|
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
|
|
log.Infof("Timing out pipe %s", id)
|
|
pipe.Close()
|
|
pipe.tombstoneTime = now
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pr *localPipeRouter) garbageCollect() {
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
now := mtime.Now()
|
|
for pipeID, pipe := range pr.pipes {
|
|
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
|
|
delete(pr.pipes, pipeID)
|
|
}
|
|
}
|
|
}
|