diff --git a/app/api_report.go b/app/api_report.go index f96cdc030..0aa046f04 100644 --- a/app/api_report.go +++ b/app/api_report.go @@ -2,12 +2,13 @@ package app import ( "net/http" + + "golang.org/x/net/context" ) // Raw report handler -func makeRawReportHandler(rep Reporter) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - // r.ParseForm() - respondWith(w, http.StatusOK, rep.Report()) +func makeRawReportHandler(rep Reporter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + respondWith(w, http.StatusOK, rep.Report(ctx)) } } diff --git a/app/api_topologies.go b/app/api_topologies.go index 37f4f20eb..4d2e424e8 100644 --- a/app/api_topologies.go +++ b/app/api_topologies.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/gorilla/mux" + "golang.org/x/net/context" "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/report" @@ -192,9 +193,9 @@ func (r *registry) walk(f func(APITopologyDesc)) { } // makeTopologyList returns a handler that yields an APITopologyList. -func (r *registry) makeTopologyList(rep Reporter) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, req *http.Request) { - topologies := r.renderTopologies(rep.Report(), req) +func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, req *http.Request) { + topologies := r.renderTopologies(rep.Report(ctx), req) respondWith(w, http.StatusOK, topologies) } } @@ -252,27 +253,27 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) render.Render return renderer } -type reportRenderHandler func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) +type reportRenderHandler func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) -func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { +func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, req *http.Request) { topology, ok := r.get(mux.Vars(req)["topology"]) if !ok { http.NotFound(w, req) return } renderer := renderedForRequest(req, topology) - f(rep, renderer, w, req) + f(ctx, rep, renderer, w, req) } } -func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { +func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, req *http.Request) { topology, ok := r.get(topologyID) if !ok { http.NotFound(w, req) return } - f(rep, topology.renderer, w, req) + f(ctx, rep, topology.renderer, w, req) } } diff --git a/app/api_topology.go b/app/api_topology.go index 65aba207b..855e671fe 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -6,6 +6,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/gorilla/websocket" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/render" @@ -28,14 +29,14 @@ type APINode struct { } // Full topology. -func handleTopology(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { +func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { respondWith(w, http.StatusOK, APITopology{ - Nodes: renderer.Render(rep.Report()).Prune(), + Nodes: renderer.Render(rep.Report(ctx)).Prune(), }) } // Websocket for the full topology. This route overlaps with the next. -func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { +func handleWs(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { respondWith(w, http.StatusInternalServerError, err.Error()) return @@ -48,15 +49,15 @@ func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r * return } } - handleWebsocket(w, r, rep, renderer, loop) + handleWebsocket(ctx, w, r, rep, renderer, loop) } // Individual nodes. -func handleNode(nodeID string) func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) { - return func(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { +func handleNode(nodeID string) func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) { + return func(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { var ( - rpt = rep.Report() - node, ok = renderer.Render(rep.Report())[nodeID] + rpt = rep.Report(ctx) + node, ok = renderer.Render(rep.Report(ctx))[nodeID] ) if !ok { http.NotFound(w, r) @@ -71,6 +72,7 @@ var upgrader = websocket.Upgrader{ } func handleWebsocket( + ctx context.Context, w http.ResponseWriter, r *http.Request, rep Reporter, @@ -88,6 +90,7 @@ func handleWebsocket( go func(c *websocket.Conn) { for { // just discard everything the browser sends if _, _, err := c.NextReader(); err != nil { + log.Println("err:", err) close(quit) break } @@ -99,15 +102,16 @@ func handleWebsocket( tick = time.Tick(loop) wait = make(chan struct{}, 1) ) - rep.WaitOn(wait) - defer rep.UnWait(wait) + rep.WaitOn(ctx, wait) + defer rep.UnWait(ctx, wait) for { - newTopo := renderer.Render(rep.Report()).Prune() + newTopo := renderer.Render(rep.Report(ctx)).Prune() diff := render.TopoDiff(previousTopo, newTopo) previousTopo = newTopo if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil { + log.Println("err:", err) return } diff --git a/app/collector.go b/app/collector.go index fff99788c..6c2f72730 100644 --- a/app/collector.go +++ b/app/collector.go @@ -6,6 +6,7 @@ import ( "time" "github.com/spaolacci/murmur3" + "golang.org/x/net/context" "github.com/weaveworks/scope/report" ) @@ -13,15 +14,15 @@ import ( // Reporter is something that can produce reports on demand. It's a convenient // interface for parts of the app, and several experimental components. type Reporter interface { - Report() report.Report - WaitOn(chan struct{}) - UnWait(chan struct{}) + Report(context.Context) report.Report + WaitOn(context.Context, chan struct{}) + UnWait(context.Context, chan struct{}) } // Adder is something that can accept reports. It's a convenient interface for // parts of the app, and several experimental components. type Adder interface { - Add(report.Report) + Add(context.Context, report.Report) } // A Collector is a Reporter and an Adder @@ -45,13 +46,13 @@ type waitableCondition struct { waiters map[chan struct{}]struct{} } -func (wc *waitableCondition) WaitOn(waiter chan struct{}) { +func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) { wc.Lock() wc.waiters[waiter] = struct{}{} wc.Unlock() } -func (wc *waitableCondition) UnWait(waiter chan struct{}) { +func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) { wc.Lock() delete(wc.waiters, waiter) wc.Unlock() @@ -82,7 +83,7 @@ func NewCollector(window time.Duration) Collector { var now = time.Now // Add adds a report to the collector's internal state. It implements Adder. -func (c *collector) Add(rpt report.Report) { +func (c *collector) Add(_ context.Context, rpt report.Report) { c.mtx.Lock() defer c.mtx.Unlock() c.reports = append(c.reports, timestampReport{now(), rpt}) @@ -95,7 +96,7 @@ func (c *collector) Add(rpt report.Report) { // Report returns a merged report over all added reports. It implements // Reporter. -func (c *collector) Report() report.Report { +func (c *collector) Report(_ context.Context) report.Report { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/app/collector_test.go b/app/collector_test.go index 672e4f433..4e8400cdf 100644 --- a/app/collector_test.go +++ b/app/collector_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" @@ -11,6 +13,7 @@ import ( ) func TestCollector(t *testing.T) { + ctx := context.Background() window := time.Millisecond c := app.NewCollector(window) @@ -20,32 +23,33 @@ func TestCollector(t *testing.T) { r2 := report.MakeReport() r2.Endpoint.AddNode("bar", report.MakeNode()) - if want, have := report.MakeReport(), c.Report(); !reflect.DeepEqual(want, have) { + if want, have := report.MakeReport(), c.Report(ctx); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } - c.Add(r1) - if want, have := r1, c.Report(); !reflect.DeepEqual(want, have) { + c.Add(ctx, r1) + if want, have := r1, c.Report(ctx); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } - c.Add(r2) + c.Add(ctx, r2) merged := report.MakeReport() merged = merged.Merge(r1) merged = merged.Merge(r2) - if want, have := merged, c.Report(); !reflect.DeepEqual(want, have) { + if want, have := merged, c.Report(ctx); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } func TestCollectorWait(t *testing.T) { + ctx := context.Background() window := time.Millisecond c := app.NewCollector(window) waiter := make(chan struct{}, 1) - c.WaitOn(waiter) - defer c.UnWait(waiter) + c.WaitOn(ctx, waiter) + defer c.UnWait(ctx, waiter) c.(interface { Broadcast() }).Broadcast() diff --git a/app/control_router.go b/app/control_router.go new file mode 100644 index 000000000..dd28febec --- /dev/null +++ b/app/control_router.go @@ -0,0 +1,69 @@ +package app + +import ( + "fmt" + "math/rand" + "sync" + + "golang.org/x/net/context" + + "github.com/weaveworks/scope/common/xfer" +) + +// ControlRouter is a thing that can route control requests and responses +// between the UI and a probe. +type ControlRouter interface { + Handle(ctx context.Context, probeID string, req xfer.Request) (xfer.Response, error) + Register(ctx context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) + Deregister(ctx context.Context, probeID string, id int64) error +} + +// NewLocalControlRouter creates a new ControlRouter that does everything +// locally, in memory. +func NewLocalControlRouter() ControlRouter { + return &localControlRouter{ + probes: map[string]probe{}, + } +} + +type localControlRouter struct { + sync.Mutex + probes map[string]probe +} + +type probe struct { + id int64 + handler xfer.ControlHandlerFunc +} + +func (l *localControlRouter) Handle(_ context.Context, probeID string, req xfer.Request) (xfer.Response, error) { + l.Lock() + probe, ok := l.probes[probeID] + l.Unlock() + if !ok { + return xfer.Response{}, fmt.Errorf("Probe %s is not connected right now...", probeID) + } + return probe.handler(req), nil +} + +func (l *localControlRouter) Register(_ context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) { + l.Lock() + defer l.Unlock() + id := rand.Int63() + l.probes[probeID] = probe{ + id: id, + handler: handler, + } + return id, nil +} + +func (l *localControlRouter) Deregister(_ context.Context, probeID string, id int64) error { + l.Lock() + defer l.Unlock() + // NB probe might have reconnected in the mean time, need to ensure we do not + // delete new connection! Also, it might have connected then deleted itself! + if l.probes[probeID].id == id { + delete(l.probes, probeID) + } + return nil +} diff --git a/app/controls.go b/app/controls.go index 0f5c38f6b..791d922c7 100644 --- a/app/controls.go +++ b/app/controls.go @@ -1,122 +1,84 @@ package app import ( - "math/rand" "net/http" "net/rpc" - "sync" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/xfer" ) // RegisterControlRoutes registers the various control routes with a http mux. -func RegisterControlRoutes(router *mux.Router) { - controlRouter := &controlRouter{ - probes: map[string]controlHandler{}, - } - router.Methods("GET").Path("/api/control/ws").HandlerFunc(controlRouter.handleProbeWS) - router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).HandlerFunc(controlRouter.handleControl) -} - -type controlHandler struct { - id int64 - client *rpc.Client -} - -type controlRouter struct { - sync.Mutex - probes map[string]controlHandler -} - -func (ch *controlHandler) handle(req xfer.Request) xfer.Response { - var res xfer.Response - if err := ch.client.Call("control.Handle", req, &res); err != nil { - return xfer.ResponseError(err) - } - return res -} - -func (cr *controlRouter) get(probeID string) (controlHandler, bool) { - cr.Lock() - defer cr.Unlock() - handler, ok := cr.probes[probeID] - return handler, ok -} - -func (cr *controlRouter) set(probeID string, handler controlHandler) { - cr.Lock() - defer cr.Unlock() - cr.probes[probeID] = handler -} - -func (cr *controlRouter) rm(probeID string, handler controlHandler) { - cr.Lock() - defer cr.Unlock() - // NB probe might have reconnected in the mean time, need to ensure we do not - // delete new connection! Also, it might have connected then deleted itself! - if cr.probes[probeID].id == handler.id { - delete(cr.probes, probeID) - } +func RegisterControlRoutes(router *mux.Router, cr ControlRouter) { + router.Methods("GET").Path("/api/control/ws"). + HandlerFunc(requestContextDecorator(handleProbeWS(cr))) + router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")). + HandlerFunc(requestContextDecorator(handleControl(cr))) } // handleControl routes control requests from the client to the appropriate // probe. Its is blocking. -func (cr *controlRouter) handleControl(w http.ResponseWriter, r *http.Request) { - var ( - vars = mux.Vars(r) - probeID = vars["probeID"] - nodeID = vars["nodeID"] - control = vars["control"] - ) - handler, ok := cr.get(probeID) - if !ok { - log.Errorf("Probe %s is not connected right now...", probeID) - http.NotFound(w, r) - return +func handleControl(cr ControlRouter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + probeID = vars["probeID"] + nodeID = vars["nodeID"] + control = vars["control"] + ) + result, err := cr.Handle(ctx, probeID, xfer.Request{ + AppID: UniqueID, + NodeID: nodeID, + Control: control, + }) + if err != nil { + respondWith(w, http.StatusBadRequest, err.Error()) + return + } + if result.Error != "" { + respondWith(w, http.StatusBadRequest, result.Error) + return + } + respondWith(w, http.StatusOK, result) } - - result := handler.handle(xfer.Request{ - AppID: UniqueID, - NodeID: nodeID, - Control: control, - }) - if result.Error != "" { - respondWith(w, http.StatusBadRequest, result.Error) - return - } - respondWith(w, http.StatusOK, result) } // handleProbeWS accepts websocket connections from the probe and registers // them in the control router, such that HandleControl calls can find them. -func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) { - probeID := r.Header.Get(xfer.ScopeProbeIDHeader) - if probeID == "" { - respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) - return +func handleProbeWS(cr ControlRouter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + probeID := r.Header.Get(xfer.ScopeProbeIDHeader) + if probeID == "" { + respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Error upgrading control websocket: %v", err) + return + } + defer conn.Close() + + codec := xfer.NewJSONWebsocketCodec(conn) + client := rpc.NewClientWithCodec(codec) + defer client.Close() + + id, err := cr.Register(ctx, probeID, func(req xfer.Request) xfer.Response { + var res xfer.Response + if err := client.Call("control.Handle", req, &res); err != nil { + return xfer.ResponseError(err) + } + return res + }) + if err != nil { + respondWith(w, http.StatusBadRequest, err.Error()) + return + } + defer cr.Deregister(ctx, probeID, id) + codec.WaitForReadError() } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Errorf("Error upgrading to websocket: %v", err) - return - } - defer conn.Close() - - codec := xfer.NewJSONWebsocketCodec(conn) - client := rpc.NewClientWithCodec(codec) - handler := controlHandler{ - id: rand.Int63(), - client: client, - } - - cr.set(probeID, handler) - - codec.WaitForReadError() - - cr.rm(probeID, handler) - client.Close() } diff --git a/app/controls_test.go b/app/controls_test.go index ff01023e8..2e1a7c520 100644 --- a/app/controls_test.go +++ b/app/controls_test.go @@ -18,7 +18,7 @@ import ( func TestControl(t *testing.T) { router := mux.NewRouter() - app.RegisterControlRoutes(router) + app.RegisterControlRoutes(router, app.NewLocalControlRouter()) server := httptest.NewServer(router) defer server.Close() diff --git a/app/mock_reporter_test.go b/app/mock_reporter_test.go index 625791dec..b5efe5e5b 100644 --- a/app/mock_reporter_test.go +++ b/app/mock_reporter_test.go @@ -3,12 +3,14 @@ package app_test import ( "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test/fixture" + + "golang.org/x/net/context" ) // StaticReport is used as a fixture in tests. It emulates an xfer.Collector. type StaticReport struct{} -func (s StaticReport) Report() report.Report { return fixture.Report } -func (s StaticReport) Add(report.Report) {} -func (s StaticReport) WaitOn(chan struct{}) {} -func (s StaticReport) UnWait(chan struct{}) {} +func (s StaticReport) Report(context.Context) report.Report { return fixture.Report } +func (s StaticReport) Add(context.Context, report.Report) {} +func (s StaticReport) WaitOn(context.Context, chan struct{}) {} +func (s StaticReport) UnWait(context.Context, chan struct{}) {} diff --git a/app/pipe_router.go b/app/pipe_router.go new file mode 100644 index 000000000..5bdfdf9c4 --- /dev/null +++ b/app/pipe_router.go @@ -0,0 +1,180 @@ +package app + +import ( + "io" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "golang.org/x/net/context" + + "github.com/weaveworks/scope/common/mtime" + "github.com/weaveworks/scope/common/xfer" +) + +const ( + gcInterval = 30 * time.Second // we check all the pipes every 30s + pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute + gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten +) + +// End is an enum for either end of the pipe. +type End int + +// Valid values of type End +const ( + UIEnd = iota + ProbeEnd +) + +// PipeRouter stores pipes and allows you to connect to either end of them. +type PipeRouter interface { + Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, bool) + Release(context.Context, string, End) + Delete(context.Context, string) + Stop() +} + +// PipeRouter connects incoming and outgoing pipes. +type localPipeRouter struct { + sync.Mutex + wait sync.WaitGroup + quit chan struct{} + pipes map[string]*pipe +} + +// for each end of the pipe, we keep a reference count & lastUsedTIme, +// such that we can timeout pipes when either end is inactive. +type pipe struct { + xfer.Pipe + + tombstoneTime time.Time + + ui, probe end +} + +type end struct { + refCount int + lastUsedTime time.Time +} + +func (p *pipe) end(end End) (*end, io.ReadWriter) { + ui, probe := p.Ends() + if end == UIEnd { + return &p.ui, ui + } + return &p.probe, probe +} + +// NewLocalPipeRouter returns a new local (in-memory) pipe router. +func NewLocalPipeRouter() PipeRouter { + pipeRouter := &localPipeRouter{ + quit: make(chan struct{}), + pipes: map[string]*pipe{}, + } + pipeRouter.wait.Add(1) + go pipeRouter.gcLoop() + return pipeRouter +} + +func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, bool) { + pr.Lock() + defer pr.Unlock() + p, ok := pr.pipes[id] + if !ok { + log.Infof("Creating pipe id %s", id) + p = &pipe{ + ui: end{lastUsedTime: mtime.Now()}, + probe: end{lastUsedTime: mtime.Now()}, + Pipe: xfer.NewPipe(), + } + pr.pipes[id] = p + } + if p.Closed() { + return nil, nil, false + } + end, endIO := p.end(e) + end.refCount++ + return p, endIO, true +} + +func (pr *localPipeRouter) Release(_ context.Context, id string, e End) { + pr.Lock() + defer pr.Unlock() + + p, ok := pr.pipes[id] + if !ok { + // uh oh + return + } + + end, _ := p.end(e) + end.refCount-- + if end.refCount > 0 { + return + } + + if !p.Closed() { + end.lastUsedTime = mtime.Now() + } +} + +func (pr *localPipeRouter) Delete(_ context.Context, id string) { + pr.Lock() + defer pr.Unlock() + p, ok := pr.pipes[id] + if !ok { + return + } + p.Close() + p.tombstoneTime = mtime.Now() +} + +func (pr *localPipeRouter) Stop() { + close(pr.quit) + pr.wait.Wait() +} + +func (pr *localPipeRouter) gcLoop() { + defer pr.wait.Done() + ticker := time.Tick(gcInterval) + for { + select { + case <-pr.quit: + return + case <-ticker: + } + + pr.timeout() + pr.garbageCollect() + } +} + +func (pr *localPipeRouter) timeout() { + pr.Lock() + defer pr.Unlock() + now := mtime.Now() + for id, pipe := range pr.pipes { + if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) { + continue + } + + if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) || + (pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) { + log.Infof("Timing out pipe %s", id) + pipe.Close() + pipe.tombstoneTime = now + } + } +} + +func (pr *localPipeRouter) garbageCollect() { + pr.Lock() + defer pr.Unlock() + now := mtime.Now() + for pipeID, pipe := range pr.pipes { + if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout { + delete(pr.pipes, pipeID) + } + } +} diff --git a/app/pipes.go b/app/pipes.go index bed35309b..9f5cb71cc 100644 --- a/app/pipes.go +++ b/app/pipes.go @@ -1,199 +1,54 @@ package app import ( - "io" "net/http" - "sync" - "time" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" - - "github.com/weaveworks/scope/common/mtime" - "github.com/weaveworks/scope/common/xfer" + "golang.org/x/net/context" ) -const ( - gcInterval = 30 * time.Second // we check all the pipes every 30s - pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute - gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten -) - -// PipeRouter connects incoming and outgoing pipes. -type PipeRouter struct { - sync.Mutex - wait sync.WaitGroup - quit chan struct{} - pipes map[string]*pipe -} - -// for each end of the pipe, we keep a reference count & lastUsedTIme, -// such that we can timeout pipes when either end is inactive. -type end struct { - refCount int - lastUsedTime time.Time -} - -type pipe struct { - ui, probe end - tombstoneTime time.Time - - xfer.Pipe -} - // RegisterPipeRoutes registers the pipe routes -func RegisterPipeRoutes(router *mux.Router) *PipeRouter { - pipeRouter := &PipeRouter{ - quit: make(chan struct{}), - pipes: map[string]*pipe{}, - } - pipeRouter.wait.Add(1) - go pipeRouter.gcLoop() +func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) { router.Methods("GET"). Path("/api/pipe/{pipeID}"). - HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) { - uiEnd, _ := p.Ends() - return &p.ui, uiEnd - })) + HandlerFunc(requestContextDecorator(handlePipeWs(pr, UIEnd))) + router.Methods("GET"). Path("/api/pipe/{pipeID}/probe"). - HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) { - _, probeEnd := p.Ends() - return &p.probe, probeEnd - })) + HandlerFunc(requestContextDecorator(handlePipeWs(pr, ProbeEnd))) + router.Methods("DELETE", "POST"). Path("/api/pipe/{pipeID}"). - HandlerFunc(pipeRouter.delete) - return pipeRouter + HandlerFunc(requestContextDecorator(deletePipe(pr))) } -// Stop stops the pipeRouter -func (pr *PipeRouter) Stop() { - close(pr.quit) - pr.wait.Wait() -} - -func (pr *PipeRouter) gcLoop() { - defer pr.wait.Done() - ticker := time.Tick(gcInterval) - for { - select { - case <-pr.quit: - return - case <-ticker: - } - - pr.timeout() - pr.garbageCollect() - } -} - -func (pr *PipeRouter) timeout() { - pr.Lock() - defer pr.Unlock() - now := mtime.Now() - for id, pipe := range pr.pipes { - if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) { - continue - } - - if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) || - (pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) { - log.Infof("Timing out pipe %s", id) - pipe.Close() - pipe.tombstoneTime = now - } - } -} - -func (pr *PipeRouter) garbageCollect() { - pr.Lock() - defer pr.Unlock() - now := mtime.Now() - for pipeID, pipe := range pr.pipes { - if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout { - delete(pr.pipes, pipeID) - } - } -} - -func (pr *PipeRouter) getOrCreate(id string) (*pipe, bool) { - pr.Lock() - defer pr.Unlock() - p, ok := pr.pipes[id] - if !ok { - log.Infof("Creating pipe id %s", id) - p = &pipe{ - ui: end{lastUsedTime: mtime.Now()}, - probe: end{lastUsedTime: mtime.Now()}, - Pipe: xfer.NewPipe(), - } - pr.pipes[id] = p - } - if p.Closed() { - return nil, false - } - return p, true -} - -func (pr *PipeRouter) retain(id string, pipe *pipe, end *end) bool { - pr.Lock() - defer pr.Unlock() - if pipe.Closed() { - return false - } - end.refCount++ - return true -} - -func (pr *PipeRouter) release(id string, pipe *pipe, end *end) { - pr.Lock() - defer pr.Unlock() - - end.refCount-- - if end.refCount != 0 { - return - } - - if !pipe.Closed() { - end.lastUsedTime = mtime.Now() - } -} - -func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - pipeID := mux.Vars(r)["pipeID"] - pipe, ok := pr.getOrCreate(pipeID) +func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["pipeID"] + pipe, endIO, ok := pr.Get(ctx, id, end) if !ok { http.NotFound(w, r) return } - - endRef, endIO := endSelector(pipe) - if !pr.retain(pipeID, pipe, endRef) { - http.NotFound(w, r) - return - } - defer pr.release(pipeID, pipe, endRef) + defer pr.Release(ctx, id, end) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - log.Errorf("Error upgrading to websocket: %v", err) + log.Errorf("Error upgrading pipe %s (%d) websocket: %v", id, end, err) return } defer conn.Close() + log.Infof("Pipe success %s (%d)", id, end) pipe.CopyToWebsocket(endIO, conn) } } -func (pr *PipeRouter) delete(w http.ResponseWriter, r *http.Request) { - pipeID := mux.Vars(r)["pipeID"] - pipe, ok := pr.getOrCreate(pipeID) - if ok && pr.retain(pipeID, pipe, &pipe.ui) { +func deletePipe(pr PipeRouter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + pipeID := mux.Vars(r)["pipeID"] log.Infof("Closing pipe %s", pipeID) - pipe.Close() - pipe.tombstoneTime = mtime.Now() - pr.release(pipeID, pipe, &pipe.ui) + pr.Delete(ctx, pipeID) } } diff --git a/app/pipes_internal_test.go b/app/pipes_internal_test.go index 69615c5ac..1a6643933 100644 --- a/app/pipes_internal_test.go +++ b/app/pipes_internal_test.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/mtime" "github.com/weaveworks/scope/common/xfer" @@ -22,7 +23,8 @@ import ( func TestPipeTimeout(t *testing.T) { router := mux.NewRouter() - pr := RegisterPipeRoutes(router) + pr := NewLocalPipeRouter().(*localPipeRouter) + RegisterPipeRoutes(router, pr) pr.Stop() // we don't want the loop running in the background mtime.NowForce(time.Now()) @@ -30,7 +32,8 @@ func TestPipeTimeout(t *testing.T) { // create a new pipe. id := "foo" - pipe, ok := pr.getOrCreate(id) + ctx := context.Background() + pipe, _, ok := pr.Get(ctx, id, UIEnd) if !ok { t.Fatalf("not ok") } @@ -65,7 +68,8 @@ func (a adapter) PipeClose(_, pipeID string) error { func TestPipeClose(t *testing.T) { router := mux.NewRouter() - pr := RegisterPipeRoutes(router) + pr := NewLocalPipeRouter() + RegisterPipeRoutes(router, pr) defer pr.Stop() server := httptest.NewServer(router) diff --git a/app/router.go b/app/router.go index e8c25abff..a414950fd 100644 --- a/app/router.go +++ b/app/router.go @@ -10,6 +10,7 @@ import ( "github.com/PuerkitoBio/ghost/handlers" "github.com/gorilla/mux" "github.com/ugorji/go/codec" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/hostname" "github.com/weaveworks/scope/common/xfer" @@ -24,6 +25,19 @@ var ( UniqueID = "0" ) +// RequestCtxKey is key used for request entry in context +const RequestCtxKey = "request" + +// CtxHandlerFunc is a http.HandlerFunc, with added contexts +type CtxHandlerFunc func(context.Context, http.ResponseWriter, *http.Request) + +func requestContextDecorator(f CtxHandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(context.Background(), RequestCtxKey, r) + f(ctx, w, r) + } +} + // URLMatcher uses request.RequestURI (the raw, unparsed request) to attempt // to match pattern. It does this as go's URL.Parse method is broken, and // mistakenly unescapes the Path before parsing it. This breaks %2F (encoded @@ -76,13 +90,13 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc { // routes should be added to a router and passed to postRoutes. func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) http.Handler { get := preRoutes.Methods("GET").Subrouter() - get.HandleFunc("/api", gzipHandler(apiHandler)) - get.HandleFunc("/api/topology", gzipHandler(topologyRegistry.makeTopologyList(c))) + get.HandleFunc("/api", gzipHandler(requestContextDecorator(apiHandler))) + get.HandleFunc("/api/topology", gzipHandler(requestContextDecorator(topologyRegistry.makeTopologyList(c)))) get.HandleFunc("/api/topology/{topology}", - gzipHandler(topologyRegistry.captureRenderer(c, handleTopology))) + gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(c, handleTopology)))) get.HandleFunc("/api/topology/{topology}/ws", - topologyRegistry.captureRenderer(c, handleWs)) // NB not gzip! - get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c))) + requestContextDecorator(topologyRegistry.captureRenderer(c, handleWs))) // NB not gzip! + get.HandleFunc("/api/report", gzipHandler(requestContextDecorator(makeRawReportHandler(c)))) // We pull in the http.DefaultServeMux to get the pprof routes preRoutes.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) @@ -107,9 +121,9 @@ func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) return } - handler := gzipHandler(topologyRegistry.captureRendererWithoutFilters( + handler := gzipHandler(requestContextDecorator(topologyRegistry.captureRendererWithoutFilters( c, topologyID, handleNode(nodeID), - )) + ))) handler.ServeHTTP(w, r) }) } @@ -117,7 +131,7 @@ func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) // RegisterReportPostHandler registers the handler for report submission func RegisterReportPostHandler(a Adder, router *mux.Router) { post := router.Methods("POST").Subrouter() - post.HandleFunc("/api/report", func(w http.ResponseWriter, r *http.Request) { + post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( rpt report.Report reader = r.Body @@ -142,15 +156,15 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { http.Error(w, err.Error(), http.StatusBadRequest) return } - a.Add(rpt) + a.Add(ctx, rpt) if len(rpt.Pod.Nodes) > 0 { topologyRegistry.enableKubernetesTopologies() } w.WriteHeader(http.StatusOK) - }) + })) } -func apiHandler(w http.ResponseWriter, r *http.Request) { +func apiHandler(_ context.Context, w http.ResponseWriter, r *http.Request) { respondWith(w, http.StatusOK, xfer.Details{ ID: UniqueID, Version: Version, diff --git a/app/router_test.go b/app/router_test.go index 6a369497a..cc80b522c 100644 --- a/app/router_test.go +++ b/app/router_test.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/ugorji/go/codec" + "golang.org/x/net/context" "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/test" @@ -73,7 +74,8 @@ func TestReportPostHandler(t *testing.T) { t.Fatalf("Error posting report: %d", resp.StatusCode) } - if want, have := fixture.Report.Endpoint.Nodes, c.Report().Endpoint.Nodes; len(have) == 0 || len(want) != len(have) { + ctx := context.Background() + if want, have := fixture.Report.Endpoint.Nodes, c.Report(ctx).Endpoint.Nodes; len(have) == 0 || len(want) != len(have) { t.Fatalf("Content-Type %s: %v", contentType, test.Diff(have, want)) } } diff --git a/prog/app.go b/prog/app.go index a53ec62f0..fc71d7657 100644 --- a/prog/app.go +++ b/prog/app.go @@ -23,8 +23,8 @@ import ( func router(c app.Collector) http.Handler { router := mux.NewRouter() app.RegisterReportPostHandler(c, router) - app.RegisterControlRoutes(router) - app.RegisterPipeRoutes(router) + app.RegisterControlRoutes(router, app.NewLocalControlRouter()) + app.RegisterPipeRoutes(router, app.NewLocalPipeRouter()) return app.TopologyHandler(c, router, http.FileServer(FS(false))) }