Files
weave-scope/app/pipe_router.go
Marc Carré 2ba50b8b3d Update golang.org/x/net/context to latest
```
$ gvt delete golang.org/x/net/context
$ gvt fetch golang.org/x/net/context
2018/07/23 18:03:49 Fetching: golang.org/x/net/context
$ git grep -l "golang.org/x/net/context" | grep -v vendor | xargs sed -i '' 's:golang.org/x/net/context:context:g'
$ git grep -l "context/ctxhttp" | grep -v vendor | xargs sed -i '' 's:context/ctxhttp:golang.org/x/net/context/ctxhttp:g'
$ gofmt -s -w app
$ gofmt -s -w common
$ gofmt -s -w probe
$ gofmt -s -w prog
$ gofmt -s -w tools
```
fixed a bunch of:
```
cannot use func literal (type func("github.com/weaveworks/scope/vendor/golang.org/x/net/context".Context) error) as type func("context".Context) error
```
2018-07-23 20:10:18 +02:00

202 lines
3.9 KiB
Go

package app
import (
"fmt"
"io"
"sync"
"time"
"context"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/scope/common/xfer"
)
const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
)
// End is an enum for either end of the pipe.
type End int
// Valid values of type End
const (
UIEnd = iota
ProbeEnd
)
func (e End) String() string {
if e == UIEnd {
return "ui"
}
return "probe"
}
// PipeRouter stores pipes and allows you to connect to either end of them.
type PipeRouter interface {
Exists(context.Context, string) (bool, error)
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, error)
Release(context.Context, string, End) error
Delete(context.Context, string) error
Stop()
}
// PipeRouter connects incoming and outgoing pipes.
type localPipeRouter struct {
sync.Mutex
wait sync.WaitGroup
quit chan struct{}
pipes map[string]*pipe
}
// for each end of the pipe, we keep a reference count & lastUsedTIme,
// such that we can timeout pipes when either end is inactive.
type pipe struct {
xfer.Pipe
tombstoneTime time.Time
ui, probe end
}
type end struct {
refCount int
lastUsedTime time.Time
}
func (p *pipe) end(end End) (*end, io.ReadWriter) {
ui, probe := p.Ends()
if end == UIEnd {
return &p.ui, ui
}
return &p.probe, probe
}
// NewLocalPipeRouter returns a new local (in-memory) pipe router.
func NewLocalPipeRouter() PipeRouter {
pipeRouter := &localPipeRouter{
quit: make(chan struct{}),
pipes: map[string]*pipe{},
}
pipeRouter.wait.Add(1)
go pipeRouter.gcLoop()
return pipeRouter
}
func (pr *localPipeRouter) Exists(_ context.Context, id string) (bool, error) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return true, nil
}
return !p.Closed(), nil
}
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
log.Debugf("Creating pipe id %s", id)
p = &pipe{
ui: end{lastUsedTime: mtime.Now()},
probe: end{lastUsedTime: mtime.Now()},
Pipe: xfer.NewPipe(),
}
pr.pipes[id] = p
}
if p.Closed() {
return nil, nil, fmt.Errorf("Pipe %s closed", id)
}
end, endIO := p.end(e)
end.refCount++
return p, endIO, nil
}
func (pr *localPipeRouter) Release(_ context.Context, id string, e End) error {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return fmt.Errorf("Pipe %s not found", id)
}
end, _ := p.end(e)
end.refCount--
if end.refCount > 0 {
return nil
}
if !p.Closed() {
end.lastUsedTime = mtime.Now()
}
return nil
}
func (pr *localPipeRouter) Delete(_ context.Context, id string) error {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return nil
}
p.Close()
p.tombstoneTime = mtime.Now()
return nil
}
func (pr *localPipeRouter) Stop() {
close(pr.quit)
pr.wait.Wait()
}
func (pr *localPipeRouter) gcLoop() {
defer pr.wait.Done()
ticker := time.Tick(gcInterval)
for {
select {
case <-pr.quit:
return
case <-ticker:
}
pr.timeout()
pr.garbageCollect()
}
}
func (pr *localPipeRouter) timeout() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for id, pipe := range pr.pipes {
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
continue
}
if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
log.Infof("Timing out pipe %s", id)
pipe.Close()
pipe.tombstoneTime = now
}
}
}
func (pr *localPipeRouter) garbageCollect() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for pipeID, pipe := range pr.pipes {
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
delete(pr.pipes, pipeID)
}
}
}