mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Merge pull request #1172 from weaveworks/ws-pipe-fixes
Various websocket and pipe fixes.
This commit is contained in:
@@ -28,8 +28,16 @@ const (
|
||||
ProbeEnd
|
||||
)
|
||||
|
||||
func (e End) String() string {
|
||||
if e == UIEnd {
|
||||
return "ui"
|
||||
}
|
||||
return "probe"
|
||||
}
|
||||
|
||||
// PipeRouter stores pipes and allows you to connect to either end of them.
|
||||
type PipeRouter interface {
|
||||
Exists(context.Context, string) (bool, error)
|
||||
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, error)
|
||||
Release(context.Context, string, End) error
|
||||
Delete(context.Context, string) error
|
||||
@@ -78,6 +86,16 @@ func NewLocalPipeRouter() PipeRouter {
|
||||
return pipeRouter
|
||||
}
|
||||
|
||||
func (pr *localPipeRouter) Exists(_ context.Context, id string) (bool, error) {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
p, ok := pr.pipes[id]
|
||||
if !ok {
|
||||
return true, nil
|
||||
}
|
||||
return !p.Closed(), nil
|
||||
}
|
||||
|
||||
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
|
||||
pr.Lock()
|
||||
defer pr.Unlock()
|
||||
|
||||
12
app/pipes.go
12
app/pipes.go
@@ -32,12 +32,14 @@ func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) {
|
||||
func checkPipe(pr PipeRouter, end End) CtxHandlerFunc {
|
||||
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["pipeID"]
|
||||
_, _, err := pr.Get(ctx, id, end)
|
||||
exists, err := pr.Exists(ctx, id)
|
||||
if err != nil {
|
||||
respondWith(w, http.StatusInternalServerError, err.Error())
|
||||
} else if exists {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
pr.Release(ctx, id, end)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +61,7 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
log.Infof("Pipe success %s (%d)", id, end)
|
||||
log.Infof("Success got pipe %s:%s", id, end)
|
||||
if err := pipe.CopyToWebsocket(endIO, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
|
||||
log.Printf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
|
||||
}
|
||||
@@ -69,7 +71,7 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
|
||||
func deletePipe(pr PipeRouter) CtxHandlerFunc {
|
||||
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
pipeID := mux.Vars(r)["pipeID"]
|
||||
log.Infof("Closing pipe %s", pipeID)
|
||||
log.Infof("Deleting pipe %s", pipeID)
|
||||
if err := pr.Delete(ctx, pipeID); err != nil {
|
||||
respondWith(w, http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
|
||||
@@ -225,8 +225,8 @@ export function getPipeStatus(pipeId) {
|
||||
},
|
||||
success: function(res) {
|
||||
const status = {
|
||||
200: 'PIPE_ALIVE',
|
||||
204: 'PIPE_DELETED'
|
||||
204: 'PIPE_ALIVE',
|
||||
404: 'PIPE_DELETED'
|
||||
}[res.status];
|
||||
|
||||
if (!status) {
|
||||
|
||||
@@ -71,7 +71,7 @@ type WSDialer interface {
|
||||
func DialWS(d WSDialer, urlStr string, requestHeader http.Header) (Websocket, *http.Response, error) {
|
||||
wsConn, resp, err := d.Dial(urlStr, requestHeader)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, resp, err
|
||||
}
|
||||
return Ping(wsConn), resp, nil
|
||||
}
|
||||
@@ -91,6 +91,7 @@ func (p *pingingWebsocket) ping() {
|
||||
if err := p.conn.WriteControl(websocket.PingMessage, nil, mtime.Now().Add(writeWait)); err != nil {
|
||||
log.Errorf("websocket ping error: %v", err)
|
||||
p.Close()
|
||||
return
|
||||
}
|
||||
p.pinger.Reset(pingPeriod)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user