mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-02 17:50:39 +00:00
175 lines
4.1 KiB
Go
175 lines
4.1 KiB
Go
package xfer
|
|
|
|
import (
|
|
"fmt"
|
|
"net/rpc"
|
|
"sync"
|
|
)
|
|
|
|
// ErrInvalidMessage is returned when a message read off the wire is not of
// the kind the codec expected at that point.
var (
	ErrInvalidMessage = fmt.Errorf("Invalid Message")
)
|
|
|
|
// Request is the message type used for control RPCs; it travels from the UI,
// through the App, to the Probe.
type Request struct {
	AppID, NodeID, Control string
}
|
|
|
|
// Response is the message type used to answer control RPCs; it travels from
// the Probe, through the App, back to the UI. Every field is dropped from the
// JSON encoding when it holds its zero value.
type Response struct {
	// Value is an arbitrary payload.
	Value interface{} `json:"value,omitempty"`
	// Error is the error text; ResponseError and ResponseErrorf populate it.
	Error string `json:"error,omitempty"`
	// Pipe presumably names a pipe associated with the response — confirm
	// against the callers that set it.
	Pipe string `json:"pipe,omitempty"`
	// RawTTY presumably asks for that pipe to be treated as a raw terminal —
	// confirm against the callers that set it.
	RawTTY bool `json:"raw_tty,omitempty"`
}
|
|
|
|
// Message is the union of an RPC request header, an RPC response header and
// an arbitrary value. The codec in this file sets one of the three fields per
// message it writes.
type Message struct {
	// Request is the net/rpc request header, when the message carries one.
	Request *rpc.Request
	// Response is the net/rpc response header, when the message carries one.
	Response *rpc.Response
	// Value is the RPC body (arguments or reply).
	Value interface{}
}
|
|
|
|
// ControlHandler is interface used in the app and the probe to represent
|
|
// a control RPC.
|
|
type ControlHandler interface {
|
|
Handle(req Request, res *Response) error
|
|
}
|
|
|
|
// ControlHandlerFunc is a adapter (ala golang's http RequestHandlerFunc)
|
|
// for ControlHandler
|
|
type ControlHandlerFunc func(Request) Response
|
|
|
|
// Handle is an adapter method to make ControlHandlers exposable via golang rpc
|
|
func (c ControlHandlerFunc) Handle(req Request, res *Response) error {
|
|
*res = c(req)
|
|
return nil
|
|
}
|
|
|
|
// ResponseErrorf creates a new Response with the given formatted error string.
|
|
func ResponseErrorf(format string, a ...interface{}) Response {
|
|
return Response{
|
|
Error: fmt.Sprintf(format, a...),
|
|
}
|
|
}
|
|
|
|
// ResponseError creates a new Response with the given error.
|
|
func ResponseError(err error) Response {
|
|
if err != nil {
|
|
return Response{
|
|
Error: err.Error(),
|
|
}
|
|
}
|
|
return Response{}
|
|
}
|
|
|
|
// JSONWebsocketCodec is golang rpc compatible Server and Client Codec
|
|
// that transmits and receives RPC messages over a websocker, as JSON.
|
|
type JSONWebsocketCodec struct {
|
|
sync.Mutex
|
|
conn Websocket
|
|
err chan error
|
|
}
|
|
|
|
// NewJSONWebsocketCodec makes a new JSONWebsocketCodec
|
|
func NewJSONWebsocketCodec(conn Websocket) *JSONWebsocketCodec {
|
|
return &JSONWebsocketCodec{
|
|
conn: conn,
|
|
err: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
// WaitForReadError blocks until any read on this codec returns an error.
|
|
// This is useful to know when the server has disconnected from the client.
|
|
func (j *JSONWebsocketCodec) WaitForReadError() error {
|
|
return <-j.err
|
|
}
|
|
|
|
// WriteRequest implements rpc.ClientCodec
|
|
func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error {
|
|
j.Lock()
|
|
defer j.Unlock()
|
|
|
|
if err := j.conn.WriteJSON(Message{Request: r}); err != nil {
|
|
return err
|
|
}
|
|
return j.conn.WriteJSON(Message{Value: v})
|
|
}
|
|
|
|
// WriteResponse implements rpc.ServerCodec
|
|
func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error {
|
|
j.Lock()
|
|
defer j.Unlock()
|
|
|
|
if err := j.conn.WriteJSON(Message{Response: r}); err != nil {
|
|
return err
|
|
}
|
|
return j.conn.WriteJSON(Message{Value: v})
|
|
}
|
|
|
|
func (j *JSONWebsocketCodec) readMessage(v interface{}) (*Message, error) {
|
|
m := Message{Value: v}
|
|
if err := j.conn.ReadJSON(&m); err != nil {
|
|
j.err <- err
|
|
close(j.err)
|
|
return nil, err
|
|
}
|
|
return &m, nil
|
|
}
|
|
|
|
// ReadResponseHeader implements rpc.ClientCodec
|
|
func (j *JSONWebsocketCodec) ReadResponseHeader(r *rpc.Response) error {
|
|
m, err := j.readMessage(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if m.Response == nil {
|
|
return ErrInvalidMessage
|
|
}
|
|
*r = *m.Response
|
|
return nil
|
|
}
|
|
|
|
// ReadResponseBody implements rpc.ClientCodec
|
|
func (j *JSONWebsocketCodec) ReadResponseBody(v interface{}) error {
|
|
_, err := j.readMessage(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if v == nil {
|
|
return ErrInvalidMessage
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close implements rpc.ClientCodec and rpc.ServerCodec
|
|
func (j *JSONWebsocketCodec) Close() error {
|
|
return j.conn.Close()
|
|
}
|
|
|
|
// ReadRequestHeader implements rpc.ServerCodec
|
|
func (j *JSONWebsocketCodec) ReadRequestHeader(r *rpc.Request) error {
|
|
m, err := j.readMessage(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if m.Request == nil {
|
|
return ErrInvalidMessage
|
|
}
|
|
*r = *m.Request
|
|
return nil
|
|
}
|
|
|
|
// ReadRequestBody implements rpc.ServerCodec
|
|
func (j *JSONWebsocketCodec) ReadRequestBody(v interface{}) error {
|
|
_, err := j.readMessage(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if v == nil {
|
|
return ErrInvalidMessage
|
|
}
|
|
return nil
|
|
}
|