mirror of
https://github.com/weaveworks/scope.git
synced 2026-02-14 18:09:59 +00:00
Backend review feedback
This commit is contained in:
@@ -24,7 +24,6 @@ func RegisterControlRoutes(router *mux.Router) {
|
||||
type controlHandler struct {
|
||||
id int64
|
||||
client *rpc.Client
|
||||
codec *xfer.JSONWebsocketCodec
|
||||
}
|
||||
|
||||
type controlRouter struct {
|
||||
@@ -111,7 +110,6 @@ func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) {
|
||||
client := rpc.NewClientWithCodec(codec)
|
||||
handler := controlHandler{
|
||||
id: rand.Int63(),
|
||||
codec: codec,
|
||||
client: client,
|
||||
}
|
||||
|
||||
|
||||
43
app/pipes.go
43
app/pipes.go
@@ -16,7 +16,7 @@ import (
|
||||
const (
|
||||
gcInterval = 30 * time.Second // we check all the pipes every 30s
|
||||
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
|
||||
gcTimeout = 1 * time.Minute // after another 1 minute, tombstoned pipes are forgotten
|
||||
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
|
||||
)
|
||||
|
||||
// PipeRouter connects incoming and outgoing pipes.
|
||||
@@ -49,16 +49,21 @@ func RegisterPipeRoutes(router *mux.Router) *PipeRouter {
|
||||
}
|
||||
pipeRouter.wait.Add(1)
|
||||
go pipeRouter.gcLoop()
|
||||
router.Methods("GET").Path("/api/pipe/{pipeID}").HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
|
||||
router.Methods("GET").
|
||||
Path("/api/pipe/{pipeID}").
|
||||
HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
|
||||
uiEnd, _ := p.Ends()
|
||||
return &p.ui, uiEnd
|
||||
}))
|
||||
router.Methods("GET").Path("/api/pipe/{pipeID}/probe").HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
|
||||
router.Methods("GET").
|
||||
Path("/api/pipe/{pipeID}/probe").
|
||||
HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
|
||||
_, probeEnd := p.Ends()
|
||||
return &p.probe, probeEnd
|
||||
}))
|
||||
router.Methods("DELETE").Path("/api/pipe/{pipeID}").HandlerFunc(pipeRouter.deletePipe)
|
||||
router.Methods("POST").Path("/api/pipe/{pipeID}").HandlerFunc(pipeRouter.deletePipe)
|
||||
router.Methods("DELETE", "POST").
|
||||
Path("/api/pipe/{pipeID}").
|
||||
HandlerFunc(pipeRouter.delete)
|
||||
return pipeRouter
|
||||
}
|
||||
|
||||
@@ -78,12 +83,12 @@ func (pr *PipeRouter) gcLoop() {
|
||||
case <-ticker:
|
||||
}
|
||||
|
||||
pr.timeoutPipes()
|
||||
pr.gcPipes()
|
||||
pr.timeout()
|
||||
pr.garbageCollect()
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *PipeRouter) timeoutPipes() {
|
||||
func (pr *PipeRouter) timeout() {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
now := mtime.Now()
|
||||
@@ -101,7 +106,7 @@ func (pr *PipeRouter) timeoutPipes() {
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *PipeRouter) gcPipes() {
|
||||
func (pr *PipeRouter) garbageCollect() {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
now := mtime.Now()
|
||||
@@ -112,7 +117,7 @@ func (pr *PipeRouter) gcPipes() {
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *PipeRouter) getOrCreatePipe(id string) (*pipe, bool) {
|
||||
func (pr *PipeRouter) getOrCreate(id string) (*pipe, bool) {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
p, ok := pr.pipes[id]
|
||||
@@ -131,7 +136,7 @@ func (pr *PipeRouter) getOrCreatePipe(id string) (*pipe, bool) {
|
||||
return p, true
|
||||
}
|
||||
|
||||
func (pr *PipeRouter) getPipeRef(id string, pipe *pipe, end *end) bool {
|
||||
func (pr *PipeRouter) retain(id string, pipe *pipe, end *end) bool {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
if pipe.Closed() {
|
||||
@@ -141,7 +146,7 @@ func (pr *PipeRouter) getPipeRef(id string, pipe *pipe, end *end) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (pr *PipeRouter) putPipeRef(id string, pipe *pipe, end *end) {
|
||||
func (pr *PipeRouter) release(id string, pipe *pipe, end *end) {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
|
||||
@@ -158,18 +163,18 @@ func (pr *PipeRouter) putPipeRef(id string, pipe *pipe, end *end) {
|
||||
func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) func(http.ResponseWriter, *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
pipeID := mux.Vars(r)["pipeID"]
|
||||
pipe, ok := pr.getOrCreatePipe(pipeID)
|
||||
pipe, ok := pr.getOrCreate(pipeID)
|
||||
if !ok {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
endRef, endIO := endSelector(pipe)
|
||||
if !pr.getPipeRef(pipeID, pipe, endRef) {
|
||||
if !pr.retain(pipeID, pipe, endRef) {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
defer pr.putPipeRef(pipeID, pipe, endRef)
|
||||
defer pr.release(pipeID, pipe, endRef)
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
@@ -182,13 +187,13 @@ func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) fu
|
||||
}
|
||||
}
|
||||
|
||||
func (pr *PipeRouter) deletePipe(w http.ResponseWriter, r *http.Request) {
|
||||
func (pr *PipeRouter) delete(w http.ResponseWriter, r *http.Request) {
|
||||
pipeID := mux.Vars(r)["pipeID"]
|
||||
pipe, ok := pr.getOrCreatePipe(pipeID)
|
||||
if ok && pr.getPipeRef(pipeID, pipe, &pipe.ui) {
|
||||
pipe, ok := pr.getOrCreate(pipeID)
|
||||
if ok && pr.retain(pipeID, pipe, &pipe.ui) {
|
||||
log.Printf("Closing pipe %s", pipeID)
|
||||
pipe.Close()
|
||||
pipe.tombstoneTime = mtime.Now()
|
||||
pr.putPipeRef(pipeID, pipe, &pipe.ui)
|
||||
pr.release(pipeID, pipe, &pipe.ui)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,21 +29,21 @@ func TestPipeTimeout(t *testing.T) {
|
||||
|
||||
// create a new pipe.
|
||||
id := "foo"
|
||||
pipe, ok := pr.getOrCreatePipe(id)
|
||||
pipe, ok := pr.getOrCreate(id)
|
||||
if !ok {
|
||||
t.Fatalf("not ok")
|
||||
}
|
||||
|
||||
// move time forward such that the new pipe should timeout
|
||||
mtime.NowForce(mtime.Now().Add(pipeTimeout))
|
||||
pr.timeoutPipes()
|
||||
pr.timeout()
|
||||
if !pipe.Closed() {
|
||||
t.Fatalf("pipe didn't timeout")
|
||||
}
|
||||
|
||||
// move time forward such that the pipe should be GCd
|
||||
mtime.NowForce(mtime.Now().Add(gcTimeout))
|
||||
pr.gcPipes()
|
||||
pr.garbageCollect()
|
||||
if _, ok := pr.pipes[id]; ok {
|
||||
t.Fatalf("pipe not gc'd")
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ func HandleControlRequest(req xfer.Request) xfer.Response {
|
||||
handler, ok := handlers[req.Control]
|
||||
mtx.Unlock()
|
||||
if !ok {
|
||||
return xfer.ResponseErrorf("Control '%s' not recognised", req.Control)
|
||||
return xfer.ResponseErrorf("Control %q not recognised", req.Control)
|
||||
}
|
||||
|
||||
return handler(req)
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestControls(t *testing.T) {
|
||||
|
||||
func TestControlsNotFound(t *testing.T) {
|
||||
want := xfer.Response{
|
||||
Error: "Control 'baz' not recognised",
|
||||
Error: "Control \"baz\" not recognised",
|
||||
}
|
||||
have := controls.HandleControlRequest(xfer.Request{
|
||||
Control: "baz",
|
||||
|
||||
@@ -13,19 +13,15 @@ var Client interface {
|
||||
PipeClose(string, string) error
|
||||
}
|
||||
|
||||
// Pipe the probe-local type for a pipe, extending
|
||||
// pipe is the probe-local type for a pipe, extending
|
||||
// xfer.Pipe with the appID and a custom closer method.
|
||||
type Pipe interface {
|
||||
xfer.Pipe
|
||||
}
|
||||
|
||||
type pipe struct {
|
||||
xfer.Pipe
|
||||
id, appID string
|
||||
}
|
||||
|
||||
// NewPipe creats a new pipe and connects it to the app.
|
||||
var NewPipe = func(appID string) (string, Pipe, error) {
|
||||
var NewPipe = func(appID string) (string, xfer.Pipe, error) {
|
||||
pipeID := fmt.Sprintf("pipe-%d", rand.Int63())
|
||||
pipe := &pipe{
|
||||
Pipe: xfer.NewPipe(),
|
||||
|
||||
@@ -52,7 +52,7 @@ func (mockPipe) OnClose(func()) {}
|
||||
func TestPipes(t *testing.T) {
|
||||
oldNewPipe := controls.NewPipe
|
||||
defer func() { controls.NewPipe = oldNewPipe }()
|
||||
controls.NewPipe = func(_ string) (string, controls.Pipe, error) {
|
||||
controls.NewPipe = func(_ string) (string, xfer.Pipe, error) {
|
||||
return "pipeid", mockPipe{}, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user