Merge pull request #598 from weaveworks/315-capabilities

Add controls form container start, stop, pause, etc
This commit is contained in:
Tom Wilkie
2015-11-06 18:22:05 +00:00
64 changed files with 1926 additions and 518 deletions

View File

@@ -23,7 +23,7 @@ var (
renderer: render.PodRenderer,
Name: "Pods",
Options: map[string][]APITopologyOption{"system": {
{"show", "System containers shown", false, nop},
{"show", "System containers shown", false, render.FilterNoop},
{"hide", "System containers hidden", true, render.FilterSystem},
}},
},
@@ -33,7 +33,7 @@ var (
renderer: render.PodServiceRenderer,
Name: "by service",
Options: map[string][]APITopologyOption{"system": {
{"show", "System containers shown", false, nop},
{"show", "System containers shown", false, render.FilterNoop},
{"hide", "System containers hidden", true, render.FilterSystem},
}},
},
@@ -41,6 +41,17 @@ var (
)
func init() {
containerFilters := map[string][]APITopologyOption{
"system": {
{"show", "System containers shown", false, render.FilterNoop},
{"hide", "System containers hidden", true, render.FilterSystem},
},
"stopped": {
{"show", "Stopped containers shown", false, render.FilterNoop},
{"hide", "Stopped containers hidden", true, render.FilterStopped},
},
}
// Topology option labels should tell the current state. The first item must
// be the verb to get to that state
topologyRegistry.add(
@@ -51,7 +62,7 @@ func init() {
Options: map[string][]APITopologyOption{"unconnected": {
// Show the user why there are filtered nodes in this view.
// Don't give them the option to show those nodes.
{"hide", "Unconnected nodes hidden", true, nop},
{"hide", "Unconnected nodes hidden", true, render.FilterNoop},
}},
},
APITopologyDesc{
@@ -61,37 +72,28 @@ func init() {
Name: "by name",
Options: map[string][]APITopologyOption{"unconnected": {
// Ditto above.
{"hide", "Unconnected nodes hidden", true, nop},
{"hide", "Unconnected nodes hidden", true, render.FilterNoop},
}},
},
APITopologyDesc{
id: "containers",
renderer: render.ContainerWithImageNameRenderer,
Name: "Containers",
Options: map[string][]APITopologyOption{"system": {
{"show", "System containers shown", false, nop},
{"hide", "System containers hidden", true, render.FilterSystem},
}},
Options: containerFilters,
},
APITopologyDesc{
id: "containers-by-image",
parent: "containers",
renderer: render.ContainerImageRenderer,
Name: "by image",
Options: map[string][]APITopologyOption{"system": {
{"show", "System containers shown", false, nop},
{"hide", "System containers hidden", true, render.FilterSystem},
}},
Options: containerFilters,
},
APITopologyDesc{
id: "containers-by-hostname",
parent: "containers",
renderer: render.ContainerHostnameRenderer,
Name: "by hostname",
Options: map[string][]APITopologyOption{"system": {
{"show", "System containers shown", false, nop},
{"hide", "System containers hidden", true, render.FilterSystem},
}},
Options: containerFilters,
},
APITopologyDesc{
id: "hosts",
@@ -226,8 +228,6 @@ func decorateWithStats(rpt report.Report, renderer render.Renderer) topologyStat
}
}
func nop(r render.Renderer) render.Renderer { return r }
func (r *registry) enableKubernetesTopologies() {
r.add(kubernetesTopologies...)
}

View File

@@ -60,8 +60,13 @@ func TestAPITopologyContainers(t *testing.T) {
if err := json.Unmarshal(body, &topo); err != nil {
t.Fatal(err)
}
want := expected.RenderedContainers.Copy()
for id, node := range want {
node.ControlNode = ""
want[id] = node
}
if want, have := expected.RenderedContainers, topo.Nodes.Prune(); !reflect.DeepEqual(want, have) {
if have := topo.Nodes.Prune(); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}

121
app/controls.go Normal file
View File

@@ -0,0 +1,121 @@
package main
import (
"log"
"math/rand"
"net/http"
"net/rpc"
"sync"
"github.com/gorilla/mux"
"github.com/weaveworks/scope/xfer"
)
func registerControlRoutes(router *mux.Router) {
controlRouter := &controlRouter{
probes: map[string]controlHandler{},
}
router.Methods("GET").Path("/api/control/ws").HandlerFunc(controlRouter.handleProbeWS)
router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).HandlerFunc(controlRouter.handleControl)
}
type controlHandler struct {
id int64
client *rpc.Client
}
func (ch *controlHandler) handle(req xfer.Request) xfer.Response {
var res xfer.Response
if err := ch.client.Call("control.Handle", req, &res); err != nil {
return xfer.ResponseError(err)
}
return res
}
type controlRouter struct {
sync.Mutex
probes map[string]controlHandler
}
func (cr *controlRouter) get(probeID string) (controlHandler, bool) {
cr.Lock()
defer cr.Unlock()
handler, ok := cr.probes[probeID]
return handler, ok
}
func (cr *controlRouter) set(probeID string, handler controlHandler) {
cr.Lock()
defer cr.Unlock()
cr.probes[probeID] = handler
}
func (cr *controlRouter) rm(probeID string, handler controlHandler) {
cr.Lock()
defer cr.Unlock()
// NB probe might have reconnected in the mean time, need to ensure we do not
// delete new connection! Also, it might have connected then deleted itself!
if cr.probes[probeID].id == handler.id {
delete(cr.probes, probeID)
}
}
// handleControl routes control requests from the client to the appropriate
// probe. Its is blocking.
func (cr *controlRouter) handleControl(w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
probeID = vars["probeID"]
nodeID = vars["nodeID"]
control = vars["control"]
)
handler, ok := cr.get(probeID)
if !ok {
log.Printf("Probe %s is not connected right now...", probeID)
http.NotFound(w, r)
return
}
result := handler.handle(xfer.Request{
ID: rand.Int63(),
NodeID: nodeID,
Control: control,
})
if result.Error != "" {
respondWith(w, http.StatusBadRequest, result.Error)
return
}
respondWith(w, http.StatusOK, result.Value)
}
// handleProbeWS accepts websocket connections from the probe and registers
// them in the control router, such that HandleControl calls can find them.
func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) {
probeID := r.Header.Get(xfer.ScopeProbeIDHeader)
if probeID == "" {
respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Error upgrading to websocket: %v", err)
return
}
defer conn.Close()
codec := xfer.NewJSONWebsocketCodec(conn)
client := rpc.NewClientWithCodec(codec)
handler := controlHandler{
id: rand.Int63(),
client: client,
}
cr.set(probeID, handler)
codec.WaitForReadError()
cr.rm(probeID, handler)
client.Close()
}

69
app/controls_test.go Normal file
View File

@@ -0,0 +1,69 @@
package main
import (
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/weaveworks/scope/xfer"
"github.com/gorilla/mux"
)
func TestControl(t *testing.T) {
router := mux.NewRouter()
registerControlRoutes(router)
server := httptest.NewServer(router)
defer server.Close()
ip, port, err := net.SplitHostPort(strings.TrimPrefix(server.URL, "http://"))
if err != nil {
t.Fatal(err)
}
probeConfig := xfer.ProbeConfig{
ProbeID: "foo",
}
client, err := xfer.NewAppClient(probeConfig, ip+":"+port, ip+":"+port)
if err != nil {
t.Fatal(err)
}
defer client.Stop()
client.ControlConnection(xfer.ControlHandlerFunc(func(req xfer.Request) xfer.Response {
if req.NodeID != "nodeid" {
t.Fatalf("'%s' != 'nodeid'", req.NodeID)
}
if req.Control != "control" {
t.Fatalf("'%s' != 'control'", req.Control)
}
return xfer.Response{
Value: "foo",
}
}))
time.Sleep(100 * time.Millisecond)
httpClient := http.Client{
Timeout: 1 * time.Second,
}
resp, err := httpClient.Post(server.URL+"/api/control/foo/nodeid/control", "", nil)
if err != nil {
t.Fatal(err)
}
var response string
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
t.Fatal(err)
}
if response != "foo" {
t.Fatalf("'%s' != 'foo'", response)
}
}

View File

@@ -14,6 +14,8 @@ import (
"syscall"
"time"
"github.com/gorilla/mux"
"github.com/weaveworks/scope/xfer"
)
@@ -25,6 +27,19 @@ var (
uniqueID = "0"
)
func registerStatic(router *mux.Router) {
router.Methods("GET").PathPrefix("/").Handler(http.FileServer(FS(false)))
}
// Router creates the mux for all the various app components.
func Router(c collector) *mux.Router {
router := mux.NewRouter()
registerTopologyRoutes(c, router)
registerControlRoutes(router)
registerStatic(router)
return router
}
func main() {
var (
window = flag.Duration("window", 15*time.Second, "window")

View File

@@ -11,6 +11,7 @@ import (
"github.com/gorilla/mux"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
// URLMatcher uses request.RequestURI (the raw, unparsed request) to attempt
@@ -53,13 +54,7 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc {
return handlers.GZIPHandlerFunc(h, nil)
}
// Router returns the HTTP dispatcher, managing API and UI requests, and
// accepting reports from probes.. It will always use the embedded HTML
// resources for the UI.
func Router(c collector) *mux.Router {
router := mux.NewRouter()
router.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST")
func registerTopologyRoutes(c collector, router *mux.Router) {
get := router.Methods("GET").Subrouter()
get.HandleFunc("/api", gzipHandler(apiHandler))
get.HandleFunc("/api/topology", gzipHandler(topologyRegistry.makeTopologyList(c)))
@@ -72,9 +67,9 @@ func Router(c collector) *mux.Router {
get.MatcherFunc(URLMatcher("/api/topology/{topology}/{local}/{remote}")).HandlerFunc(
gzipHandler(topologyRegistry.captureRenderer(c, handleEdge)))
get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c)))
get.PathPrefix("/").Handler(http.FileServer(FS(false))) // everything else is static
return router
post := router.Methods("POST").Subrouter()
post.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST")
}
func makeReportPostHandler(a Adder) http.HandlerFunc {
@@ -104,12 +99,6 @@ func makeReportPostHandler(a Adder) http.HandlerFunc {
}
}
// APIDetails are some generic details that can be fetched from /api
type APIDetails struct {
ID string `json:"id"`
Version string `json:"version"`
}
func apiHandler(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version})
respondWith(w, http.StatusOK, xfer.Details{ID: uniqueID, Version: version})
}

View File

