Merge pull request #997 from weaveworks/multi-tenant-refactor

Refactor app for multitenancy
This commit is contained in:
Tom Wilkie
2016-02-22 17:01:16 +00:00
15 changed files with 422 additions and 323 deletions

View File

@@ -2,12 +2,13 @@ package app
import (
"net/http"
"golang.org/x/net/context"
)
// Raw report handler
func makeRawReportHandler(rep Reporter) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
// r.ParseForm()
respondWith(w, http.StatusOK, rep.Report())
func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, rep.Report(ctx))
}
}

View File

@@ -6,6 +6,7 @@ import (
"sync"
"github.com/gorilla/mux"
"golang.org/x/net/context"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
@@ -192,9 +193,9 @@ func (r *registry) walk(f func(APITopologyDesc)) {
}
// makeTopologyList returns a handler that yields an APITopologyList.
func (r *registry) makeTopologyList(rep Reporter) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(), req)
func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(ctx), req)
respondWith(w, http.StatusOK, topologies)
}
}
@@ -252,27 +253,27 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) render.Render
return renderer
}
type reportRenderHandler func(Reporter, render.Renderer, http.ResponseWriter, *http.Request)
type reportRenderHandler func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request)
func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(mux.Vars(req)["topology"])
if !ok {
http.NotFound(w, req)
return
}
renderer := renderedForRequest(req, topology)
f(rep, renderer, w, req)
f(ctx, rep, renderer, w, req)
}
}
func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(topologyID)
if !ok {
http.NotFound(w, req)
return
}
f(rep, topology.renderer, w, req)
f(ctx, rep, topology.renderer, w, req)
}
}

View File

