mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-04 18:51:17 +00:00
322 lines
7.0 KiB
Go
322 lines
7.0 KiB
Go
package xfer
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"net/rpc"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/weaveworks/scope/common/sanitize"
|
|
)
|
|
|
|
const (
|
|
initialBackoff = 1 * time.Second
|
|
maxBackoff = 60 * time.Second
|
|
)
|
|
|
|
// Details are some generic details that can be fetched from /api
|
|
type Details struct {
|
|
ID string `json:"id"`
|
|
Version string `json:"version"`
|
|
Hostname string `json:"hostname"`
|
|
}
|
|
|
|
// AppClient is a client to an app for dealing with controls.
|
|
type AppClient interface {
|
|
Details() (Details, error)
|
|
ControlConnection()
|
|
PipeConnection(string, Pipe)
|
|
PipeClose(string) error
|
|
Publish(r io.Reader) error
|
|
Stop()
|
|
}
|
|
|
|
// appClient is a client to an app for dealing with controls.
|
|
type appClient struct {
|
|
ProbeConfig
|
|
|
|
quit chan struct{}
|
|
mtx sync.Mutex
|
|
target string
|
|
client http.Client
|
|
|
|
// Track all the background goroutines, ensure they all stop
|
|
backgroundWait sync.WaitGroup
|
|
|
|
// Track ongoing websocket connections
|
|
conns map[string]*websocket.Conn
|
|
|
|
// For publish
|
|
publishLoop sync.Once
|
|
readers chan io.Reader
|
|
|
|
// For controls
|
|
control ControlHandler
|
|
}
|
|
|
|
// NewAppClient makes a new appClient.
|
|
func NewAppClient(pc ProbeConfig, hostname, target string, control ControlHandler) (AppClient, error) {
|
|
httpTransport, err := pc.getHTTPTransport(hostname)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &appClient{
|
|
ProbeConfig: pc,
|
|
quit: make(chan struct{}),
|
|
target: target,
|
|
client: http.Client{
|
|
Transport: httpTransport,
|
|
},
|
|
conns: map[string]*websocket.Conn{},
|
|
readers: make(chan io.Reader),
|
|
control: control,
|
|
}, nil
|
|
}
|
|
|
|
func (c *appClient) hasQuit() bool {
|
|
select {
|
|
case <-c.quit:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *appClient) registerConn(id string, conn *websocket.Conn) bool {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
if c.hasQuit() {
|
|
conn.Close()
|
|
return false
|
|
}
|
|
c.conns[id] = conn
|
|
return true
|
|
}
|
|
|
|
func (c *appClient) closeConn(id string) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
if conn, ok := c.conns[id]; ok {
|
|
conn.Close()
|
|
delete(c.conns, id)
|
|
}
|
|
}
|
|
|
|
func (c *appClient) retainGoroutine() bool {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
if c.hasQuit() {
|
|
return false
|
|
}
|
|
c.backgroundWait.Add(1)
|
|
return true
|
|
}
|
|
|
|
func (c *appClient) releaseGoroutine() {
|
|
c.backgroundWait.Done()
|
|
}
|
|
|
|
// Stop stops the appClient.
|
|
func (c *appClient) Stop() {
|
|
c.mtx.Lock()
|
|
close(c.readers)
|
|
close(c.quit)
|
|
for _, conn := range c.conns {
|
|
conn.Close()
|
|
}
|
|
c.conns = map[string]*websocket.Conn{}
|
|
c.mtx.Unlock()
|
|
|
|
c.backgroundWait.Wait()
|
|
c.client.Transport.(*http.Transport).CloseIdleConnections()
|
|
return
|
|
}
|
|
|
|
// Details fetches the details (version, id) of the app.
|
|
func (c *appClient) Details() (Details, error) {
|
|
result := Details{}
|
|
req, err := c.ProbeConfig.authorizedRequest("GET", sanitize.URL("", 0, "/api")(c.target), nil)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
resp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
defer resp.Body.Close()
|
|
return result, json.NewDecoder(resp.Body).Decode(&result)
|
|
}
|
|
|
|
func (c *appClient) doWithBackoff(msg string, f func() (bool, error)) {
|
|
if !c.retainGoroutine() {
|
|
return
|
|
}
|
|
defer c.releaseGoroutine()
|
|
|
|
backoff := initialBackoff
|
|
|
|
for {
|
|
done, err := f()
|
|
if done {
|
|
return
|
|
}
|
|
if err == nil {
|
|
backoff = initialBackoff
|
|
continue
|
|
}
|
|
|
|
log.Printf("Error doing %s for %s, backing off %s: %v", msg, c.target, backoff, err)
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-c.quit:
|
|
return
|
|
}
|
|
backoff *= 2
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *appClient) controlConnection() (bool, error) {
|
|
dialer := websocket.Dialer{}
|
|
headers := http.Header{}
|
|
c.ProbeConfig.authorizeHeaders(headers)
|
|
// TODO(twilkie) need to update sanitize to work with wss
|
|
url := sanitize.URL("ws://", 0, "/api/control/ws")(c.target)
|
|
conn, _, err := dialer.Dial(url, headers)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer func() {
|
|
log.Printf("Closing control connection to %s", c.target)
|
|
conn.Close()
|
|
}()
|
|
|
|
codec := NewJSONWebsocketCodec(conn)
|
|
server := rpc.NewServer()
|
|
if err := server.RegisterName("control", c.control); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Will return false if we are exiting
|
|
if !c.registerConn("control", conn) {
|
|
return true, nil
|
|
}
|
|
defer c.closeConn("control")
|
|
|
|
server.ServeCodec(codec)
|
|
return false, nil
|
|
}
|
|
|
|
func (c *appClient) ControlConnection() {
|
|
go func() {
|
|
log.Printf("Control connection to %s starting", c.target)
|
|
defer log.Printf("Control connection to %s exiting", c.target)
|
|
c.doWithBackoff("controls", c.controlConnection)
|
|
}()
|
|
}
|
|
|
|
func (c *appClient) publish(r io.Reader) error {
|
|
url := sanitize.URL("", 0, "/api/report")(c.target)
|
|
req, err := c.ProbeConfig.authorizedRequest("POST", url, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
// req.Header.Set("Content-Type", "application/binary") // TODO: we should use http.DetectContentType(..) on the gob'ed
|
|
resp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf(resp.Status)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *appClient) startPublishing() {
|
|
go func() {
|
|
log.Printf("Publish loop for %s starting", c.target)
|
|
defer log.Printf("Publish loop for %s exiting", c.target)
|
|
c.doWithBackoff("publish", func() (bool, error) {
|
|
r := <-c.readers
|
|
if r == nil {
|
|
return true, nil
|
|
}
|
|
return false, c.publish(r)
|
|
})
|
|
}()
|
|
}
|
|
|
|
// Publish implements Publisher
|
|
func (c *appClient) Publish(r io.Reader) error {
|
|
// Lazily start the background publishing loop.
|
|
c.publishLoop.Do(c.startPublishing)
|
|
select {
|
|
case c.readers <- r:
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *appClient) pipeConnection(id string, pipe Pipe) (bool, error) {
|
|
dialer := websocket.Dialer{}
|
|
headers := http.Header{}
|
|
c.ProbeConfig.authorizeHeaders(headers)
|
|
// TODO(twilkie) need to update sanitize to work with wss
|
|
url := sanitize.URL("ws://", 0, fmt.Sprintf("/api/pipe/%s/probe", id))(c.target)
|
|
conn, resp, err := dialer.Dial(url, headers)
|
|
if resp != nil && resp.StatusCode == http.StatusNotFound {
|
|
// Special handling - 404 means the app/user has closed the pipe
|
|
pipe.Close()
|
|
return true, nil
|
|
}
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Will return false if we are exiting
|
|
if !c.registerConn(id, conn) {
|
|
return true, nil
|
|
}
|
|
defer c.closeConn(id)
|
|
|
|
_, remote := pipe.Ends()
|
|
return false, pipe.CopyToWebsocket(remote, conn)
|
|
}
|
|
|
|
func (c *appClient) PipeConnection(id string, pipe Pipe) {
|
|
go func() {
|
|
log.Printf("Pipe %s connection to %s starting", id, c.target)
|
|
defer log.Printf("Pipe %s connection to %s exiting", id, c.target)
|
|
c.doWithBackoff(id, func() (bool, error) {
|
|
return c.pipeConnection(id, pipe)
|
|
})
|
|
}()
|
|
}
|
|
|
|
// PipeClose closes the given pipe id on the app.
|
|
func (c *appClient) PipeClose(id string) error {
|
|
url := sanitize.URL("", 0, fmt.Sprintf("/api/pipe/%s", id))(c.target)
|
|
req, err := c.ProbeConfig.authorizedRequest("DELETE", url, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp.Body.Close()
|
|
return nil
|
|
}
|