@@ -4,11 +4,15 @@ package main
import (
"net/http/httptest"
"testing"
"github.com/gorilla/mux"
)
// Test site
func TestSite(t *testing.T) {
ts := httptest.NewServer(Router(StaticReport{}))
router := mux.NewRouter()
registerStatic(router)
ts := httptest.NewServer(router)
defer ts.Close()
is200(t, ts, "/")

View File

@@ -26,6 +26,9 @@
"templateStrings": true,
"jsx": true
},
"globals": {
__WS_URL__: false
},
"rules": {
/**
* Strict mode

View File

@@ -66,12 +66,29 @@ module.exports = {
});
},
clearControlError: function() {
AppDispatcher.dispatch({
type: ActionTypes.CLEAR_CONTROL_ERROR
});
},
closeWebsocket: function() {
AppDispatcher.dispatch({
type: ActionTypes.CLOSE_WEBSOCKET
});
},
doControl: function(probeId, nodeId, control) {
AppDispatcher.dispatch({
type: ActionTypes.DO_CONTROL
});
WebapiUtils.doControl(
probeId,
nodeId,
control,
);
},
enterEdge: function(edgeId) {
AppDispatcher.dispatch({
type: ActionTypes.ENTER_EDGE,
@@ -107,6 +124,19 @@ module.exports = {
});
},
receiveControlError: function(err) {
AppDispatcher.dispatch({
type: ActionTypes.DO_CONTROL_ERROR,
error: err
});
},
receiveControlSuccess: function() {
AppDispatcher.dispatch({
type: ActionTypes.DO_CONTROL_SUCCESS
});
},
receiveNodeDetails: function(details) {
AppDispatcher.dispatch({
type: ActionTypes.RECEIVE_NODE_DETAILS,
@@ -139,15 +169,15 @@ module.exports = {
receiveApiDetails: function(apiDetails) {
AppDispatcher.dispatch({
type: ActionTypes.RECEIVE_API_DETAILS,
version: apiDetails.version
type: ActionTypes.RECEIVE_API_DETAILS,
version: apiDetails.version
});
},
receiveError: function(errorUrl) {
AppDispatcher.dispatch({
errorUrl: errorUrl,
type: ActionTypes.RECEIVE_ERROR
errorUrl: errorUrl,
type: ActionTypes.RECEIVE_ERROR
});
},

View File

@@ -22,9 +22,9 @@ const Node = React.createClass({
const subLabelOffsetY = labelOffsetY + 17;
const isPseudo = !!this.props.pseudo;
const color = isPseudo ? '' : this.getNodeColor(this.props.rank);
const onClick = this.props.onClick;
const onMouseEnter = this.handleMouseEnter;
const onMouseLeave = this.handleMouseLeave;
const onMouseClick = this.handleMouseClick;
const classNames = ['node'];
const animConfig = [80, 20]; // stiffness, bounce
const label = this.ellipsis(props.label, 14, scale(4 * scaleFactor));
@@ -51,7 +51,7 @@ const Node = React.createClass({
const transform = `translate(${interpolated.x.val},${interpolated.y.val})`;
return (
<g className={classes} transform={transform} id={props.id}
onClick={onClick} onMouseEnter={onMouseEnter} onMouseLeave={onMouseLeave}>
onClick={onMouseClick} onMouseEnter={onMouseEnter} onMouseLeave={onMouseLeave}>
{props.highlighted && <circle r={scale(0.7 * interpolated.f.val)} className="highlighted"></circle>}
<circle r={scale(0.5 * interpolated.f.val)} className="border" stroke={color}></circle>
<circle r={scale(0.45 * interpolated.f.val)} className="shadow"></circle>
@@ -79,6 +79,11 @@ const Node = React.createClass({
return truncatedText;
},
handleMouseClick: function(ev) {
ev.stopPropagation();
AppActions.clickNode(ev.currentTarget.id);
},
handleMouseEnter: function(ev) {
AppActions.enterNode(ev.currentTarget.id);
},

View File

@@ -218,7 +218,7 @@ const NodesChart = React.createClass({
<div className="nodes-chart">
{errorEmpty}
{errorMaxNodesExceeded}
<svg width="100%" height="100%" className={svgClassNames} onMouseUp={this.handleMouseUp}>
<svg width="100%" height="100%" className={svgClassNames} onClick={this.handleMouseClick}>
<Spring endValue={{val: translate, config: [80, 20]}}>
{function(interpolated) {
let interpolatedTranslate = wasShifted ? interpolated.val : panTranslate;
@@ -402,7 +402,7 @@ const NodesChart = React.createClass({
isZooming: false, // distinguish pan/zoom from click
handleMouseUp: function() {
handleMouseClick: function() {
if (!this.isZooming) {
AppActions.clickCloseDetails();
// allow shifts again

View File

@@ -2,6 +2,8 @@ jest.dontMock('../node-details.js');
jest.dontMock('../../mixins/node-color-mixin');
jest.dontMock('../../utils/title-utils');
__WS_URL__ = false
describe('NodeDetails', () => {
let NodeDetails;
let nodes;

View File

@@ -20,6 +20,8 @@ const ESC_KEY_CODE = 27;
function getStateFromStores() {
return {
activeTopologyOptions: AppStore.getActiveTopologyOptions(),
controlError: AppStore.getControlError(),
controlPending: AppStore.isControlPending(),
currentTopology: AppStore.getCurrentTopology(),
currentTopologyId: AppStore.getCurrentTopologyId(),
currentTopologyOptions: AppStore.getCurrentTopologyOptions(),
@@ -81,6 +83,8 @@ const App = React.createClass({
return (
<div>
{showingDetails && <Details nodes={this.state.nodes}
controlError={this.state.controlError}
controlPending={this.state.controlPending}
nodeId={this.state.selectedNodeId}
details={this.state.nodeDetails} /> }

View File

@@ -2,7 +2,6 @@ const React = require('react');
const mui = require('material-ui');
const Paper = mui.Paper;
const AppActions = require('../actions/app-actions');
const NodeDetails = require('./node-details');
const Details = React.createClass({
@@ -11,21 +10,10 @@ const Details = React.createClass({
return (
<div id="details">
<Paper zDepth={3} style={{height: '100%', paddingBottom: 8}}>
<div className="details-tools-wrapper">
<div className="details-tools">
<span className="fa fa-close" onClick={this.handleClickClose} />
</div>
</div>
<NodeDetails nodeId={this.props.nodeId} details={this.props.details}
nodes={this.props.nodes} />
<NodeDetails {...this.props} />
</Paper>
</div>
);
},
handleClickClose: function(ev) {
ev.preventDefault();
AppActions.clickCloseDetails();
}
});

View File

@@ -0,0 +1,23 @@
const React = require('react');
const AppActions = require('../actions/app-actions');
const NodeControlButton = React.createClass({
render: function() {
let className = `node-control-button fa ${this.props.control.icon}`;
if (this.props.pending) {
className += ' node-control-button-pending';
}
return (
<span className={className} title={this.props.control.human} onClick={this.handleClick} />
);
},
handleClick: function(ev) {
ev.preventDefault();
AppActions.doControl(this.props.control.probeId, this.props.control.nodeId, this.props.control.id);
}
});
module.exports = NodeControlButton;

View File

@@ -0,0 +1,25 @@
const React = require('react');
const NodeControlButton = require('./node-control-button');
const NodeDetailsControls = React.createClass({
render: function() {
return (
<div className="node-details-controls">
{this.props.error && <div className="node-details-controls-error" title={this.props.error}>
<span className="node-details-controls-error-icon fa fa-warning" />
<span className="node-details-controls-error-messages">{this.props.error}</span>
</div>}
{this.props.controls && this.props.controls.map(control => {
return (
<NodeControlButton control={control} pending={this.props.pending} />
);
})}
</div>
);
}
});
module.exports = NodeDetailsControls;

View File

@@ -1,6 +1,7 @@
const _ = require('lodash');
const React = require('react');
const NodeDetailsControls = require('./node-details-controls');
const NodeDetailsTable = require('./node-details-table');
const NodeColorMixin = require('../mixins/node-color-mixin');
const TitleUtils = require('../utils/title-utils');
@@ -65,6 +66,8 @@ const NodeDetails = React.createClass({
return (
<div className="node-details">
<div className="node-details-header" style={style}>
<NodeDetailsControls controls={details.controls}
pending={this.props.controlPending} error={this.props.controlError} />
<h2 className="node-details-header-label truncate" title={details.label_major}>
{details.label_major}
</h2>

View File

@@ -1,7 +1,6 @@
const React = require('react');
const NodesChart = require('../charts/nodes-chart');
const AppActions = require('../actions/app-actions');
const navbarHeight = 160;
const marginTop = 0;
@@ -23,10 +22,6 @@ const Nodes = React.createClass({
window.removeEventListener('resize', this.handleResize);
},
onNodeClick: function(ev) {
AppActions.clickNode(ev.currentTarget.id);
},
render: function() {
return (
<NodesChart
@@ -34,7 +29,6 @@ const Nodes = React.createClass({
highlightedNodeIds={this.props.highlightedNodeIds}
selectedNodeId={this.props.selectedNodeId}
nodes={this.props.nodes}
onNodeClick={this.onNodeClick}
width={this.state.width}
height={this.state.height}
topologyId={this.props.topologyId}

View File

@@ -2,10 +2,14 @@ const keymirror = require('keymirror');
module.exports = keymirror({
CHANGE_TOPOLOGY_OPTION: null,
CLEAR_CONTROL_ERROR: null,
CLICK_CLOSE_DETAILS: null,
CLICK_NODE: null,
CLICK_TOPOLOGY: null,
CLOSE_WEBSOCKET: null,
DO_CONTROL: null,
DO_CONTROL_ERROR: null,
DO_CONTROL_SUCCESS: null,
ENTER_EDGE: null,
ENTER_NODE: null,
HIT_ESC_KEY: null,

View File

@@ -5,7 +5,7 @@ const AppDispatcher = new flux.Dispatcher();
AppDispatcher.dispatch = _.wrap(flux.Dispatcher.prototype.dispatch, function(func) {
const args = Array.prototype.slice.call(arguments, 1);
// console.log(args[0]);
console.log(args[0]);
func.apply(this, args);
});

View File

@@ -1,8 +0,0 @@
// This file is an entrypoint for development,
// see main.js for the real entrypoint
// Inject websocket url to dev backend
window.WS_PROTO = (location.protocol === 'https:' ? 'wss' : 'ws');
window.WS_URL = window.WS_PROTO + '://' + location.hostname + ':4040';
require('./main');

View File

@@ -46,6 +46,8 @@ function makeNode(node) {
let topologyOptions = makeOrderedMap();
let adjacentNodes = makeSet();
let controlError = null;
let controlPending = false;
let currentTopology = null;
let currentTopologyId = 'containers';
let errorUrl = null;
@@ -132,6 +134,10 @@ const AppStore = assign({}, EventEmitter.prototype, {
return adjacentNodes;
},
getControlError: function() {
return controlError;
},
getCurrentTopology: function() {
if (!currentTopology) {
currentTopology = setTopology(currentTopologyId);
@@ -209,6 +215,10 @@ const AppStore = assign({}, EventEmitter.prototype, {
return version;
},
isControlPending: function() {
return controlPending;
},
isRouteSet: function() {
return routeSet;
},
@@ -244,13 +254,23 @@ AppStore.registeredCallback = function(payload) {
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.CLEAR_CONTROL_ERROR:
controlError = null;
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.CLICK_CLOSE_DETAILS:
selectedNodeId = null;
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.CLICK_NODE:
selectedNodeId = payload.nodeId;
if (payload.nodeId === selectedNodeId) {
// clicking same node twice unsets the selection
selectedNodeId = null;
} else {
selectedNodeId = payload.nodeId;
}
AppStore.emit(AppStore.CHANGE_EVENT);
break;
@@ -268,6 +288,12 @@ AppStore.registeredCallback = function(payload) {
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.DO_CONTROL:
controlPending = true;
controlError = null;
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.ENTER_EDGE:
mouseOverEdgeId = payload.edgeId;
AppStore.emit(AppStore.CHANGE_EVENT);
@@ -302,6 +328,18 @@ AppStore.registeredCallback = function(payload) {
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.DO_CONTROL_ERROR:
controlPending = false;
controlError = payload.error;
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.DO_CONTROL_SUCCESS:
controlPending = false;
controlError = null;
AppStore.emit(AppStore.CHANGE_EVENT);
break;
case ActionTypes.RECEIVE_ERROR:
errorUrl = payload.errorUrl;
AppStore.emit(AppStore.CHANGE_EVENT);

View File

@@ -1,10 +1,11 @@
const debug = require('debug')('scope:web-api-utils');
const reqwest = require('reqwest');
const AppActions = require('../actions/app-actions');
const WS_PROTO = window.WS_PROTO || (location.protocol === 'https:' ? 'wss' : 'ws');
const WS_URL = window.WS_URL || WS_PROTO + '://' + location.host + location.pathname.replace(/\/$/, '');
const wsProto = location.protocol === 'https:' ? 'wss' : 'ws';
const wsUrl = __WS_URL__ || wsProto + '://' + location.host + location.pathname.replace(/\/$/, '');
const apiTimerInterval = 10000;
const reconnectTimerInterval = 5000;
@@ -17,7 +18,7 @@ let currentUrl = null;
let currentOptions = null;
let topologyTimer = 0;
let apiDetailsTimer = 0;
let controlErrorTimer = 0;
function buildOptionsQuery(options) {
if (options) {
@@ -35,7 +36,7 @@ function createWebsocket(topologyUrl, optionsQuery) {
socket.close();
}
socket = new WebSocket(WS_URL + topologyUrl
socket = new WebSocket(wsUrl + topologyUrl
+ '/ws?t=' + updateFrequency + '&' + optionsQuery);
socket.onopen = function() {
@@ -135,7 +136,28 @@ function getApiDetails() {
});
}
function doControl(probeId, nodeId, control) {
clearTimeout(controlErrorTimer);
const url = `api/control/${encodeURIComponent(probeId)}/`
+ `${encodeURIComponent(nodeId)}/${control}`;
reqwest({
method: 'POST',
url: url,
success: function() {
AppActions.receiveControlSuccess();
},
error: function(err) {
AppActions.receiveControlError(err.response);
controlErrorTimer = setTimeout(function() {
AppActions.clearControlError();
}, 10000);
}
});
}
module.exports = {
doControl: doControl,
getNodeDetails: getNodeDetails,
getTopologies: getTopologies,

View File

@@ -35,6 +35,11 @@
text-overflow: ellipsis;
}
.palable {
transition: opacity .2s ease-in-out;
transition: border-color .2s ease-in-out;
}
.hideable {
transition: opacity .5s ease-in-out;
}
@@ -313,24 +318,6 @@ h2 {
top: 24px;
bottom: 48px;
width: 420px;
.details-tools-wrapper {
position: relative;
}
.details-tools {
position: absolute;
top: 16px;
right: 24px;
color: @white;
span {
cursor: pointer;
&:hover {
color: white;
}
}
}
}
.node-details {
@@ -342,26 +329,79 @@ h2 {
padding: 24px 36px 24px 36px;
&-row {
display: flex;
}
&-label {
color: white;
margin: 0;
width: 348px;
padding-top: 0;
&-minor {
width: 348px;
flex: 1;
font-size: 120%;
color: @white;
}
}
.details-tools {
position: absolute;
top: 16px;
right: 24px;
}
&-notavailable {
background-color: @background-dark-color;
}
}
&-controls {
white-space: nowrap;
text-align: right;
margin: -8px -8px 0 0;
.node-control-button {
.palable;
padding: 6px;
margin-left: 2px;
font-size: 110%;
color: @white;
cursor: pointer;
opacity: 0.7;
border: 1px solid rgba(255, 255, 255, 0);
border-radius: 10%;
&:hover {
opacity: 1;
border-color: rgba(255, 255, 255, 0.6);
}
&-pending, &-pending:hover {
opacity: 0.2;
border-color: rgba(255, 255, 255, 0);
cursor: not-allowed;
}
}
&-error {
.truncate;
float: left;
width: 66%;
padding-top: 6px;
text-align: left;
color: @white;
&-icon {
margin-right: 0.5em;
animation: blinking 2.0s infinite ease-in-out;
}
}
}
&-content {
position: absolute;
top: 115px;
top: 128px;
bottom: 0;
padding: 0 36px 0 36px;
overflow-y: scroll;
@@ -449,6 +489,14 @@ h2 {
}
}
@keyframes blinking {
0%, 100% {
opacity: 1.0;
} 50% {
opacity: 0.5;
}
}
@keyframes status-loading {
0%, 100% {
background-color: darken(@background-color, 4%);

View File

@@ -28,9 +28,11 @@ app.get('/app.js', function(req, res) {
// Proxy to backend
var BACKEND_HOST = process.env.BACKEND_HOST || 'localhost:4040';
// HACK need express-http-proxy, because proxy-middleware does
// not proxy to /api itself
app.use(httpProxy('localhost:4040', {
app.use(httpProxy(BACKEND_HOST, {
filter: function(req) {
return url.parse(req.url).path === '/api';
},
@@ -39,7 +41,7 @@ app.use(httpProxy('localhost:4040', {
}
}));
app.use('/api', proxy('http://localhost:4040/api/'));
app.use('/api', proxy('http://' + BACKEND_HOST + '/api/'));
// Serve index page

View File

@@ -12,6 +12,13 @@ var path = require('path');
*
* For more information, see: http://webpack.github.io/docs/configuration.html
*/
// Inject websocket url to dev backend
var BACKEND_HOST = process.env.BACKEND_HOST || 'localhost:4040';
var GLOBALS = {
__WS_URL__: JSON.stringify('ws://' + BACKEND_HOST)
};
module.exports = {
// Efficiently evaluate modules with source maps
@@ -21,7 +28,7 @@ module.exports = {
entry: [
'webpack-dev-server/client?http://localhost:4041',
'webpack/hot/only-dev-server',
'./app/scripts/local'
'./app/scripts/main'
],
// This will not actually create a app.js file in ./build. It is used
@@ -34,6 +41,7 @@ module.exports = {
// Necessary plugins for hot load
plugins: [
new webpack.DefinePlugin(GLOBALS),
new webpack.HotModuleReplacementPlugin(),
new webpack.NoErrorsPlugin()
],

View File

@@ -2,6 +2,10 @@ var webpack = require('webpack');
var autoprefixer = require('autoprefixer-core');
var path = require('path');
var GLOBALS = {
__WS_URL__: 'false'
};
/**
* This is the Webpack configuration file for production.
*/
@@ -56,9 +60,12 @@ module.exports = {
extensions: ['', '.js', '.jsx']
},
plugins: [new webpack.optimize.UglifyJsPlugin({
compress: {
warnings: false
}
})]
plugins: [
new webpack.DefinePlugin(GLOBALS),
new webpack.optimize.UglifyJsPlugin({
compress: {
warnings: false
}
})
]
};

View File

@@ -0,0 +1,22 @@
#! /bin/bash
. ./config.sh
start_suite "Test container controls"
weave_on $HOST1 launch
scope_on $HOST1 launch
CID=$(weave_on $HOST1 run -dti --name alpine alpine /bin/sh)
wait_for_containers $HOST1 60 alpine
assert "docker_on $HOST1 inspect --format='{{.State.Running}}' alpine" "true"
PROBEID=$(docker_on $HOST1 logs weavescope 2>&1 | grep "probe starting" | sed -n 's/^.*ID \([0-9a-f]*\)$/\1/p')
HOSTID=$(echo $HOST1 | cut -d"." -f1)
assert_raises "curl -f -X POST 'http://$HOST1:4040/api/control/$PROBEID/$HOSTID;$CID/docker_stop_container'"
sleep 5
assert "docker_on $HOST1 inspect --format='{{.State.Running}}' alpine" "false"
scope_end_suite

View File

@@ -0,0 +1,44 @@
package controls
import (
"fmt"
"sync"
"github.com/weaveworks/scope/xfer"
)
var (
mtx = sync.Mutex{}
handlers = map[string]xfer.ControlHandlerFunc{}
)
// HandleControlRequest performs a control request.
func HandleControlRequest(req xfer.Request) xfer.Response {
mtx.Lock()
handler, ok := handlers[req.Control]
mtx.Unlock()
if !ok {
return xfer.Response{
ID: req.ID,
Error: fmt.Sprintf("Control '%s' not recognised", req.Control),
}
}
response := handler(req)
response.ID = req.ID
return response
}
// Register a new control handler under a given id.
func Register(control string, f xfer.ControlHandlerFunc) {
mtx.Lock()
defer mtx.Unlock()
handlers[control] = f
}
// Rm deletes the handler for a given name
func Rm(control string) {
mtx.Lock()
defer mtx.Unlock()
delete(handlers, control)
}

View File

@@ -0,0 +1,45 @@
package controls_test
import (
"reflect"
"testing"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/xfer"
)
func TestControls(t *testing.T) {
controls.Register("foo", func(req xfer.Request) xfer.Response {
return xfer.Response{
Value: "bar",
}
})
defer controls.Rm("foo")
want := xfer.Response{
ID: 1234,
Value: "bar",
}
have := controls.HandleControlRequest(xfer.Request{
ID: 1234,
Control: "foo",
})
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}
}
func TestControlsNotFound(t *testing.T) {
want := xfer.Response{
ID: 3456,
Error: "Control 'baz' not recognised",
}
have := controls.HandleControlRequest(xfer.Request{
ID: 3456,
Control: "baz",
})
if !reflect.DeepEqual(want, have) {
t.Fatal(test.Diff(want, have))
}
}

View File

@@ -29,6 +29,7 @@ const (
ContainerIPs = "docker_container_ips"
ContainerHostname = "docker_container_hostname"
ContainerIPsWithScopes = "docker_container_ips_with_scopes"
ContainerState = "docker_container_state"
NetworkRxDropped = "network_rx_dropped"
NetworkRxBytes = "network_rx_bytes"
@@ -49,6 +50,12 @@ const (
CPUTotalUsage = "cpu_total_usage"
CPUUsageInKernelmode = "cpu_usage_in_kernelmode"
CPUSystemCPUUsage = "cpu_system_cpu_usage"
StateRunning = "running"
StateStopped = "stopped"
StatePaused = "paused"
stopTimeout = 10
)
// Exported for testing
@@ -69,6 +76,8 @@ type ClientConn interface {
// Container represents a Docker container
type Container interface {
UpdateState(*docker.Container)
ID() string
Image() string
PID() int
@@ -88,7 +97,15 @@ type container struct {
// NewContainer creates a new Container
func NewContainer(c *docker.Container) Container {
return &container{container: c}
return &container{
container: c,
}
}
func (c *container) UpdateState(container *docker.Container) {
c.Lock()
defer c.Unlock()
c.container = container
}
func (c *container) ID() string {
@@ -231,6 +248,15 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node {
ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(hostID, ip))
}
var state string
if c.container.State.Paused {
state = StatePaused
} else if c.container.State.Running {
state = StateRunning
} else {
state = StateStopped
}
result := report.MakeNodeWith(map[string]string{
ContainerID: c.ID(),
ContainerName: strings.TrimPrefix(c.container.Name, "/"),
@@ -238,11 +264,21 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node {
ContainerCommand: c.container.Path + " " + strings.Join(c.container.Args, " "),
ImageID: c.container.Image,
ContainerHostname: c.Hostname(),
ContainerState: state,
}).WithSets(report.Sets{
ContainerPorts: c.ports(localAddrs),
ContainerIPs: report.MakeStringSet(ips...),
ContainerIPsWithScopes: report.MakeStringSet(ipsWithScopes...),
})
if c.container.State.Paused {
result = result.WithControls(UnpauseContainer)
} else if c.container.State.Running {
result = result.WithControls(RestartContainer, StopContainer, PauseContainer)
} else {
result = result.WithControls(StartContainer)
}
AddLabels(result, c.container.Config.Labels)
if c.latestStats == nil {

View File

@@ -74,11 +74,12 @@ func TestContainer(t *testing.T) {
"docker_label_foo1": "bar1",
"docker_label_foo2": "bar2",
"memory_usage": "12345",
"docker_container_state": "running",
}).WithSets(report.Sets{
"docker_container_ports": report.MakeStringSet("1.2.3.4:80->80/tcp", "81/tcp"),
"docker_container_ips": report.MakeStringSet("1.2.3.4"),
"docker_container_ips_with_scopes": report.MakeStringSet("scope;1.2.3.4"),
})
}).WithControls(docker.RestartContainer, docker.StopContainer, docker.PauseContainer)
test.Poll(t, 100*time.Millisecond, want, func() interface{} {
node := c.GetNode("scope", []net.IP{})
for k, v := range node.Metadata {
@@ -93,7 +94,7 @@ func TestContainer(t *testing.T) {
t.Errorf("%s != baz", c.Image())
}
if c.PID() != 1 {
t.Errorf("%s != 1", c.PID())
t.Errorf("%d != 1", c.PID())
}
if have := docker.ExtractContainerIPs(c.GetNode("", []net.IP{})); !reflect.DeepEqual(have, []string{"1.2.3.4"}) {
t.Errorf("%v != %v", have, []string{"1.2.3.4"})

83
probe/docker/controls.go Normal file
View File

@@ -0,0 +1,83 @@
package docker
import (
"log"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
// Control IDs used by the docker intergation.
const (
StopContainer = "docker_stop_container"
StartContainer = "docker_start_container"
RestartContainer = "docker_restart_container"
PauseContainer = "docker_pause_container"
UnpauseContainer = "docker_unpause_container"
waitTime = 10
)
func (r *registry) stopContainer(req xfer.Request) xfer.Response {
log.Printf("Stopping container %s", req.NodeID)
_, containerID, ok := report.ParseContainerNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
return xfer.ResponseError(r.client.StopContainer(containerID, waitTime))
}
func (r *registry) startContainer(req xfer.Request) xfer.Response {
log.Printf("Starting container %s", req.NodeID)
_, containerID, ok := report.ParseContainerNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
return xfer.ResponseError(r.client.StartContainer(containerID, nil))
}
func (r *registry) restartContainer(req xfer.Request) xfer.Response {
log.Printf("Restarting container %s", req.NodeID)
_, containerID, ok := report.ParseContainerNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
return xfer.ResponseError(r.client.RestartContainer(containerID, waitTime))
}
func (r *registry) pauseContainer(req xfer.Request) xfer.Response {
log.Printf("Pausing container %s", req.NodeID)
_, containerID, ok := report.ParseContainerNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
return xfer.ResponseError(r.client.PauseContainer(containerID))
}
func (r *registry) unpauseContainer(req xfer.Request) xfer.Response {
log.Printf("Unpausing container %s", req.NodeID)
_, containerID, ok := report.ParseContainerNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
return xfer.ResponseError(r.client.UnpauseContainer(containerID))
}
func (r *registry) registerControls() {
controls.Register(StopContainer, r.stopContainer)
controls.Register(StartContainer, r.startContainer)
controls.Register(RestartContainer, r.restartContainer)
controls.Register(PauseContainer, r.pauseContainer)
controls.Register(UnpauseContainer, r.unpauseContainer)
}

View File

@@ -0,0 +1,38 @@
package docker_test
import (
"reflect"
"testing"
"time"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
func TestControls(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10 * time.Second)
defer registry.Stop()
for _, tc := range []struct{ command, result string }{
{docker.StopContainer, "stopped"},
{docker.StartContainer, "started"},
{docker.RestartContainer, "restarted"},
{docker.PauseContainer, "paused"},
{docker.UnpauseContainer, "unpaused"},
} {
result := controls.HandleControlRequest(xfer.Request{
Control: tc.command,
NodeID: report.MakeContainerNodeID("", "a1b2c3d4e5"),
})
if !reflect.DeepEqual(result, xfer.Response{
Error: tc.result,
}) {
t.Error(result)
}
}
})
}

View File

@@ -10,9 +10,13 @@ import (
// Consts exported for testing.
const (
StartEvent = "start"
DieEvent = "die"
endpoint = "unix:///var/run/docker.sock"
CreateEvent = "create"
DestroyEvent = "destroy"
StartEvent = "start"
DieEvent = "die"
PauseEvent = "pause"
UnpauseEvent = "unpause"
endpoint = "unix:///var/run/docker.sock"
)
// Vars exported for testing.
@@ -47,6 +51,11 @@ type Client interface {
ListImages(docker_client.ListImagesOptions) ([]docker_client.APIImages, error)
AddEventListener(chan<- *docker_client.APIEvents) error
RemoveEventListener(chan *docker_client.APIEvents) error
StopContainer(string, uint) error
StartContainer(string, *docker_client.HostConfig) error
RestartContainer(string, uint) error
PauseContainer(string) error
UnpauseContainer(string) error
}
func newDockerClient(endpoint string) (Client, error) {
@@ -70,6 +79,7 @@ func NewRegistry(interval time.Duration) (Registry, error) {
quit: make(chan chan struct{}),
}
r.registerControls()
go r.loop()
return r, nil
}
@@ -170,9 +180,7 @@ func (r *registry) updateContainers() error {
}
for _, apiContainer := range apiContainers {
if err := r.addContainer(apiContainer.ID); err != nil {
return err
}
r.updateContainerState(apiContainer.ID)
}
return nil
@@ -197,56 +205,54 @@ func (r *registry) updateImages() error {
func (r *registry) handleEvent(event *docker_client.APIEvents) {
switch event.Status {
case DieEvent:
containerID := event.ID
r.removeContainer(containerID)
case StartEvent:
containerID := event.ID
if err := r.addContainer(containerID); err != nil {
log.Printf("docker registry: %s", err)
}
case CreateEvent, StartEvent, DieEvent, DestroyEvent, PauseEvent, UnpauseEvent:
r.updateContainerState(event.ID)
}
}
func (r *registry) addContainer(containerID string) error {
func (r *registry) updateContainerState(containerID string) {
r.Lock()
defer r.Unlock()
dockerContainer, err := r.client.InspectContainer(containerID)
if err != nil {
// Don't spam the logs if the container was short lived
if _, ok := err.(*docker_client.NoSuchContainer); ok {
return nil
if _, ok := err.(*docker_client.NoSuchContainer); !ok {
log.Printf("Error processing event for container %s: %v", containerID, err)
return
}
return err
}
if !dockerContainer.State.Running {
// We get events late, and the containers sometimes have already
// stopped. Not an error, so don't return it.
return nil
}
// Container doesn't exist anymore, so lets stop and remove it
container, ok := r.containers[containerID]
if !ok {
return
}
r.Lock()
defer r.Unlock()
c := NewContainerStub(dockerContainer)
r.containers[containerID] = c
r.containersByPID[dockerContainer.State.Pid] = c
return c.StartGatheringStats()
}
func (r *registry) removeContainer(containerID string) {
r.Lock()
defer r.Unlock()
container, ok := r.containers[containerID]
if !ok {
delete(r.containers, containerID)
delete(r.containersByPID, container.PID())
container.StopGatheringStats()
return
}
delete(r.containers, containerID)
delete(r.containersByPID, container.PID())
container.StopGatheringStats()
// Container exists, ensure we have it
c, ok := r.containers[containerID]
if !ok {
c = NewContainerStub(dockerContainer)
r.containers[containerID] = c
r.containersByPID[dockerContainer.State.Pid] = c
} else {
c.UpdateState(dockerContainer)
}
// And finally, ensure we gather stats for it
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
log.Printf("Error gather stats for container: %s", containerID)
return
}
} else {
c.StopGatheringStats()
}
}
// LockedPIDLookup runs f under a read lock, and gives f a function for
@@ -272,6 +278,13 @@ func (r *registry) WalkContainers(f func(Container)) {
}
}
func (r *registry) getContainer(id string) (Container, bool) {
r.RLock()
defer r.RUnlock()
c, ok := r.containers[id]
return c, ok
}
// WalkImages runs f on every image of running containers the registry
// knows of. f may be run on the same image more than once.
func (r *registry) WalkImages(f func(*docker_client.APIImages)) {

View File

@@ -1,6 +1,7 @@
package docker_test
import (
"fmt"
"net"
"runtime"
"sort"
@@ -19,6 +20,8 @@ type mockContainer struct {
c *client.Container
}
func (c *mockContainer) UpdateState(_ *client.Container) {}
func (c *mockContainer) ID() string {
return c.c.ID
}
@@ -66,7 +69,11 @@ func (m *mockDockerClient) ListContainers(client.ListContainersOptions) ([]clien
func (m *mockDockerClient) InspectContainer(id string) (*client.Container, error) {
m.RLock()
defer m.RUnlock()
return m.containers[id], nil
c, ok := m.containers[id]
if !ok {
return nil, &client.NoSuchContainer{}
}
return c, nil
}
func (m *mockDockerClient) ListImages(client.ListImagesOptions) ([]client.APIImages, error) {
@@ -93,6 +100,26 @@ func (m *mockDockerClient) RemoveEventListener(events chan *client.APIEvents) er
return nil
}
func (m *mockDockerClient) StartContainer(_ string, _ *client.HostConfig) error {
return fmt.Errorf("started")
}
func (m *mockDockerClient) StopContainer(_ string, _ uint) error {
return fmt.Errorf("stopped")
}
func (m *mockDockerClient) RestartContainer(_ string, _ uint) error {
return fmt.Errorf("restarted")
}
func (m *mockDockerClient) PauseContainer(_ string) error {
return fmt.Errorf("paused")
}
func (m *mockDockerClient) UnpauseContainer(_ string) error {
return fmt.Errorf("unpaused")
}
func (m *mockDockerClient) send(event *client.APIEvents) {
m.RLock()
defer m.RUnlock()
@@ -259,7 +286,7 @@ func TestRegistryEvents(t *testing.T) {
mdc.apiContainers = []client.APIContainers{apiContainer1}
delete(mdc.containers, "wiff")
mdc.Unlock()
mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "wiff"})
mdc.send(&client.APIEvents{Status: docker.DestroyEvent, ID: "wiff"})
runtime.Gosched()
want := []docker.Container{&mockContainer{container1}}

View File

@@ -43,6 +43,31 @@ func (r *Reporter) Report() (report.Report, error) {
func (r *Reporter) containerTopology(localAddrs []net.IP) report.Topology {
result := report.MakeTopology()
result.Controls.AddControl(report.Control{
ID: StopContainer,
Human: "Stop",
Icon: "fa-stop",
})
result.Controls.AddControl(report.Control{
ID: StartContainer,
Human: "Start",
Icon: "fa-play",
})
result.Controls.AddControl(report.Control{
ID: RestartContainer,
Human: "Restart",
Icon: "fa-repeat",
})
result.Controls.AddControl(report.Control{
ID: PauseContainer,
Human: "Pause",
Icon: "fa-pause",
})
result.Controls.AddControl(report.Control{
ID: UnpauseContainer,
Human: "Unpause",
Icon: "fa-play",
})
r.registry.WalkContainers(func(c Container) {
nodeID := report.MakeContainerNodeID(r.hostID, c.ID())

View File

@@ -57,6 +57,33 @@ func TestReporter(t *testing.T) {
docker.ImageID: "baz",
}),
},
Controls: report.Controls{
docker.RestartContainer: report.Control{
ID: docker.RestartContainer,
Human: "Restart",
Icon: "fa-repeat",
},
docker.StartContainer: report.Control{
ID: docker.StartContainer,
Human: "Start",
Icon: "fa-play",
},
docker.StopContainer: report.Control{
ID: docker.StopContainer,
Human: "Stop",
Icon: "fa-stop",
},
docker.PauseContainer: report.Control{
ID: docker.PauseContainer,
Human: "Pause",
Icon: "fa-pause",
},
docker.UnpauseContainer: report.Control{
ID: docker.UnpauseContainer,
Human: "Unpause",
Icon: "fa-play",
},
},
}
want.ContainerImage = report.Topology{
Nodes: report.Nodes{
@@ -65,6 +92,7 @@ func TestReporter(t *testing.T) {
docker.ImageName: "bang",
}),
},
Controls: report.Controls{},
}
reporter := docker.NewReporter(mockRegistryInstance, "")

View File

@@ -7,17 +7,26 @@ import (
// Tagger tags each node in each topology of a report with the origin host
// node ID of this (probe) host. Effectively, a foreign key linking every node
// in every topology to an origin host node in the host topology.
type Tagger struct{ hostNodeID string }
type Tagger struct {
hostNodeID string
probeID string
}
// NewTagger tags each node with a foreign key linking it to its origin host
// in the host topology.
func NewTagger(hostID string) Tagger {
return Tagger{hostNodeID: report.MakeHostNodeID(hostID)}
func NewTagger(hostID, probeID string) Tagger {
return Tagger{
hostNodeID: report.MakeHostNodeID(hostID),
probeID: probeID,
}
}
// Tag implements Tagger.
func (t Tagger) Tag(r report.Report) (report.Report, error) {
other := report.MakeNodeWith(map[string]string{report.HostNodeID: t.hostNodeID})
other := report.MakeNodeWith(map[string]string{
report.HostNodeID: t.hostNodeID,
report.ProbeID: t.probeID,
})
// Explicity don't tag Endpoints and Addresses - These topologies include pseudo nodes,
// and as such do their own host tagging

View File

@@ -12,6 +12,7 @@ import (
func TestTagger(t *testing.T) {
var (
hostID = "foo"
probeID = "a1b2c3d4"
endpointNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "56789") // hostID ignored
nodeMetadata = report.MakeNodeWith(map[string]string{"foo": "bar"})
)
@@ -20,8 +21,9 @@ func TestTagger(t *testing.T) {
r.Process.AddNode(endpointNodeID, nodeMetadata)
want := nodeMetadata.Merge(report.MakeNodeWith(map[string]string{
report.HostNodeID: report.MakeHostNodeID(hostID),
report.ProbeID: probeID,
}))
rpt, _ := host.NewTagger(hostID).Tag(r)
rpt, _ := host.NewTagger(hostID, probeID).Tag(r)
have := rpt.Process.Nodes[endpointNodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))

View File

@@ -111,36 +111,27 @@ func TestReporter(t *testing.T) {
want := report.MakeReport()
pod1ID := report.MakePodNodeID("ping", "pong-a")
pod2ID := report.MakePodNodeID("ping", "pong-b")
want.Pod = report.Topology{
Nodes: report.Nodes{
pod1ID: report.MakeNodeWith(map[string]string{
kubernetes.PodID: "ping/pong-a",
kubernetes.PodName: "pong-a",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.PodContainerIDs: "container1 container2",
kubernetes.ServiceIDs: "ping/pongservice",
}),
pod2ID: report.MakeNodeWith(map[string]string{
kubernetes.PodID: "ping/pong-b",
kubernetes.PodName: "pong-b",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.PodContainerIDs: "container3 container4",
kubernetes.ServiceIDs: "ping/pongservice",
}),
},
}
want.Service = report.Topology{
Nodes: report.Nodes{
report.MakeServiceNodeID("ping", "pongservice"): report.MakeNodeWith(map[string]string{
kubernetes.ServiceID: "ping/pongservice",
kubernetes.ServiceName: "pongservice",
kubernetes.Namespace: "ping",
kubernetes.ServiceCreated: pod1.Created(),
}),
},
}
want.Pod = report.MakeTopology().AddNode(pod1ID, report.MakeNodeWith(map[string]string{
kubernetes.PodID: "ping/pong-a",
kubernetes.PodName: "pong-a",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.PodContainerIDs: "container1 container2",
kubernetes.ServiceIDs: "ping/pongservice",
})).AddNode(pod2ID, report.MakeNodeWith(map[string]string{
kubernetes.PodID: "ping/pong-b",
kubernetes.PodName: "pong-b",
kubernetes.Namespace: "ping",
kubernetes.PodCreated: pod1.Created(),
kubernetes.PodContainerIDs: "container3 container4",
kubernetes.ServiceIDs: "ping/pongservice",
}))
want.Service = report.MakeTopology().AddNode(report.MakeServiceNodeID("ping", "pongservice"), report.MakeNodeWith(map[string]string{
kubernetes.ServiceID: "ping/pongservice",
kubernetes.ServiceName: "pongservice",
kubernetes.Namespace: "ping",
kubernetes.ServiceCreated: pod1.Created(),
}))
reporter := kubernetes.NewReporter(mockClientInstance)
have, _ := reporter.Report()

View File

@@ -4,16 +4,19 @@ import (
"flag"
"fmt"
"log"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/host"
@@ -66,10 +69,11 @@ func main() {
log.Printf("warning: -process=true, but that requires root to find everything")
}
rand.Seed(time.Now().UnixNano())
probeID := strconv.FormatInt(rand.Int63(), 16)
var (
hostName = hostname()
hostID = hostName // TODO(pb): we should sanitize the hostname
probeID = hostName // TODO(pb): does this need to be a random string instead?
)
log.Printf("probe starting, version %s, ID %s", version, probeID)
@@ -101,7 +105,14 @@ func main() {
publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()
resolver := newStaticResolver(targets, publishers.Set)
clients := xfer.NewMultiAppClient(xfer.ProbeConfig{
Token: *token,
ProbeID: probeID,
Insecure: *insecure,
}, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient)
defer clients.Stop()
resolver := newStaticResolver(targets, publishers.Set, clients.Set)
defer resolver.Stop()
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
@@ -112,7 +123,7 @@ func main() {
var (
tickers = []Ticker{processCache}
reporters = []Reporter{endpointReporter, host.NewReporter(hostID, hostName, localNets), process.NewReporter(processCache, hostID)}
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID, probeID)}
)
dockerTagger, dockerReporter, dockerRegistry := func() (*docker.Tagger, *docker.Reporter, docker.Registry) {

View File

@@ -33,14 +33,13 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if want, have := (report.Topology{
Nodes: report.Nodes{
report.MakeOverlayNodeID(mockWeavePeerName): report.MakeNodeWith(map[string]string{
overlay.WeavePeerName: mockWeavePeerName,
overlay.WeavePeerNickName: mockWeavePeerNickName,
}),
},
}), have.Overlay; !reflect.DeepEqual(want, have) {
if want, have := report.MakeTopology().AddNode(
report.MakeOverlayNodeID(mockWeavePeerName),
report.MakeNodeWith(map[string]string{
overlay.WeavePeerName: mockWeavePeerName,
overlay.WeavePeerNickName: mockWeavePeerNickName,
}),
), have.Overlay; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
@@ -48,27 +47,19 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
{
nodeID := report.MakeContainerNodeID(mockHostID, mockContainerID)
want := report.Report{
Container: report.Topology{
Nodes: report.Nodes{
nodeID: report.MakeNodeWith(map[string]string{
docker.ContainerID: mockContainerID,
overlay.WeaveDNSHostname: mockHostname,
overlay.WeaveMACAddress: mockContainerMAC,
}).WithSets(report.Sets{
docker.ContainerIPs: report.MakeStringSet(mockContainerIP),
docker.ContainerIPsWithScopes: report.MakeStringSet(mockContainerIPWithScope),
}),
},
},
Container: report.MakeTopology().AddNode(nodeID, report.MakeNodeWith(map[string]string{
docker.ContainerID: mockContainerID,
overlay.WeaveDNSHostname: mockHostname,
overlay.WeaveMACAddress: mockContainerMAC,
}).WithSets(report.Sets{
docker.ContainerIPs: report.MakeStringSet(mockContainerIP),
docker.ContainerIPsWithScopes: report.MakeStringSet(mockContainerIPWithScope),
})),
}
have, err := w.Tag(report.Report{
Container: report.Topology{
Nodes: report.Nodes{
nodeID: report.MakeNodeWith(map[string]string{
docker.ContainerID: mockContainerID,
}),
},
},
Container: report.MakeTopology().AddNode(nodeID, report.MakeNodeWith(map[string]string{
docker.ContainerID: mockContainerID,
})),
})
if err != nil {
t.Fatal(err)

View File

@@ -33,40 +33,42 @@ func TestReporter(t *testing.T) {
reporter := process.NewReporter(walker, "")
want := report.MakeReport()
want.Process = report.Topology{
Nodes: report.Nodes{
report.MakeProcessNodeID("", "1"): report.MakeNodeWith(map[string]string{
process.PID: "1",
process.Comm: "init",
process.Threads: "0",
}),
report.MakeProcessNodeID("", "2"): report.MakeNodeWith(map[string]string{
process.PID: "2",
process.Comm: "bash",
process.PPID: "1",
process.Threads: "0",
}),
report.MakeProcessNodeID("", "3"): report.MakeNodeWith(map[string]string{
process.PID: "3",
process.Comm: "apache",
process.PPID: "1",
process.Threads: "2",
}),
report.MakeProcessNodeID("", "4"): report.MakeNodeWith(map[string]string{
process.PID: "4",
process.Comm: "ping",
process.PPID: "2",
process.Cmdline: "ping foo.bar.local",
process.Threads: "0",
}),
report.MakeProcessNodeID("", "5"): report.MakeNodeWith(map[string]string{
process.PID: "5",
process.PPID: "1",
process.Cmdline: "tail -f /var/log/syslog",
process.Threads: "0",
}),
},
}
want.Process = report.MakeTopology().AddNode(
report.MakeProcessNodeID("", "1"), report.MakeNodeWith(map[string]string{
process.PID: "1",
process.Comm: "init",
process.Threads: "0",
}),
).AddNode(
report.MakeProcessNodeID("", "2"), report.MakeNodeWith(map[string]string{
process.PID: "2",
process.Comm: "bash",
process.PPID: "1",
process.Threads: "0",
}),
).AddNode(
report.MakeProcessNodeID("", "3"), report.MakeNodeWith(map[string]string{
process.PID: "3",
process.Comm: "apache",
process.PPID: "1",
process.Threads: "2",
}),
).AddNode(
report.MakeProcessNodeID("", "4"), report.MakeNodeWith(map[string]string{
process.PID: "4",
process.Comm: "ping",
process.PPID: "2",
process.Cmdline: "ping foo.bar.local",
process.Threads: "0",
}),
).AddNode(
report.MakeProcessNodeID("", "5"), report.MakeNodeWith(map[string]string{
process.PID: "5",
process.PPID: "1",
process.Cmdline: "tail -f /var/log/syslog",
process.Threads: "0",
}),
)
have, err := reporter.Report()
if err != nil || !reflect.DeepEqual(want, have) {

View File

@@ -19,8 +19,10 @@ var (
lookupIP = net.LookupIP
)
type setter func(string, []string)
type staticResolver struct {
set func(string, []string)
setters []setter
targets []target
quit chan struct{}
}
@@ -32,10 +34,10 @@ func (t target) String() string { return net.JoinHostPort(t.host, t.port) }
// newStaticResolver periodically resolves the targets, and calls the set
// function with all the resolved IPs. It explictiy supports targets which
// resolve to multiple IPs.
func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver {
func newStaticResolver(targets []string, setters ...setter) staticResolver {
r := staticResolver{
targets: prepare(targets),
set: set,
setters: setters,
quit: make(chan struct{}),
}
go r.loop()
@@ -80,7 +82,9 @@ func prepare(strs []string) []target {
func (r staticResolver) resolve() {
for t, endpoints := range resolveMany(r.targets) {
r.set(t.String(), endpoints)
for _, setter := range r.setters {
setter(t.String(), endpoints)
}
}
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
func TestApply(t *testing.T) {
@@ -41,6 +42,7 @@ func TestTagMissingID(t *testing.T) {
rpt, _ := newTopologyTagger().Tag(r)
have := rpt.Endpoint.Nodes[nodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
t.Error("TopologyTagger erroneously tagged a missing node ID")
}
}

View File

@@ -24,11 +24,12 @@ const (
// DetailedNode is the data type that's yielded to the JavaScript layer when
// we want deep information about an individual node.
type DetailedNode struct {
ID string `json:"id"`
LabelMajor string `json:"label_major"`
LabelMinor string `json:"label_minor,omitempty"`
Pseudo bool `json:"pseudo,omitempty"`
Tables []Table `json:"tables"`
ID string `json:"id"`
LabelMajor string `json:"label_major"`
LabelMinor string `json:"label_minor,omitempty"`
Pseudo bool `json:"pseudo,omitempty"`
Tables []Table `json:"tables"`
Controls []ControlInstance `json:"controls"`
}
// Table is a dataset associated with a node. It will be displayed in the
@@ -48,6 +49,14 @@ type Row struct {
Expandable bool `json:"expandable,omitempty"` // Whether it can be expanded (hidden by default)
}
// ControlInstance contains a control description, and all the info
// needed to execute it.
type ControlInstance struct {
ProbeID string `json:"probeId"`
NodeID string `json:"nodeId"`
report.Control
}
type sortableRows []Row
func (r sortableRows) Len() int { return len(r) }
@@ -107,6 +116,7 @@ func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode {
LabelMinor: n.LabelMinor,
Pseudo: n.Pseudo,
Tables: tables,
Controls: controls(r, n),
}
}
@@ -190,22 +200,55 @@ func connectionsTable(connections []Row, r report.Report, n RenderableNode) (Tab
return Table{}, false
}
func controlsFor(topology report.Topology, nodeID string) []ControlInstance {
result := []ControlInstance{}
node, ok := topology.Nodes[nodeID]
if !ok {
return result
}
for _, id := range node.Controls {
if control, ok := topology.Controls[id]; ok {
result = append(result, ControlInstance{
ProbeID: node.Metadata[report.ProbeID],
NodeID: nodeID,
Control: control,
})
}
}
return result
}
func controls(r report.Report, n RenderableNode) []ControlInstance {
if _, ok := r.Process.Nodes[n.ControlNode]; ok {
return controlsFor(r.Process, n.ControlNode)
} else if _, ok := r.Container.Nodes[n.ControlNode]; ok {
return controlsFor(r.Container, n.ControlNode)
} else if _, ok := r.ContainerImage.Nodes[n.ControlNode]; ok {
return controlsFor(r.ContainerImage, n.ControlNode)
} else if _, ok := r.Host.Nodes[n.ControlNode]; ok {
return controlsFor(r.Host, n.ControlNode)
}
return []ControlInstance{}
}
// OriginTable produces a table (to be consumed directly by the UI) based on
// an origin ID, which is (optimistically) a node ID in one of our topologies.
func OriginTable(r report.Report, originID string, addHostTags bool, addContainerTags bool) (Table, bool) {
result, show := Table{}, false
if nmd, ok := r.Process.Nodes[originID]; ok {
return processOriginTable(nmd, addHostTags, addContainerTags)
result, show = processOriginTable(nmd, addHostTags, addContainerTags)
}
if nmd, ok := r.Container.Nodes[originID]; ok {
return containerOriginTable(nmd, addHostTags)
result, show = containerOriginTable(nmd, addHostTags)
}
if nmd, ok := r.ContainerImage.Nodes[originID]; ok {
return containerImageOriginTable(nmd)
result, show = containerImageOriginTable(nmd)
}
if nmd, ok := r.Host.Nodes[originID]; ok {
return hostOriginTable(nmd)
result, show = hostOriginTable(nmd)
}
return Table{}, false
return result, show
}
func connectionDetailsRows(topology report.Topology, originID string) []Row {
@@ -303,6 +346,7 @@ func processOriginTable(nmd report.Node, addHostTag bool, addContainerTag bool)
func containerOriginTable(nmd report.Node, addHostTag bool) (Table, bool) {
rows := []Row{}
for _, tuple := range []struct{ key, human string }{
{docker.ContainerState, "State"},
{docker.ContainerID, "ID"},
{docker.ImageID, "Image ID"},
{docker.ContainerPorts, "Ports"},

View File

@@ -14,12 +14,13 @@ func TestOriginTable(t *testing.T) {
if _, ok := render.OriginTable(fixture.Report, "not-found", false, false); ok {
t.Errorf("unknown origin ID gave unexpected success")
}
for originID, want := range map[string]render.Table{fixture.ServerProcessNodeID: {
Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID),
Numeric: false,
Rank: 2,
Rows: []render.Row{},
},
for originID, want := range map[string]render.Table{
fixture.ServerProcessNodeID: {
Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID),
Numeric: false,
Rank: 2,
Rows: []render.Row{},
},
fixture.ServerHostNodeID: {
Title: fmt.Sprintf("Host %q", fixture.ServerHostName),
Numeric: false,
@@ -75,7 +76,6 @@ func TestOriginTable(t *testing.T) {
t.Errorf("%q: %s", originID, test.Diff(want, have))
}
}
}
func TestMakeDetailedHostNode(t *testing.T) {
@@ -86,6 +86,7 @@ func TestMakeDetailedHostNode(t *testing.T) {
LabelMajor: "client",
LabelMinor: "hostname.com",
Pseudo: false,
Controls: []render.ControlInstance{},
Tables: []render.Table{
{
Title: fmt.Sprintf("Host %q", fixture.ClientHostName),
@@ -143,6 +144,7 @@ func TestMakeDetailedContainerNode(t *testing.T) {
LabelMajor: "server",
LabelMinor: fixture.ServerHostName,
Pseudo: false,
Controls: []render.ControlInstance{},
Tables: []render.Table{
{
Title: `Container Image "image/server"`,

View File

@@ -213,6 +213,7 @@ var (
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
},
ControlNode: fixture.ClientContainerNodeID,
},
fixture.ServerContainerID: {
ID: fixture.ServerContainerID,
@@ -232,6 +233,7 @@ var (
IngressPacketCount: newu64(210),
IngressByteCount: newu64(2100),
},
ControlNode: fixture.ServerContainerNodeID,
},
uncontainedServerID: {
ID: uncontainedServerID,

190
render/filters.go Normal file
View File

@@ -0,0 +1,190 @@
package render
import (
"strings"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/report"
)
// CustomRenderer allow for mapping functions that recived the entire topology
// in one call - useful for functions that need to consider the entire graph.
// We should minimise the use of this renderer type, as it is very inflexible.
type CustomRenderer struct {
RenderFunc func(RenderableNodes) RenderableNodes
Renderer
}
// Render implements Renderer
func (c CustomRenderer) Render(rpt report.Report) RenderableNodes {
return c.RenderFunc(c.Renderer.Render(rpt))
}
// ColorConnected colors nodes with the IsConnected key if
// they have edges to or from them.
func ColorConnected(r Renderer) Renderer {
return CustomRenderer{
Renderer: r,
RenderFunc: func(input RenderableNodes) RenderableNodes {
connected := map[string]struct{}{}
void := struct{}{}
for id, node := range input {
if len(node.Adjacency) == 0 {
continue
}
connected[id] = void
for _, id := range node.Adjacency {
connected[id] = void
}
}
for id := range connected {
node := input[id]
node.Metadata[IsConnected] = "true"
input[id] = node
}
return input
},
}
}
// Filter removes nodes from a view based on a predicate.
type Filter struct {
Renderer
FilterFunc func(RenderableNode) bool
}
// Render implements Renderer
func (f Filter) Render(rpt report.Report) RenderableNodes {
nodes, _ := f.render(rpt)
return nodes
}
func (f Filter) render(rpt report.Report) (RenderableNodes, int) {
output := RenderableNodes{}
inDegrees := map[string]int{}
filtered := 0
for id, node := range f.Renderer.Render(rpt) {
if f.FilterFunc(node) {
output[id] = node
inDegrees[id] = 0
} else {
filtered++
}
}
// Deleted nodes also need to be cut as destinations in adjacency lists.
for id, node := range output {
newAdjacency := make(report.IDList, 0, len(node.Adjacency))
for _, dstID := range node.Adjacency {
if _, ok := output[dstID]; ok {
newAdjacency = newAdjacency.Add(dstID)
inDegrees[dstID]++
}
}
node.Adjacency = newAdjacency
output[id] = node
}
// Remove unconnected pseudo nodes, see #483.
for id, inDegree := range inDegrees {
if inDegree > 0 {
continue
}
node := output[id]
if !node.Pseudo || len(node.Adjacency) > 0 {
continue
}
delete(output, id)
filtered++
}
return output, filtered
}
// Stats implements Renderer
func (f Filter) Stats(rpt report.Report) Stats {
_, filtered := f.render(rpt)
var upstream = f.Renderer.Stats(rpt)
upstream.FilteredNodes += filtered
return upstream
}
// IsConnected is the key added to Node.Metadata by ColorConnected
// to indicate a node has an edge pointing to it or from it
const IsConnected = "is_connected"
// FilterUnconnected produces a renderer that filters unconnected nodes
// from the given renderer
func FilterUnconnected(r Renderer) Renderer {
return Filter{
Renderer: ColorConnected(r),
FilterFunc: func(node RenderableNode) bool {
_, ok := node.Metadata[IsConnected]
return ok
},
}
}
// FilterNoop does nothing.
func FilterNoop(in Renderer) Renderer {
return in
}
// FilterStopped filters out stopped containers.
func FilterStopped(r Renderer) Renderer {
return Filter{
Renderer: r,
FilterFunc: func(node RenderableNode) bool {
containerState := node.Metadata[docker.ContainerState]
return containerState != docker.StateStopped
},
}
}
// FilterSystem is a Renderer which filters out system nodes.
func FilterSystem(r Renderer) Renderer {
return Filter{
Renderer: r,
FilterFunc: func(node RenderableNode) bool {
containerName := node.Metadata[docker.ContainerName]
if _, ok := systemContainerNames[containerName]; ok {
return false
}
imagePrefix := strings.SplitN(node.Metadata[docker.ImageName], ":", 2)[0] // :(
if _, ok := systemImagePrefixes[imagePrefix]; ok {
return false
}
if node.Metadata[docker.LabelPrefix+"works.weave.role"] == "system" {
return false
}
if node.Metadata[kubernetes.Namespace] == "kube-system" {
return false
}
if strings.HasPrefix(node.Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"], "kube-system/") {
return false
}
return true
},
}
}
var systemContainerNames = map[string]struct{}{
"weavescope": {},
"weavedns": {},
"weave": {},
"weaveproxy": {},
"weaveexec": {},
"ecs-agent": {},
}
var systemImagePrefixes = map[string]struct{}{
"weaveworks/scope": {},
"weaveworks/weavedns": {},
"weaveworks/weave": {},
"weaveworks/weaveproxy": {},
"weaveworks/weaveexec": {},
"amazon/amazon-ecs-agent": {},
}

View File

@@ -134,6 +134,7 @@ func MapContainerIdentity(m RenderableNode, _ report.Networks) RenderableNodes {
)
node := NewRenderableNodeWith(id, major, minor, rank, m)
node.ControlNode = m.ID
if imageID, ok := m.Metadata[docker.ImageID]; ok {
hostID, _, _ := report.ParseContainerNodeID(m.ID)
node.Origins = node.Origins.Add(report.MakeContainerNodeID(hostID, imageID))

View File

@@ -1,10 +1,6 @@
package render
import (
"strings"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/report"
)
@@ -155,168 +151,3 @@ func (m Map) EdgeMetadata(rpt report.Report, srcRenderableID, dstRenderableID st
}
return output
}
// CustomRenderer allow for mapping functions that recived the entire topology
// in one call - useful for functions that need to consider the entire graph.
// We should minimise the use of this renderer type, as it is very inflexible.
type CustomRenderer struct {
RenderFunc func(RenderableNodes) RenderableNodes
Renderer
}
// Render implements Renderer
func (c CustomRenderer) Render(rpt report.Report) RenderableNodes {
return c.RenderFunc(c.Renderer.Render(rpt))
}
// ColorConnected colors nodes with the IsConnected key if
// they have edges to or from them.
func ColorConnected(r Renderer) Renderer {
return CustomRenderer{
Renderer: r,
RenderFunc: func(input RenderableNodes) RenderableNodes {
connected := map[string]struct{}{}
void := struct{}{}
for id, node := range input {
if len(node.Adjacency) == 0 {
continue
}
connected[id] = void
for _, id := range node.Adjacency {
connected[id] = void
}
}
for id := range connected {
node := input[id]
node.Metadata[IsConnected] = "true"
input[id] = node
}
return input
},
}
}
// Filter removes nodes from a view based on a predicate.
type Filter struct {
Renderer
FilterFunc func(RenderableNode) bool
}
// Render implements Renderer
func (f Filter) Render(rpt report.Report) RenderableNodes {
nodes, _ := f.render(rpt)
return nodes
}
func (f Filter) render(rpt report.Report) (RenderableNodes, int) {
output := RenderableNodes{}
inDegrees := map[string]int{}
filtered := 0
for id, node := range f.Renderer.Render(rpt) {
if f.FilterFunc(node) {
output[id] = node
inDegrees[id] = 0
} else {
filtered++
}
}
// Deleted nodes also need to be cut as destinations in adjacency lists.
for id, node := range output {
newAdjacency := make(report.IDList, 0, len(node.Adjacency))
for _, dstID := range node.Adjacency {
if _, ok := output[dstID]; ok {
newAdjacency = newAdjacency.Add(dstID)
inDegrees[dstID]++
}
}
node.Adjacency = newAdjacency
output[id] = node
}
// Remove unconnected pseudo nodes, see #483.
for id, inDegree := range inDegrees {
if inDegree > 0 {
continue
}
node := output[id]
if !node.Pseudo || len(node.Adjacency) > 0 {
continue
}
delete(output, id)
filtered++
}
return output, filtered
}
// Stats implements Renderer
func (f Filter) Stats(rpt report.Report) Stats {
_, filtered := f.render(rpt)
var upstream = f.Renderer.Stats(rpt)
upstream.FilteredNodes += filtered
return upstream
}
// IsConnected is the key added to Node.Metadata by ColorConnected
// to indicate a node has an edge pointing to it or from it
const IsConnected = "is_connected"
// FilterUnconnected produces a renderer that filters unconnected nodes
// from the given renderer
func FilterUnconnected(r Renderer) Renderer {
return Filter{
Renderer: ColorConnected(r),
FilterFunc: func(node RenderableNode) bool {
_, ok := node.Metadata[IsConnected]
return ok
},
}
}
// FilterSystem is a Renderer which filters out system nodes.
func FilterSystem(r Renderer) Renderer {
return Filter{
Renderer: r,
FilterFunc: func(node RenderableNode) bool {
containerName := node.Metadata[docker.ContainerName]
if _, ok := systemContainerNames[containerName]; ok {
return false
}
imagePrefix := strings.SplitN(node.Metadata[docker.ImageName], ":", 2)[0] // :(
if _, ok := systemImagePrefixes[imagePrefix]; ok {
return false
}
if node.Metadata[docker.LabelPrefix+"works.weave.role"] == "system" {
return false
}
if node.Metadata[kubernetes.Namespace] == "kube-system" {
return false
}
if strings.HasPrefix(node.Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"], "kube-system/") {
return false
}
return true
},
}
}
var systemContainerNames = map[string]struct{}{
"weavescope": {},
"weavedns": {},
"weave": {},
"weaveproxy": {},
"weaveexec": {},
"ecs-agent": {},
}
var systemImagePrefixes = map[string]struct{}{
"weaveworks/scope": {},
"weaveworks/weavedns": {},
"weaveworks/weave": {},
"weaveworks/weaveproxy": {},
"weaveworks/weaveexec": {},
"amazon/amazon-ecs-agent": {},
}

View File

@@ -8,12 +8,13 @@ import (
// an element of a topology. It should contain information that's relevant
// to rendering a node when there are many nodes visible at once.
type RenderableNode struct {
ID string `json:"id"` //
LabelMajor string `json:"label_major"` // e.g. "process", human-readable
LabelMinor string `json:"label_minor,omitempty"` // e.g. "hostname", human-readable, optional
Rank string `json:"rank"` // to help the layout engine
Pseudo bool `json:"pseudo,omitempty"` // sort-of a placeholder node, for rendering purposes
Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information
ID string `json:"id"` //
LabelMajor string `json:"label_major"` // e.g. "process", human-readable
LabelMinor string `json:"label_minor,omitempty"` // e.g. "hostname", human-readable, optional
Rank string `json:"rank"` // to help the layout engine
Pseudo bool `json:"pseudo,omitempty"` // sort-of a placeholder node, for rendering purposes
Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information
ControlNode string `json:"-"` // ID of node from which to show the controls in the UI
report.EdgeMetadata `json:"metadata"` // Numeric sums
report.Node
@@ -58,6 +59,7 @@ func NewDerivedNode(id string, node RenderableNode) RenderableNode {
Origins: node.Origins.Copy(),
EdgeMetadata: node.EdgeMetadata.Copy(),
Node: node.Node.Copy(),
ControlNode: "", // Do not propagate ControlNode when making a derived node!
}
}
@@ -97,6 +99,10 @@ func (rn RenderableNode) Merge(other RenderableNode) RenderableNode {
result.Rank = other.Rank
}
if result.ControlNode == "" {
result.ControlNode = other.ControlNode
}
if result.Pseudo != other.Pseudo {
panic(result.ID)
}
@@ -119,6 +125,7 @@ func (rn RenderableNode) Copy() RenderableNode {
Origins: rn.Origins.Copy(),
EdgeMetadata: rn.EdgeMetadata.Copy(),
Node: rn.Node.Copy(),
ControlNode: rn.ControlNode,
}
}

View File

@@ -78,13 +78,14 @@ var (
Origins: report.MakeIDList(randomEndpointNodeID),
},
containerID: {
ID: containerID,
LabelMajor: containerName,
LabelMinor: serverHostID,
Rank: "",
Pseudo: false,
Origins: report.MakeIDList(containerNodeID, serverEndpointNodeID, serverHostNodeID),
Node: report.MakeNode(),
ID: containerID,
LabelMajor: containerName,
LabelMinor: serverHostID,
Rank: "",
Pseudo: false,
Origins: report.MakeIDList(containerNodeID, serverEndpointNodeID, serverHostNodeID),
Node: report.MakeNode(),
ControlNode: containerNodeID,
},
}).Prune()
)

34
report/controls.go Normal file
View File

@@ -0,0 +1,34 @@
package report
// Controls describe the control tags within the Nodes
type Controls map[string]Control
// A Control basically describes an RPC
type Control struct {
ID string `json:"id"`
Human string `json:"human"`
Icon string `json:"icon"` // from https://fortawesome.github.io/Font-Awesome/cheatsheet/ please
}
// Merge merges other with cs, returning a fresh Controls.
func (cs Controls) Merge(other Controls) Controls {
result := cs.Copy()
for k, v := range other {
result[k] = v
}
return result
}
// Copy produces a copy of cs.
func (cs Controls) Copy() Controls {
result := Controls{}
for k, v := range cs {
result[k] = v
}
return result
}
// AddControl returns a fresh Controls, c added to cs.
func (cs Controls) AddControl(c Control) {
cs[c.ID] = c
}

View File

@@ -181,4 +181,6 @@ const (
// a node in the host topology. That host node is the origin host, where
// the node was originally detected.
HostNodeID = "host_node_id"
// ProbeID is the random ID of the probe which generated the specific node.
ProbeID = "probe_id"
)

View File

@@ -12,12 +12,14 @@ import (
// in the Node struct.
type Topology struct {
Nodes // TODO(pb): remove Nodes intermediate type
Controls
}
// MakeTopology gives you a Topology.
func MakeTopology() Topology {
return Topology{
Nodes: map[string]Node{},
Nodes: map[string]Node{},
Controls: Controls{},
}
}
@@ -37,7 +39,8 @@ func (t Topology) AddNode(nodeID string, nmd Node) Topology {
// Copy returns a value copy of the Topology.
func (t Topology) Copy() Topology {
return Topology{
Nodes: t.Nodes.Copy(),
Nodes: t.Nodes.Copy(),
Controls: t.Controls.Copy(),
}
}
@@ -45,7 +48,8 @@ func (t Topology) Copy() Topology {
// The original is not modified.
func (t Topology) Merge(other Topology) Topology {
return Topology{
Nodes: t.Nodes.Merge(other.Nodes),
Nodes: t.Nodes.Merge(other.Nodes),
Controls: t.Controls.Merge(other.Controls),
}
}
@@ -84,6 +88,7 @@ type Node struct {
Sets Sets `json:"sets,omitempty"`
Adjacency IDList `json:"adjacency"`
Edges EdgeMetadatas `json:"edges,omitempty"`
Controls IDList `json:"controls,omitempty"`
}
// MakeNode creates a new Node with no initial metadata.
@@ -94,6 +99,7 @@ func MakeNode() Node {
Sets: Sets{},
Adjacency: MakeIDList(),
Edges: EdgeMetadatas{},
Controls: MakeIDList(),
}
}
@@ -147,6 +153,13 @@ func (n Node) WithEdge(dst string, md EdgeMetadata) Node {
return result
}
// WithControls returns a fresh copy of n, with cs added to Controls.
func (n Node) WithControls(cs ...string) Node {
result := n.Copy()
result.Controls = result.Controls.Add(cs...)
return result
}
// Copy returns a value copy of the Node.
func (n Node) Copy() Node {
cp := MakeNode()
@@ -155,6 +168,7 @@ func (n Node) Copy() Node {
cp.Sets = n.Sets.Copy()
cp.Adjacency = n.Adjacency.Copy()
cp.Edges = n.Edges.Copy()
cp.Controls = n.Controls.Copy()
return cp
}
@@ -167,6 +181,7 @@ func (n Node) Merge(other Node) Node {
cp.Sets = cp.Sets.Merge(other.Sets)
cp.Adjacency = cp.Adjacency.Merge(other.Adjacency)
cp.Edges = cp.Edges.Merge(other.Edges)
cp.Controls = cp.Controls.Merge(other.Controls)
return cp
}

151
xfer/app_client.go Normal file
View File

@@ -0,0 +1,151 @@
package xfer
import (
"encoding/json"
"log"
"net/http"
"net/rpc"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/weaveworks/scope/common/sanitize"
)
// Details are some generic details that can be fetched from /api
type Details struct {
ID string `json:"id"`
Version string `json:"version"`
}
// AppClient is a client to an app for dealing with controls.
type AppClient interface {
Details() (Details, error)
ControlConnection(handler ControlHandler)
Stop()
}
type appClient struct {
ProbeConfig
quit chan struct{}
target string
insecure bool
client http.Client
controlServerCodecMtx sync.Mutex
controlServerCodec rpc.ServerCodec
}
// NewAppClient makes a new AppClient.
func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) {
httpTransport, err := pc.getHTTPTransport(hostname)
if err != nil {
return nil, err
}
return &appClient{
ProbeConfig: pc,
quit: make(chan struct{}),
target: target,
client: http.Client{
Transport: httpTransport,
},
}, nil
}
// Stop stops the appClient.
func (c *appClient) Stop() {
c.controlServerCodecMtx.Lock()
defer c.controlServerCodecMtx.Unlock()
close(c.quit)
if c.controlServerCodec != nil {
c.controlServerCodec.Close()
}
c.client.Transport.(*http.Transport).CloseIdleConnections()
}
// Details fetches the details (version, id) of the app.
func (c *appClient) Details() (Details, error) {
result := Details{}
req, err := c.ProbeConfig.authorizedRequest("GET", sanitize.URL("", 0, "/api")(c.target), nil)
if err != nil {
return result, err
}
resp, err := c.client.Do(req)
if err != nil {
return result, err
}
defer resp.Body.Close()
return result, json.NewDecoder(resp.Body).Decode(&result)
}
func (c *appClient) controlConnection(handler ControlHandler) error {
dialer := websocket.Dialer{}
headers := http.Header{}
c.ProbeConfig.authorizeHeaders(headers)
// TODO(twilkie) need to update sanitize to work with wss
url := sanitize.URL("ws://", 0, "/api/control/ws")(c.target)
conn, _, err := dialer.Dial(url, headers)
if err != nil {
return err
}
defer func() {
log.Printf("Closing control connection to %s", c.target)
conn.Close()
}()
codec := NewJSONWebsocketCodec(conn)
server := rpc.NewServer()
if err := server.RegisterName("control", handler); err != nil {
return err
}
c.controlServerCodecMtx.Lock()
c.controlServerCodec = codec
// At this point we may have tried to quit earlier, so check to see if the
// quit channel has been closed, non-blocking.
select {
default:
case <-c.quit:
codec.Close()
return nil
}
c.controlServerCodecMtx.Unlock()
server.ServeCodec(codec)
c.controlServerCodecMtx.Lock()
c.controlServerCodec = nil
c.controlServerCodecMtx.Unlock()
return nil
}
func (c *appClient) controlConnectionLoop(handler ControlHandler) {
defer log.Printf("Control connection to %s exiting", c.target)
backoff := initialBackoff
for {
err := c.controlConnection(handler)
if err == nil {
backoff = initialBackoff
continue
}
log.Printf("Error doing controls for %s, backing off %s: %v", c.target, backoff, err)
select {
case <-time.After(backoff):
case <-c.quit:
return
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
func (c *appClient) ControlConnection(handler ControlHandler) {
go c.controlConnectionLoop(handler)
}

141
xfer/controls.go Normal file
View File

@@ -0,0 +1,141 @@
package xfer
import (
"fmt"
"net/rpc"
"sync"
"github.com/gorilla/websocket"
)
// Request is the UI -> App -> Probe message type for control RPCs
type Request struct {
ID int64
NodeID string
Control string
}
// Response is the Probe -> App -> UI message type for the control RPCs.
type Response struct {
ID int64
Value interface{}
Error string
}
// ControlHandler is interface used in the app and the probe to represent
// a control RPC.
type ControlHandler interface {
Handle(req Request, res *Response) error
}
// ControlHandlerFunc is a adapter (ala golang's http RequestHandlerFunc)
// for ControlHandler
type ControlHandlerFunc func(Request) Response
// Handle is an adapter method to make ControlHandlers exposable via golang rpc
func (c ControlHandlerFunc) Handle(req Request, res *Response) error {
*res = c(req)
return nil
}
// ResponseErrorf creates a new Response with the given formatted error string.
func ResponseErrorf(format string, a ...interface{}) Response {
return Response{
Error: fmt.Sprintf(format, a...),
}
}
// ResponseError creates a new Response with the given error.
func ResponseError(err error) Response {
if err != nil {
return Response{
Error: err.Error(),
}
}
return Response{}
}
// JSONWebsocketCodec is golang rpc compatible Server and Client Codec
// that transmits and receives RPC messages over a websocker, as JSON.
type JSONWebsocketCodec struct {
sync.Mutex
conn *websocket.Conn
err chan struct{}
}
// NewJSONWebsocketCodec makes a new JSONWebsocketCodec
func NewJSONWebsocketCodec(conn *websocket.Conn) *JSONWebsocketCodec {
return &JSONWebsocketCodec{
conn: conn,
err: make(chan struct{}),
}
}
// WaitForReadError blocks until any read on this codec returns an error.
// This is useful to know when the server has disconnected from the client.
func (j *JSONWebsocketCodec) WaitForReadError() {
<-j.err
}
// WriteRequest implements rpc.ClientCodec
func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error {
j.Lock()
defer j.Unlock()
if err := j.conn.WriteJSON(r); err != nil {
return err
}
return j.conn.WriteJSON(v)
}
// ReadResponseHeader implements rpc.ClientCodec
func (j *JSONWebsocketCodec) ReadResponseHeader(r *rpc.Response) error {
err := j.conn.ReadJSON(r)
if err != nil {
close(j.err)
}
return err
}
// ReadResponseBody implements rpc.ClientCodec
func (j *JSONWebsocketCodec) ReadResponseBody(v interface{}) error {
err := j.conn.ReadJSON(v)
if err != nil {
close(j.err)
}
return err
}
// Close implements rpc.ClientCodec and rpc.ServerCodec
func (j *JSONWebsocketCodec) Close() error {
return j.conn.Close()
}
// ReadRequestHeader implements rpc.ServerCodec
func (j *JSONWebsocketCodec) ReadRequestHeader(r *rpc.Request) error {
err := j.conn.ReadJSON(r)
if err != nil {
close(j.err)
}
return err
}
// ReadRequestBody implements rpc.ServerCodec
func (j *JSONWebsocketCodec) ReadRequestBody(v interface{}) error {
err := j.conn.ReadJSON(v)
if err != nil {
close(j.err)
}
return err
}
// WriteResponse implements rpc.ServerCodec
func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error {
j.Lock()
defer j.Unlock()
if err := j.conn.WriteJSON(r); err != nil {
return err
}
return j.conn.WriteJSON(v)
}

View File

@@ -1,62 +1,39 @@
package xfer
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/certifi/gocertifi"
"github.com/weaveworks/scope/common/sanitize"
)
// HTTPPublisher publishes buffers by POST to a fixed endpoint.
type HTTPPublisher struct {
url string
token string
probeID string
client *http.Client
}
ProbeConfig
func getHTTPTransport(hostname string, insecure bool) (*http.Transport, error) {
if insecure {
return &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}, nil
}
host, _, err := net.SplitHostPort(hostname)
if err != nil {
return nil, err
}
certPool, err := gocertifi.CACerts()
if err != nil {
return nil, err
}
return &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
ServerName: host,
},
}, nil
url string
client *http.Client
}
// NewHTTPPublisher returns an HTTPPublisher ready for use.
func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (string, *HTTPPublisher, error) {
httpTransport, err := getHTTPTransport(hostname, insecure)
pc := ProbeConfig{
Token: token,
ProbeID: probeID,
Insecure: insecure,
}
httpTransport, err := pc.getHTTPTransport(hostname)
if err != nil {
return "", nil, err
}
p := &HTTPPublisher{
url: sanitize.URL("", 0, "/api/report")(target),
token: token,
probeID: probeID,
ProbeConfig: pc,
url: sanitize.URL("", 0, "/api/report")(target),
client: &http.Client{
Transport: httpTransport,
},
@@ -66,7 +43,7 @@ func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (s
Timeout: 5 * time.Second,
Transport: httpTransport,
}
req, err := p.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil)
req, err := pc.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil)
if err != nil {
return "", nil, err
}
@@ -84,22 +61,13 @@ func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (s
return apiResponse.ID, p, nil
}
func (p HTTPPublisher) authorizedRequest(method string, urlStr string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequest(method, urlStr, body)
if err == nil {
req.Header.Set("Authorization", AuthorizationHeader(p.token))
req.Header.Set(ScopeProbeIDHeader, p.probeID)
}
return req, err
}
func (p HTTPPublisher) String() string {
return p.url
}
// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(r io.Reader) error {
req, err := p.authorizedRequest("POST", p.url, r)
req, err := p.ProbeConfig.authorizedRequest("POST", p.url, r)
if err != nil {
return err
}
@@ -125,15 +93,3 @@ func (p *HTTPPublisher) Stop() {
// goroutines on the server (see #604)
p.client.Transport.(*http.Transport).CloseIdleConnections()
}
// AuthorizationHeader returns a value suitable for an HTTP Authorization
// header, based on the passed token string.
func AuthorizationHeader(token string) string {
return fmt.Sprintf("Scope-Probe token=%s", token)
}
// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The
// ID is currently set to the probe's hostname. It's designed to deduplicate
// reports from the same probe to the same receiver, in case the probe is
// configured to publish to multiple receivers that resolve to the same app.
const ScopeProbeIDHeader = "X-Scope-Probe-ID"

View File

@@ -4,6 +4,7 @@ import (
"compress/gzip"
"encoding/gob"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
@@ -27,7 +28,7 @@ func TestHTTPPublisher(t *testing.T) {
)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have {
if want, have := fmt.Sprintf("Scope-Probe token=%s", token), r.Header.Get("Authorization"); want != have {
t.Errorf("want %q, have %q", want, have)
}

118
xfer/multi_client.go Normal file
View File

@@ -0,0 +1,118 @@
package xfer
import (
"log"
"sync"
"github.com/weaveworks/scope/report"
)
// ClientFactory is a thing thats makes AppClients
type ClientFactory func(ProbeConfig, string, string) (AppClient, error)
type multiClient struct {
ProbeConfig
clientFactory ClientFactory
handler ControlHandler
mtx sync.Mutex
sema semaphore
clients map[string]AppClient // holds map from app id -> client
ids map[string]report.IDList // holds map from hostname -> app ids
quit chan struct{}
}
type clientTuple struct {
Details
AppClient
}
// MultiAppClient maintains a set of upstream apps, and ensures we have an
// AppClient for each one.
type MultiAppClient interface {
Set(hostname string, endpoints []string)
Stop()
}
// NewMultiAppClient creates a new MultiAppClient.
func NewMultiAppClient(pc ProbeConfig, handler ControlHandler, clientFactory ClientFactory) MultiAppClient {
return &multiClient{
ProbeConfig: pc,
clientFactory: clientFactory,
handler: handler,
sema: newSemaphore(maxConcurrentGET),
clients: map[string]AppClient{},
ids: map[string]report.IDList{},
quit: make(chan struct{}),
}
}
// Set the list of endpoints for the given hostname.
func (c *multiClient) Set(hostname string, endpoints []string) {
wg := sync.WaitGroup{}
wg.Add(len(endpoints))
clients := make(chan clientTuple, len(endpoints))
for _, endpoint := range endpoints {
go func(endpoint string) {
c.sema.acquire()
defer c.sema.release()
client, err := c.clientFactory(c.ProbeConfig, hostname, endpoint)
if err != nil {
log.Printf("Error creating new app client: %v", err)
return
}
details, err := client.Details()
if err != nil {
log.Printf("Error fetching app details: %v", err)
}
clients <- clientTuple{details, client}
wg.Done()
}(endpoint)
}
wg.Wait()
close(clients)
c.mtx.Lock()
defer c.mtx.Unlock()
// Start any new apps, and replace the list of app ids for this hostname
hostIDs := report.MakeIDList()
for tuple := range clients {
hostIDs = hostIDs.Add(tuple.ID)
_, ok := c.clients[tuple.ID]
if !ok {
c.clients[tuple.ID] = tuple.AppClient
tuple.AppClient.ControlConnection(c.handler)
}
}
c.ids[hostname] = hostIDs
// Remove apps that are no longer referenced (by id) from any hostname
allReferencedIDs := report.MakeIDList()
for _, ids := range c.ids {
allReferencedIDs = allReferencedIDs.Add(ids...)
}
for id, client := range c.clients {
if !allReferencedIDs.Contains(id) {
client.Stop()
delete(c.clients, id)
}
}
}
// Stop the MultiAppClient.
func (c *multiClient) Stop() {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, c := range c.clients {
c.Stop()
}
c.clients = map[string]AppClient{}
close(c.quit)
}

78
xfer/multi_client_test.go Normal file
View File

@@ -0,0 +1,78 @@
package xfer_test
import (
"runtime"
"testing"
"github.com/weaveworks/scope/xfer"
)
type mockClient struct {
id string
count int
stopped int
}
func (c *mockClient) Details() (xfer.Details, error) {
return xfer.Details{ID: c.id}, nil
}
func (c *mockClient) ControlConnection(handler xfer.ControlHandler) {
c.count++
}
func (c *mockClient) Stop() {
c.stopped++
}
func TestMultiClient(t *testing.T) {
var (
a1 = &mockClient{id: "1"} // hostname a, app id 1
a2 = &mockClient{id: "2"} // hostname a, app id 2
b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate)
b3 = &mockClient{id: "3"} // hostname b, app id 3
factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) {
switch target {
case "a1":
return a1, nil
case "a2":
return a2, nil
case "b2":
return b2, nil
case "b3":
return b3, nil
}
t.Fatal(target)
return a1, nil
}
controlHandler = xfer.ControlHandlerFunc(func(_ xfer.Request) xfer.Response {
return xfer.Response{}
})
expect = func(i, j int) {
if i != j {
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s:%d: %d != %d", file, line, i, j)
}
}
)
mp := xfer.NewMultiAppClient(xfer.ProbeConfig{}, controlHandler, factory)
defer mp.Stop()
// Add two hostnames with overlapping apps, check we don't add the same app twice
mp.Set("a", []string{"a1", "a2"})
mp.Set("b", []string{"b2", "b3"})
expect(a1.count, 1)
expect(a2.count+b2.count, 1)
expect(b3.count, 1)
// Now drop the overlap, check we don't remove the app
mp.Set("b", []string{"b3"})
expect(a1.count, 1)
expect(a2.count+b2.count, 1)
expect(b3.count, 1)
// Now check we remove apps
mp.Set("b", []string{})
expect(b3.stopped, 1)
}

59
xfer/probe_config.go Normal file
View File

@@ -0,0 +1,59 @@
package xfer
import (
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"github.com/certifi/gocertifi"
)
// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The
// ID is currently set to the a random string on probe startup.
const ScopeProbeIDHeader = "X-Scope-Probe-ID"
// ProbeConfig contains all the info needed for a probe to do HTTP requests
type ProbeConfig struct {
Token string
ProbeID string
Insecure bool
}
func (pc ProbeConfig) authorizeHeaders(headers http.Header) {
headers.Set("Authorization", fmt.Sprintf("Scope-Probe token=%s", pc.Token))
headers.Set(ScopeProbeIDHeader, pc.ProbeID)
}
func (pc ProbeConfig) authorizedRequest(method string, urlStr string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequest(method, urlStr, body)
if err == nil {
pc.authorizeHeaders(req.Header)
}
return req, err
}
func (pc ProbeConfig) getHTTPTransport(hostname string) (*http.Transport, error) {
if pc.Insecure {
return &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}, nil
}
host, _, err := net.SplitHostPort(hostname)
if err != nil {
return nil, err
}
certPool, err := gocertifi.CACerts()
if err != nil {
return nil, err
}
return &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
ServerName: host,
},
}, nil
}