@@ -6,6 +6,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/render"
@@ -28,14 +29,14 @@ type APINode struct {
}
// Full topology.
func handleTopology(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{
Nodes: renderer.Render(rep.Report()).Prune(),
Nodes: renderer.Render(rep.Report(ctx)).Prune(),
})
}
// Websocket for the full topology. This route overlaps with the next.
func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleWs(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
@@ -48,15 +49,15 @@ func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *
return
}
}
handleWebsocket(w, r, rep, renderer, loop)
handleWebsocket(ctx, w, r, rep, renderer, loop)
}
// Individual nodes.
func handleNode(nodeID string) func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleNode(nodeID string) func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
var (
rpt = rep.Report()
node, ok = renderer.Render(rep.Report())[nodeID]
rpt = rep.Report(ctx)
node, ok = renderer.Render(rep.Report(ctx))[nodeID]
)
if !ok {
http.NotFound(w, r)
@@ -71,6 +72,7 @@ var upgrader = websocket.Upgrader{
}
func handleWebsocket(
ctx context.Context,
w http.ResponseWriter,
r *http.Request,
rep Reporter,
@@ -88,6 +90,7 @@ func handleWebsocket(
go func(c *websocket.Conn) {
for { // just discard everything the browser sends
if _, _, err := c.NextReader(); err != nil {
log.Println("err:", err)
close(quit)
break
}
@@ -99,15 +102,16 @@ func handleWebsocket(
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
)
rep.WaitOn(wait)
defer rep.UnWait(wait)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)
for {
newTopo := renderer.Render(rep.Report()).Prune()
newTopo := renderer.Render(rep.Report(ctx)).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo
if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil {
log.Println("err:", err)
return
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/spaolacci/murmur3"
"golang.org/x/net/context"
"github.com/weaveworks/scope/report"
)
@@ -13,15 +14,15 @@ import (
// Reporter is something that can produce reports on demand. It's a convenient
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report() report.Report
WaitOn(chan struct{})
UnWait(chan struct{})
Report(context.Context) report.Report
WaitOn(context.Context, chan struct{})
UnWait(context.Context, chan struct{})
}
// Adder is something that can accept reports. It's a convenient interface for
// parts of the app, and several experimental components.
type Adder interface {
Add(report.Report)
Add(context.Context, report.Report)
}
// A Collector is a Reporter and an Adder
@@ -45,13 +46,13 @@ type waitableCondition struct {
waiters map[chan struct{}]struct{}
}
func (wc *waitableCondition) WaitOn(waiter chan struct{}) {
func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
wc.Lock()
wc.waiters[waiter] = struct{}{}
wc.Unlock()
}
func (wc *waitableCondition) UnWait(waiter chan struct{}) {
func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
wc.Lock()
delete(wc.waiters, waiter)
wc.Unlock()
@@ -82,7 +83,7 @@ func NewCollector(window time.Duration) Collector {
var now = time.Now
// Add adds a report to the collector's internal state. It implements Adder.
func (c *collector) Add(rpt report.Report) {
func (c *collector) Add(_ context.Context, rpt report.Report) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
@@ -95,7 +96,7 @@ func (c *collector) Add(rpt report.Report) {
// Report returns a merged report over all added reports. It implements
// Reporter.
func (c *collector) Report() report.Report {
func (c *collector) Report(_ context.Context) report.Report {
c.mtx.Lock()
defer c.mtx.Unlock()

View File

@@ -4,6 +4,8 @@ import (
"testing"
"time"
"golang.org/x/net/context"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
@@ -11,6 +13,7 @@ import (
)
func TestCollector(t *testing.T) {
ctx := context.Background()
window := time.Millisecond
c := app.NewCollector(window)
@@ -20,32 +23,33 @@ func TestCollector(t *testing.T) {
r2 := report.MakeReport()
r2.Endpoint.AddNode("bar", report.MakeNode())
if want, have := report.MakeReport(), c.Report(); !reflect.DeepEqual(want, have) {
if want, have := report.MakeReport(), c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
c.Add(r1)
if want, have := r1, c.Report(); !reflect.DeepEqual(want, have) {
c.Add(ctx, r1)
if want, have := r1, c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
c.Add(r2)
c.Add(ctx, r2)
merged := report.MakeReport()
merged = merged.Merge(r1)
merged = merged.Merge(r2)
if want, have := merged, c.Report(); !reflect.DeepEqual(want, have) {
if want, have := merged, c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
func TestCollectorWait(t *testing.T) {
ctx := context.Background()
window := time.Millisecond
c := app.NewCollector(window)
waiter := make(chan struct{}, 1)
c.WaitOn(waiter)
defer c.UnWait(waiter)
c.WaitOn(ctx, waiter)
defer c.UnWait(ctx, waiter)
c.(interface {
Broadcast()
}).Broadcast()

69
app/control_router.go Normal file
View File

@@ -0,0 +1,69 @@
package app
import (
"fmt"
"math/rand"
"sync"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/xfer"
)
// ControlRouter is a thing that can route control requests and responses
// between the UI and a probe.
type ControlRouter interface {
Handle(ctx context.Context, probeID string, req xfer.Request) (xfer.Response, error)
Register(ctx context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error)
Deregister(ctx context.Context, probeID string, id int64) error
}
// NewLocalControlRouter creates a new ControlRouter that does everything
// locally, in memory.
func NewLocalControlRouter() ControlRouter {
return &localControlRouter{
probes: map[string]probe{},
}
}
type localControlRouter struct {
sync.Mutex
probes map[string]probe
}
type probe struct {
id int64
handler xfer.ControlHandlerFunc
}
func (l *localControlRouter) Handle(_ context.Context, probeID string, req xfer.Request) (xfer.Response, error) {
l.Lock()
probe, ok := l.probes[probeID]
l.Unlock()
if !ok {
return xfer.Response{}, fmt.Errorf("Probe %s is not connected right now...", probeID)
}
return probe.handler(req), nil
}
func (l *localControlRouter) Register(_ context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) {
l.Lock()
defer l.Unlock()
id := rand.Int63()
l.probes[probeID] = probe{
id: id,
handler: handler,
}
return id, nil
}
func (l *localControlRouter) Deregister(_ context.Context, probeID string, id int64) error {
l.Lock()
defer l.Unlock()
// NB probe might have reconnected in the mean time, need to ensure we do not
// delete new connection! Also, it might have connected then deleted itself!
if l.probes[probeID].id == id {
delete(l.probes, probeID)
}
return nil
}

View File

@@ -1,122 +1,84 @@
package app
import (
"math/rand"
"net/http"
"net/rpc"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/xfer"
)
// RegisterControlRoutes registers the various control routes with a http mux.
func RegisterControlRoutes(router *mux.Router) {
controlRouter := &controlRouter{
probes: map[string]controlHandler{},
}
router.Methods("GET").Path("/api/control/ws").HandlerFunc(controlRouter.handleProbeWS)
router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).HandlerFunc(controlRouter.handleControl)
}
type controlHandler struct {
id int64
client *rpc.Client
}
type controlRouter struct {
sync.Mutex
probes map[string]controlHandler
}
func (ch *controlHandler) handle(req xfer.Request) xfer.Response {
var res xfer.Response
if err := ch.client.Call("control.Handle", req, &res); err != nil {
return xfer.ResponseError(err)
}
return res
}
func (cr *controlRouter) get(probeID string) (controlHandler, bool) {
cr.Lock()
defer cr.Unlock()
handler, ok := cr.probes[probeID]
return handler, ok
}
func (cr *controlRouter) set(probeID string, handler controlHandler) {
cr.Lock()
defer cr.Unlock()
cr.probes[probeID] = handler
}
func (cr *controlRouter) rm(probeID string, handler controlHandler) {
cr.Lock()
defer cr.Unlock()
// NB probe might have reconnected in the mean time, need to ensure we do not
// delete new connection! Also, it might have connected then deleted itself!
if cr.probes[probeID].id == handler.id {
delete(cr.probes, probeID)
}
func RegisterControlRoutes(router *mux.Router, cr ControlRouter) {
router.Methods("GET").Path("/api/control/ws").
HandlerFunc(requestContextDecorator(handleProbeWS(cr)))
router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).
HandlerFunc(requestContextDecorator(handleControl(cr)))
}
// handleControl routes control requests from the client to the appropriate
// probe. Its is blocking.
func (cr *controlRouter) handleControl(w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
probeID = vars["probeID"]
nodeID = vars["nodeID"]
control = vars["control"]
)
handler, ok := cr.get(probeID)
if !ok {
log.Errorf("Probe %s is not connected right now...", probeID)
http.NotFound(w, r)
return
func handleControl(cr ControlRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
probeID = vars["probeID"]
nodeID = vars["nodeID"]
control = vars["control"]
)
result, err := cr.Handle(ctx, probeID, xfer.Request{
AppID: UniqueID,
NodeID: nodeID,
Control: control,
})
if err != nil {
respondWith(w, http.StatusBadRequest, err.Error())
return
}
if result.Error != "" {
respondWith(w, http.StatusBadRequest, result.Error)
return
}
respondWith(w, http.StatusOK, result)
}
result := handler.handle(xfer.Request{
AppID: UniqueID,
NodeID: nodeID,
Control: control,
})
if result.Error != "" {
respondWith(w, http.StatusBadRequest, result.Error)
return
}
respondWith(w, http.StatusOK, result)
}
// handleProbeWS accepts websocket connections from the probe and registers
// them in the control router, such that HandleControl calls can find them.
func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) {
probeID := r.Header.Get(xfer.ScopeProbeIDHeader)
if probeID == "" {
respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
return
func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
probeID := r.Header.Get(xfer.ScopeProbeIDHeader)
if probeID == "" {
respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Error upgrading control websocket: %v", err)
return
}
defer conn.Close()
codec := xfer.NewJSONWebsocketCodec(conn)
client := rpc.NewClientWithCodec(codec)
defer client.Close()
id, err := cr.Register(ctx, probeID, func(req xfer.Request) xfer.Response {
var res xfer.Response
if err := client.Call("control.Handle", req, &res); err != nil {
return xfer.ResponseError(err)
}
return res
})
if err != nil {
respondWith(w, http.StatusBadRequest, err.Error())
return
}
defer cr.Deregister(ctx, probeID, id)
codec.WaitForReadError()
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Errorf("Error upgrading to websocket: %v", err)
return
}
defer conn.Close()
codec := xfer.NewJSONWebsocketCodec(conn)
client := rpc.NewClientWithCodec(codec)
handler := controlHandler{
id: rand.Int63(),
client: client,
}
cr.set(probeID, handler)
codec.WaitForReadError()
cr.rm(probeID, handler)
client.Close()
}

View File

@@ -18,7 +18,7 @@ import (
func TestControl(t *testing.T) {
router := mux.NewRouter()
app.RegisterControlRoutes(router)
app.RegisterControlRoutes(router, app.NewLocalControlRouter())
server := httptest.NewServer(router)
defer server.Close()

View File

@@ -3,12 +3,14 @@ package app_test
import (
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test/fixture"
"golang.org/x/net/context"
)
// StaticReport is used as a fixture in tests. It emulates an xfer.Collector.
type StaticReport struct{}
func (s StaticReport) Report() report.Report { return fixture.Report }
func (s StaticReport) Add(report.Report) {}
func (s StaticReport) WaitOn(chan struct{}) {}
func (s StaticReport) UnWait(chan struct{}) {}
func (s StaticReport) Report(context.Context) report.Report { return fixture.Report }
func (s StaticReport) Add(context.Context, report.Report) {}
func (s StaticReport) WaitOn(context.Context, chan struct{}) {}
func (s StaticReport) UnWait(context.Context, chan struct{}) {}

180
app/pipe_router.go Normal file
View File

@@ -0,0 +1,180 @@
package app
import (
"io"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/xfer"
)
const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
)
// End is an enum for either end of the pipe.
type End int
// Valid values of type End
const (
UIEnd = iota
ProbeEnd
)
// PipeRouter stores pipes and allows you to connect to either end of them.
type PipeRouter interface {
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, bool)
Release(context.Context, string, End)
Delete(context.Context, string)
Stop()
}
// PipeRouter connects incoming and outgoing pipes.
type localPipeRouter struct {
sync.Mutex
wait sync.WaitGroup
quit chan struct{}
pipes map[string]*pipe
}
// for each end of the pipe, we keep a reference count & lastUsedTIme,
// such that we can timeout pipes when either end is inactive.
type pipe struct {
xfer.Pipe
tombstoneTime time.Time
ui, probe end
}
type end struct {
refCount int
lastUsedTime time.Time
}
func (p *pipe) end(end End) (*end, io.ReadWriter) {
ui, probe := p.Ends()
if end == UIEnd {
return &p.ui, ui
}
return &p.probe, probe
}
// NewLocalPipeRouter returns a new local (in-memory) pipe router.
func NewLocalPipeRouter() PipeRouter {
pipeRouter := &localPipeRouter{
quit: make(chan struct{}),
pipes: map[string]*pipe{},
}
pipeRouter.wait.Add(1)
go pipeRouter.gcLoop()
return pipeRouter
}
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, bool) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
log.Infof("Creating pipe id %s", id)
p = &pipe{
ui: end{lastUsedTime: mtime.Now()},
probe: end{lastUsedTime: mtime.Now()},
Pipe: xfer.NewPipe(),
}
pr.pipes[id] = p
}
if p.Closed() {
return nil, nil, false
}
end, endIO := p.end(e)
end.refCount++
return p, endIO, true
}
func (pr *localPipeRouter) Release(_ context.Context, id string, e End) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
// uh oh
return
}
end, _ := p.end(e)
end.refCount--
if end.refCount > 0 {
return
}
if !p.Closed() {
end.lastUsedTime = mtime.Now()
}
}
func (pr *localPipeRouter) Delete(_ context.Context, id string) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return
}
p.Close()
p.tombstoneTime = mtime.Now()
}
func (pr *localPipeRouter) Stop() {
close(pr.quit)
pr.wait.Wait()
}
func (pr *localPipeRouter) gcLoop() {
defer pr.wait.Done()
ticker := time.Tick(gcInterval)
for {
select {
case <-pr.quit:
return
case <-ticker:
}
pr.timeout()
pr.garbageCollect()
}
}
func (pr *localPipeRouter) timeout() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for id, pipe := range pr.pipes {
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
continue
}
if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
log.Infof("Timing out pipe %s", id)
pipe.Close()
pipe.tombstoneTime = now
}
}
}
func (pr *localPipeRouter) garbageCollect() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for pipeID, pipe := range pr.pipes {
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
delete(pr.pipes, pipeID)
}
}
}

View File

@@ -1,199 +1,54 @@
package app
import (
"io"
"net/http"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/xfer"
"golang.org/x/net/context"
)
const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
)
// PipeRouter connects incoming and outgoing pipes.
type PipeRouter struct {
sync.Mutex
wait sync.WaitGroup
quit chan struct{}
pipes map[string]*pipe
}
// for each end of the pipe, we keep a reference count & lastUsedTIme,
// such that we can timeout pipes when either end is inactive.
type end struct {
refCount int
lastUsedTime time.Time
}
type pipe struct {
ui, probe end
tombstoneTime time.Time
xfer.Pipe
}
// RegisterPipeRoutes registers the pipe routes
func RegisterPipeRoutes(router *mux.Router) *PipeRouter {
pipeRouter := &PipeRouter{
quit: make(chan struct{}),
pipes: map[string]*pipe{},
}
pipeRouter.wait.Add(1)
go pipeRouter.gcLoop()
func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) {
router.Methods("GET").
Path("/api/pipe/{pipeID}").
HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
uiEnd, _ := p.Ends()
return &p.ui, uiEnd
}))
HandlerFunc(requestContextDecorator(handlePipeWs(pr, UIEnd)))
router.Methods("GET").
Path("/api/pipe/{pipeID}/probe").
HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
_, probeEnd := p.Ends()
return &p.probe, probeEnd
}))
HandlerFunc(requestContextDecorator(handlePipeWs(pr, ProbeEnd)))
router.Methods("DELETE", "POST").
Path("/api/pipe/{pipeID}").
HandlerFunc(pipeRouter.delete)
return pipeRouter
HandlerFunc(requestContextDecorator(deletePipe(pr)))
}
// Stop stops the pipeRouter
func (pr *PipeRouter) Stop() {
close(pr.quit)
pr.wait.Wait()
}
func (pr *PipeRouter) gcLoop() {
defer pr.wait.Done()
ticker := time.Tick(gcInterval)
for {
select {
case <-pr.quit:
return
case <-ticker:
}
pr.timeout()
pr.garbageCollect()
}
}
func (pr *PipeRouter) timeout() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for id, pipe := range pr.pipes {
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
continue
}
if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
log.Infof("Timing out pipe %s", id)
pipe.Close()
pipe.tombstoneTime = now
}
}
}
func (pr *PipeRouter) garbageCollect() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for pipeID, pipe := range pr.pipes {
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
delete(pr.pipes, pipeID)
}
}
}
func (pr *PipeRouter) getOrCreate(id string) (*pipe, bool) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
log.Infof("Creating pipe id %s", id)
p = &pipe{
ui: end{lastUsedTime: mtime.Now()},
probe: end{lastUsedTime: mtime.Now()},
Pipe: xfer.NewPipe(),
}
pr.pipes[id] = p
}
if p.Closed() {
return nil, false
}
return p, true
}
func (pr *PipeRouter) retain(id string, pipe *pipe, end *end) bool {
pr.Lock()
defer pr.Unlock()
if pipe.Closed() {
return false
}
end.refCount++
return true
}
func (pr *PipeRouter) release(id string, pipe *pipe, end *end) {
pr.Lock()
defer pr.Unlock()
end.refCount--
if end.refCount != 0 {
return
}
if !pipe.Closed() {
end.lastUsedTime = mtime.Now()
}
}
func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
pipe, ok := pr.getOrCreate(pipeID)
func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["pipeID"]
pipe, endIO, ok := pr.Get(ctx, id, end)
if !ok {
http.NotFound(w, r)
return
}
endRef, endIO := endSelector(pipe)
if !pr.retain(pipeID, pipe, endRef) {
http.NotFound(w, r)
return
}
defer pr.release(pipeID, pipe, endRef)
defer pr.Release(ctx, id, end)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Errorf("Error upgrading to websocket: %v", err)
log.Errorf("Error upgrading pipe %s (%d) websocket: %v", id, end, err)
return
}
defer conn.Close()
log.Infof("Pipe success %s (%d)", id, end)
pipe.CopyToWebsocket(endIO, conn)
}
}
func (pr *PipeRouter) delete(w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
pipe, ok := pr.getOrCreate(pipeID)
if ok && pr.retain(pipeID, pipe, &pipe.ui) {
func deletePipe(pr PipeRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
log.Infof("Closing pipe %s", pipeID)
pipe.Close()
pipe.tombstoneTime = mtime.Now()
pr.release(pipeID, pipe, &pipe.ui)
pr.Delete(ctx, pipeID)
}
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/xfer"
@@ -22,7 +23,8 @@ import (
func TestPipeTimeout(t *testing.T) {
router := mux.NewRouter()
pr := RegisterPipeRoutes(router)
pr := NewLocalPipeRouter().(*localPipeRouter)
RegisterPipeRoutes(router, pr)
pr.Stop() // we don't want the loop running in the background
mtime.NowForce(time.Now())
@@ -30,7 +32,8 @@ func TestPipeTimeout(t *testing.T) {
// create a new pipe.
id := "foo"
pipe, ok := pr.getOrCreate(id)
ctx := context.Background()
pipe, _, ok := pr.Get(ctx, id, UIEnd)
if !ok {
t.Fatalf("not ok")
}
@@ -65,7 +68,8 @@ func (a adapter) PipeClose(_, pipeID string) error {
func TestPipeClose(t *testing.T) {
router := mux.NewRouter()
pr := RegisterPipeRoutes(router)
pr := NewLocalPipeRouter()
RegisterPipeRoutes(router, pr)
defer pr.Stop()
server := httptest.NewServer(router)

View File

@@ -10,6 +10,7 @@ import (
"github.com/PuerkitoBio/ghost/handlers"
"github.com/gorilla/mux"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"
"github.com/weaveworks/scope/common/hostname"
"github.com/weaveworks/scope/common/xfer"
@@ -24,6 +25,19 @@ var (
UniqueID = "0"
)
// RequestCtxKey is key used for request entry in context
const RequestCtxKey = "request"
// CtxHandlerFunc is a http.HandlerFunc, with added contexts
type CtxHandlerFunc func(context.Context, http.ResponseWriter, *http.Request)
func requestContextDecorator(f CtxHandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(context.Background(), RequestCtxKey, r)
f(ctx, w, r)
}
}
// URLMatcher uses request.RequestURI (the raw, unparsed request) to attempt
// to match pattern. It does this as go's URL.Parse method is broken, and
// mistakenly unescapes the Path before parsing it. This breaks %2F (encoded
@@ -76,13 +90,13 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc {
// routes should be added to a router and passed to postRoutes.
func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) http.Handler {
get := preRoutes.Methods("GET").Subrouter()
get.HandleFunc("/api", gzipHandler(apiHandler))
get.HandleFunc("/api/topology", gzipHandler(topologyRegistry.makeTopologyList(c)))
get.HandleFunc("/api", gzipHandler(requestContextDecorator(apiHandler)))
get.HandleFunc("/api/topology", gzipHandler(requestContextDecorator(topologyRegistry.makeTopologyList(c))))
get.HandleFunc("/api/topology/{topology}",
gzipHandler(topologyRegistry.captureRenderer(c, handleTopology)))
gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(c, handleTopology))))
get.HandleFunc("/api/topology/{topology}/ws",
topologyRegistry.captureRenderer(c, handleWs)) // NB not gzip!
get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c)))
requestContextDecorator(topologyRegistry.captureRenderer(c, handleWs))) // NB not gzip!
get.HandleFunc("/api/report", gzipHandler(requestContextDecorator(makeRawReportHandler(c))))
// We pull in the http.DefaultServeMux to get the pprof routes
preRoutes.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
@@ -107,9 +121,9 @@ func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler)
return
}
handler := gzipHandler(topologyRegistry.captureRendererWithoutFilters(
handler := gzipHandler(requestContextDecorator(topologyRegistry.captureRendererWithoutFilters(
c, topologyID, handleNode(nodeID),
))
)))
handler.ServeHTTP(w, r)
})
}
@@ -117,7 +131,7 @@ func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler)
// RegisterReportPostHandler registers the handler for report submission
func RegisterReportPostHandler(a Adder, router *mux.Router) {
post := router.Methods("POST").Subrouter()
post.HandleFunc("/api/report", func(w http.ResponseWriter, r *http.Request) {
post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
var (
rpt report.Report
reader = r.Body
@@ -142,15 +156,15 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
a.Add(rpt)
a.Add(ctx, rpt)
if len(rpt.Pod.Nodes) > 0 {
topologyRegistry.enableKubernetesTopologies()
}
w.WriteHeader(http.StatusOK)
})
}))
}
func apiHandler(w http.ResponseWriter, r *http.Request) {
func apiHandler(_ context.Context, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, xfer.Details{
ID: UniqueID,
Version: Version,

View File

@@ -12,6 +12,7 @@ import (
"github.com/gorilla/mux"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/test"
@@ -73,7 +74,8 @@ func TestReportPostHandler(t *testing.T) {
t.Fatalf("Error posting report: %d", resp.StatusCode)
}
if want, have := fixture.Report.Endpoint.Nodes, c.Report().Endpoint.Nodes; len(have) == 0 || len(want) != len(have) {
ctx := context.Background()
if want, have := fixture.Report.Endpoint.Nodes, c.Report(ctx).Endpoint.Nodes; len(have) == 0 || len(want) != len(have) {
t.Fatalf("Content-Type %s: %v", contentType, test.Diff(have, want))
}
}

View File

@@ -23,8 +23,8 @@ import (
func router(c app.Collector) http.Handler {
router := mux.NewRouter()
app.RegisterReportPostHandler(c, router)
app.RegisterControlRoutes(router)
app.RegisterPipeRoutes(router)
app.RegisterControlRoutes(router, app.NewLocalControlRouter())
app.RegisterPipeRoutes(router, app.NewLocalPipeRouter())
return app.TopologyHandler(c, router, http.FileServer(FS(false)))
}