mirror of
https://github.com/weaveworks/scope.git
synced 2026-04-20 01:17:09 +00:00
Forward control requests to plugins
Thanks to that, plugins can react to requests from controls they exposed. To make it work, plugins registry modifies each plugin's report by prepending the plugin ID to the control name the plugin has exposed before sending it to the app. Then the registry installs the control request handler for this faked control name, which forwards the request to the correct plugin. This adds a new API endpoint to plugins next to "/report" - a "/control" entry. The body of the request is the JSON-encoded xfer.Request instance.
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
@@ -20,6 +22,7 @@ import (
|
||||
"github.com/weaveworks/scope/common/backoff"
|
||||
"github.com/weaveworks/scope/common/fs"
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
"github.com/weaveworks/scope/probe/controls"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
@@ -45,11 +48,14 @@ type Registry struct {
|
||||
lock sync.RWMutex
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
controlsByPlugin map[string]report.StringSet
|
||||
pluginsByID map[string]*Plugin
|
||||
handlerRegistry *controls.HandlerRegistry
|
||||
}
|
||||
|
||||
// NewRegistry creates a new registry which watches the given dir root for new
|
||||
// plugins, and adds them.
|
||||
func NewRegistry(rootPath, apiVersion string, handshakeMetadata map[string]string) (*Registry, error) {
|
||||
func NewRegistry(rootPath, apiVersion string, handshakeMetadata map[string]string, handlerRegistry *controls.HandlerRegistry) (*Registry, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r := &Registry{
|
||||
rootPath: rootPath,
|
||||
@@ -58,6 +64,9 @@ func NewRegistry(rootPath, apiVersion string, handshakeMetadata map[string]strin
|
||||
pluginsBySocket: map[string]*Plugin{},
|
||||
context: ctx,
|
||||
cancel: cancel,
|
||||
controlsByPlugin: map[string]report.StringSet{},
|
||||
pluginsByID: map[string]*Plugin{},
|
||||
handlerRegistry: handlerRegistry,
|
||||
}
|
||||
if err := r.scan(); err != nil {
|
||||
r.Close()
|
||||
@@ -92,11 +101,14 @@ func (r *Registry) scan() error {
|
||||
}
|
||||
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
plugins := map[string]*Plugin{}
|
||||
pluginsByID := map[string]*Plugin{}
|
||||
// add (or keep) plugins which were found
|
||||
for _, path := range sockets {
|
||||
if plugin, ok := r.pluginsBySocket[path]; ok {
|
||||
plugins[path] = plugin
|
||||
pluginsByID[plugin.PluginSpec.ID] = plugin
|
||||
continue
|
||||
}
|
||||
tr, err := transport(path, pluginTimeout)
|
||||
@@ -111,17 +123,20 @@ func (r *Registry) scan() error {
|
||||
continue
|
||||
}
|
||||
plugins[path] = plugin
|
||||
pluginsByID[plugin.PluginSpec.ID] = plugin
|
||||
log.Infof("plugins: added plugin %s", path)
|
||||
}
|
||||
// remove plugins which weren't found
|
||||
pluginsToClose := map[string]*Plugin{}
|
||||
for path, plugin := range r.pluginsBySocket {
|
||||
if _, ok := plugins[path]; !ok {
|
||||
plugin.Close()
|
||||
pluginsToClose[plugin.PluginSpec.ID] = plugin
|
||||
log.Infof("plugins: removed plugin %s", plugin.socket)
|
||||
}
|
||||
}
|
||||
r.closePlugins(pluginsToClose)
|
||||
r.pluginsBySocket = plugins
|
||||
r.lock.Unlock()
|
||||
r.pluginsByID = pluginsByID
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -155,10 +170,10 @@ func (r *Registry) sockets(path string) ([]string, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ForEach walks through all the plugins running f for each one.
|
||||
func (r *Registry) ForEach(f func(p *Plugin)) {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
// forEach walks through all the plugins running f for each one.
|
||||
func (r *Registry) forEach(lock sync.Locker, f func(p *Plugin)) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
paths := []string{}
|
||||
for path := range r.pluginsBySocket {
|
||||
paths = append(paths, path)
|
||||
@@ -169,6 +184,11 @@ func (r *Registry) ForEach(f func(p *Plugin)) {
|
||||
}
|
||||
}
|
||||
|
||||
// ForEach walks through all the plugins running f for each one.
|
||||
func (r *Registry) ForEach(f func(p *Plugin)) {
|
||||
r.forEach(r.lock.RLocker(), f)
|
||||
}
|
||||
|
||||
// Implementers walks the available plugins fulfilling the given interface
|
||||
func (r *Registry) Implementers(iface string, f func(p *Plugin)) {
|
||||
r.ForEach(func(p *Plugin) {
|
||||
@@ -187,25 +207,143 @@ func (r *Registry) Name() string { return "plugins" }
|
||||
func (r *Registry) Report() (report.Report, error) {
|
||||
rpt := report.MakeReport()
|
||||
// All plugins are assumed to (and must) implement reporter
|
||||
r.ForEach(func(plugin *Plugin) {
|
||||
r.forEach(&r.lock, func(plugin *Plugin) {
|
||||
pluginReport, err := plugin.Report()
|
||||
if err != nil {
|
||||
log.Errorf("plugins: %s: /report error: %v", plugin.socket, err)
|
||||
}
|
||||
if plugin.Implements("controller") {
|
||||
r.updateAndRegisterControlsInReport(&pluginReport)
|
||||
}
|
||||
rpt = rpt.Merge(pluginReport)
|
||||
})
|
||||
return rpt, nil
|
||||
}
|
||||
|
||||
func (r *Registry) updateAndRegisterControlsInReport(rpt *report.Report) {
|
||||
key := rpt.Plugins.Keys()[0]
|
||||
spec, _ := rpt.Plugins.Lookup(key)
|
||||
pluginID := spec.ID
|
||||
topologies := topologyPointers(rpt)
|
||||
var newPluginControls []string
|
||||
for _, topology := range topologies {
|
||||
newPluginControls = append(newPluginControls, r.updateAndGetControlsInTopology(pluginID, topology)...)
|
||||
}
|
||||
r.updatePluginControls(pluginID, report.MakeStringSet(newPluginControls...))
|
||||
}
|
||||
|
||||
func topologyPointers(rpt *report.Report) []*report.Topology {
|
||||
// We cannot use rpt.Topologies(), because it makes a slice of
|
||||
// topology copies and we need original locations to modify
|
||||
// them.
|
||||
return []*report.Topology{
|
||||
&rpt.Endpoint,
|
||||
&rpt.Process,
|
||||
&rpt.Container,
|
||||
&rpt.ContainerImage,
|
||||
&rpt.Pod,
|
||||
&rpt.Service,
|
||||
&rpt.Deployment,
|
||||
&rpt.ReplicaSet,
|
||||
&rpt.Host,
|
||||
&rpt.Overlay,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Registry) updateAndGetControlsInTopology(pluginID string, topology *report.Topology) []string {
|
||||
var pluginControls []string
|
||||
newControls := report.Controls{}
|
||||
for controlID, control := range topology.Controls {
|
||||
fakeID := fakeControlID(pluginID, controlID)
|
||||
log.Debugf("plugins: replacing control %s with %s", controlID, fakeID)
|
||||
control.ID = fakeID
|
||||
newControls.AddControl(control)
|
||||
pluginControls = append(pluginControls, controlID)
|
||||
}
|
||||
newNodes := report.Nodes{}
|
||||
for name, node := range topology.Nodes {
|
||||
log.Debugf("plugins: checking node controls in node %s of %s", name, topology.Label)
|
||||
newNode := node.WithID(name)
|
||||
var nodeControls []string
|
||||
for _, controlID := range node.Controls.Controls {
|
||||
log.Debugf("plugins: got node control %s", controlID)
|
||||
newControlID := ""
|
||||
if _, found := topology.Controls[controlID]; !found {
|
||||
log.Debugf("plugins: node control %s does not exist in topology controls", controlID)
|
||||
newControlID = controlID
|
||||
} else {
|
||||
newControlID = fakeControlID(pluginID, controlID)
|
||||
log.Debugf("plugins: will replace node control %s with %s", controlID, newControlID)
|
||||
}
|
||||
nodeControls = append(nodeControls, newControlID)
|
||||
}
|
||||
newNode.Controls.Controls = report.MakeStringSet(nodeControls...)
|
||||
newNodes[newNode.ID] = newNode
|
||||
}
|
||||
topology.Controls = newControls
|
||||
topology.Nodes = newNodes
|
||||
return pluginControls
|
||||
}
|
||||
|
||||
func (r *Registry) updatePluginControls(pluginID string, newPluginControls report.StringSet) {
|
||||
oldFakePluginControls := r.fakePluginControls(pluginID)
|
||||
newFakePluginControls := map[string]xfer.ControlHandlerFunc{}
|
||||
for _, controlID := range newPluginControls {
|
||||
newFakePluginControls[fakeControlID(pluginID, controlID)] = r.pluginControlHandler
|
||||
}
|
||||
r.handlerRegistry.Batch(oldFakePluginControls, newFakePluginControls)
|
||||
r.controlsByPlugin[pluginID] = newPluginControls
|
||||
}
|
||||
|
||||
func (r *Registry) pluginControlHandler(req xfer.Request) xfer.Response {
|
||||
pluginID, controlID := realPluginAndControlID(req.Control)
|
||||
req.Control = controlID
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
if plugin, found := r.pluginsByID[pluginID]; found {
|
||||
return plugin.Control(req)
|
||||
}
|
||||
return xfer.ResponseErrorf("plugin %s not found", pluginID)
|
||||
}
|
||||
|
||||
func realPluginAndControlID(fakeID string) (string, string) {
|
||||
parts := strings.SplitN(fakeID, "~", 2)
|
||||
if len(parts) != 2 {
|
||||
return "", fakeID
|
||||
}
|
||||
return parts[0], parts[1]
|
||||
}
|
||||
|
||||
// Close shuts down the registry. It can still be used after this, but will be
|
||||
// out of date.
|
||||
func (r *Registry) Close() {
|
||||
r.cancel()
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
for _, plugin := range r.pluginsBySocket {
|
||||
r.closePlugins(r.pluginsByID)
|
||||
}
|
||||
|
||||
func (r *Registry) closePlugins(plugins map[string]*Plugin) {
|
||||
var toRemove []string
|
||||
for pluginID, plugin := range plugins {
|
||||
toRemove = append(toRemove, r.fakePluginControls(pluginID)...)
|
||||
delete(r.controlsByPlugin, pluginID)
|
||||
plugin.Close()
|
||||
}
|
||||
r.handlerRegistry.Batch(toRemove, nil)
|
||||
}
|
||||
|
||||
func (r *Registry) fakePluginControls(pluginID string) []string {
|
||||
oldPluginControls := r.controlsByPlugin[pluginID]
|
||||
var oldFakePluginControls []string
|
||||
for _, controlID := range oldPluginControls {
|
||||
oldFakePluginControls = append(oldFakePluginControls, fakeControlID(pluginID, controlID))
|
||||
}
|
||||
return oldFakePluginControls
|
||||
}
|
||||
|
||||
func fakeControlID(pluginID, controlID string) string {
|
||||
return fmt.Sprintf("%s~%s", pluginID, controlID)
|
||||
}
|
||||
|
||||
// Plugin is the implementation of a plugin. It is responsible for doing the
|
||||
@@ -273,25 +411,46 @@ func (p *Plugin) Report() (result report.Report, err error) {
|
||||
}
|
||||
p.PluginSpec = spec
|
||||
|
||||
foundReporter := false
|
||||
for _, i := range spec.Interfaces {
|
||||
if i == "reporter" {
|
||||
foundReporter = true
|
||||
break
|
||||
}
|
||||
}
|
||||
switch {
|
||||
case spec.APIVersion != p.expectedAPIVersion:
|
||||
err = fmt.Errorf("incorrect API version: expected %q, got %q", p.expectedAPIVersion, spec.APIVersion)
|
||||
case spec.Label == "":
|
||||
err = fmt.Errorf("spec must contain a label")
|
||||
case !foundReporter:
|
||||
case !p.Implements("reporter"):
|
||||
err = fmt.Errorf("spec must implement the \"reporter\" interface")
|
||||
}
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Control sends a control message to a plugin
|
||||
func (p *Plugin) Control(request xfer.Request) (res xfer.Response) {
|
||||
var err error
|
||||
defer func() {
|
||||
p.setStatus(err)
|
||||
if err != nil {
|
||||
res = xfer.ResponseError(err)
|
||||
}
|
||||
}()
|
||||
|
||||
if p.Implements("controller") {
|
||||
err = p.post("/control", p.handshakeMetadata, request, &res)
|
||||
} else {
|
||||
err = fmt.Errorf("the %s plugin does not implement the controller interface", p.PluginSpec.Label)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// Implements checks if the plugin implements the given interface
|
||||
func (p *Plugin) Implements(iface string) bool {
|
||||
for _, i := range p.PluginSpec.Interfaces {
|
||||
if i == iface {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *Plugin) setStatus(err error) {
|
||||
if err == nil {
|
||||
p.Status = "ok"
|
||||
@@ -308,11 +467,34 @@ func (p *Plugin) get(path string, params url.Values, result interface{}) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("plugin returned non-200 status code: %s", resp.Status)
|
||||
}
|
||||
return getResult(resp.Body, result)
|
||||
}
|
||||
|
||||
func (p *Plugin) post(path string, params url.Values, data interface{}, result interface{}) error {
|
||||
// Context here lets us either timeout req. or cancel it in Plugin.Close
|
||||
ctx, cancel := context.WithTimeout(p.context, pluginTimeout)
|
||||
defer cancel()
|
||||
buf := &bytes.Buffer{}
|
||||
if err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(data); err != nil {
|
||||
return fmt.Errorf("encoding error: %s", err)
|
||||
}
|
||||
resp, err := ctxhttp.Post(ctx, p.client, fmt.Sprintf("http://plugin%s?%s", path, params.Encode()), "application/json", buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
err = codec.NewDecoder(MaxBytesReader(resp.Body, maxResponseBytes, errResponseTooLarge), &codec.JsonHandle{}).Decode(&result)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("plugin returned non-200 status code: %s", resp.Status)
|
||||
}
|
||||
return getResult(resp.Body, result)
|
||||
}
|
||||
|
||||
func getResult(body io.ReadCloser, result interface{}) error {
|
||||
err := codec.NewDecoder(MaxBytesReader(body, maxResponseBytes, errResponseTooLarge), &codec.JsonHandle{}).Decode(&result)
|
||||
if err == errResponseTooLarge {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -9,19 +9,33 @@ import (
|
||||
"net/http/httputil"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/paypal/ionet"
|
||||
"github.com/ugorji/go/codec"
|
||||
|
||||
fs_hook "github.com/weaveworks/scope/common/fs"
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
"github.com/weaveworks/scope/probe/controls"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test"
|
||||
"github.com/weaveworks/scope/test/fs"
|
||||
"github.com/weaveworks/scope/test/reflect"
|
||||
)
|
||||
|
||||
func testRegistry(t *testing.T, apiVersion string) *Registry {
|
||||
handlerRegistry := controls.NewDefaultHandlerRegistry()
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, apiVersion, nil, handlerRegistry)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func stubTransport(fn func(socket string, timeout time.Duration) (http.RoundTripper, error)) {
|
||||
transport = fn
|
||||
}
|
||||
@@ -158,16 +172,68 @@ func checkLoadedPluginIDs(t *testing.T, forEach iterator, expectedIDs []string)
|
||||
}
|
||||
}
|
||||
|
||||
type testResponse struct {
|
||||
Status int
|
||||
Body string
|
||||
}
|
||||
|
||||
type testResponseMap map[string]testResponse
|
||||
|
||||
// mapStringHandler returns an http.Handler which just prints the given string for each path
|
||||
func mapStringHandler(responses testResponseMap) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if response, found := responses[r.URL.Path]; found {
|
||||
w.WriteHeader(response.Status)
|
||||
fmt.Fprint(w, response.Body)
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// stringHandler returns an http.Handler which just prints the given string
|
||||
func stringHandler(status int, j string) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/report" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(status)
|
||||
fmt.Fprint(w, j)
|
||||
})
|
||||
return mapStringHandler(testResponseMap{"/report": {status, j}})
|
||||
}
|
||||
|
||||
type testHandlerRegistryBackend struct {
|
||||
handlers map[string]xfer.ControlHandlerFunc
|
||||
t *testing.T
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func newTestHandlerRegistryBackend(t *testing.T) *testHandlerRegistryBackend {
|
||||
return &testHandlerRegistryBackend{
|
||||
handlers: map[string]xfer.ControlHandlerFunc{},
|
||||
t: t,
|
||||
}
|
||||
}
|
||||
|
||||
// Lock locks the backend, so the batch insertions or removals can be
|
||||
// performed.
|
||||
func (b *testHandlerRegistryBackend) Lock() {
|
||||
b.mtx.Lock()
|
||||
}
|
||||
|
||||
// Unlock unlocks the backend.
|
||||
func (b *testHandlerRegistryBackend) Unlock() {
|
||||
b.mtx.Unlock()
|
||||
}
|
||||
|
||||
// Register a new control handler under a given id.
|
||||
func (b *testHandlerRegistryBackend) Register(control string, f xfer.ControlHandlerFunc) {
|
||||
b.handlers[control] = f
|
||||
}
|
||||
|
||||
// Rm deletes the handler for a given name.
|
||||
func (b *testHandlerRegistryBackend) Rm(control string) {
|
||||
delete(b.handlers, control)
|
||||
}
|
||||
|
||||
// Handler gets the handler for the given id.
|
||||
func (b *testHandlerRegistryBackend) Handler(control string) (xfer.ControlHandlerFunc, bool) {
|
||||
handler, ok := b.handlers[control]
|
||||
return handler, ok
|
||||
}
|
||||
|
||||
func TestRegistryLoadsExistingPlugins(t *testing.T) {
|
||||
@@ -181,11 +247,7 @@ func TestRegistryLoadsExistingPlugins(t *testing.T) {
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "1", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "1")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -211,11 +273,7 @@ func TestRegistryLoadsExistingPluginsEvenWhenOneFails(t *testing.T) {
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "1", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "1")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -239,11 +297,7 @@ func TestRegistryDiscoversNewPlugins(t *testing.T) {
|
||||
mockFS := setup(t)
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -273,11 +327,7 @@ func TestRegistryRemovesPlugins(t *testing.T) {
|
||||
mockFS := setup(t, plugin.file())
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -305,11 +355,7 @@ func TestRegistryUpdatesPluginsWhenTheyChange(t *testing.T) {
|
||||
setup(t, plugin.file())
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -345,11 +391,7 @@ func TestRegistryReturnsPluginsByInterface(t *testing.T) {
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -374,11 +416,7 @@ func TestRegistryHandlesConflictingPlugins(t *testing.T) {
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -422,8 +460,8 @@ func TestRegistryRejectsErroneousPluginResponses(t *testing.T) {
|
||||
}.file(),
|
||||
mockPlugin{
|
||||
t: t,
|
||||
Name: "changedId",
|
||||
Handler: stringHandler(http.StatusOK, `{"Plugins":[{"id":"differentId","label":"changedId","interfaces":["reporter"]}]}`),
|
||||
Name: "changedID",
|
||||
Handler: stringHandler(http.StatusOK, `{"Plugins":[{"id":"differentID","label":"changedID","interfaces":["reporter"]}]}`),
|
||||
}.file(),
|
||||
mockPlugin{
|
||||
t: t,
|
||||
@@ -433,19 +471,15 @@ func TestRegistryRejectsErroneousPluginResponses(t *testing.T) {
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
checkLoadedPlugins(t, r.ForEach, []xfer.PluginSpec{
|
||||
{
|
||||
ID: "changedId",
|
||||
Label: "changedId",
|
||||
Status: `error: plugin must not change its id (is "differentId", should be "changedId")`,
|
||||
ID: "changedID",
|
||||
Label: "changedID",
|
||||
Status: `error: plugin must not change its id (is "differentID", should be "changedID")`,
|
||||
},
|
||||
{
|
||||
ID: "moreThanOnePlugin",
|
||||
@@ -516,11 +550,7 @@ func TestRegistryRejectsPluginResponsesWhichAreTooLarge(t *testing.T) {
|
||||
restore(t)
|
||||
}()
|
||||
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := testRegistry(t, "")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
@@ -570,13 +600,150 @@ func TestRegistryChecksForValidPluginIDs(t *testing.T) {
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
r := testRegistry(t, "1")
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
checkLoadedPluginIDs(t, r.ForEach, []string{"P-L-U-G-I-N", "another-testPlugin", "testPlugin"})
|
||||
}
|
||||
|
||||
func checkControls(t *testing.T, topology report.Topology, expectedControls, expectedNodeControls []string, nodeID string) {
|
||||
controlsSet := report.MakeStringSet(expectedControls...)
|
||||
for _, id := range controlsSet {
|
||||
control, found := topology.Controls[id]
|
||||
if !found {
|
||||
t.Fatalf("Could not find an expected control %s in topology %s", id, topology.Label)
|
||||
}
|
||||
if control.ID != id {
|
||||
t.Fatalf("Control ID mismatch, expected %s, got %s", id, control.ID)
|
||||
}
|
||||
}
|
||||
if len(controlsSet) != len(topology.Controls) {
|
||||
t.Fatalf("Expected exactly %d controls in topology, got %d", len(controlsSet), len(topology.Controls))
|
||||
}
|
||||
|
||||
node, found := topology.Nodes[nodeID]
|
||||
if !found {
|
||||
t.Fatalf("expected a node %s in a topology", nodeID)
|
||||
}
|
||||
nodeControlsSet := report.MakeStringSet(expectedNodeControls...)
|
||||
if !reflect.DeepEqual(nodeControlsSet, node.Controls.Controls) {
|
||||
t.Fatalf("node controls in node %s in topology %s are not equal:\n%s", nodeID, topology.Label, test.Diff(nodeControlsSet, node.Controls.Controls))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryRewritesControlReports(t *testing.T) {
|
||||
setup(
|
||||
t,
|
||||
mockPlugin{
|
||||
t: t,
|
||||
Name: "testPlugin",
|
||||
Handler: mapStringHandler(testResponseMap{
|
||||
"/report": {http.StatusOK, `{"Pod": {"label":"pod","controls": {"ctrl1":{"id": "ctrl1","human":"Ctrl 1","icon":"fa-at","rank":1}},"nodes":{"node1":{"id":"node1","adjacency":[], "controls":{"timestamp":"2006-01-02 15:04:05.999999999 -0700 MST","controls":["ctrl1", "ctrl2"]}}}},"Plugins":[{"id":"testPlugin","label":"testPlugin","interfaces":["reporter", "controller"],"api_version":"1"}]}`},
|
||||
"/control": {http.StatusOK, `{"value":"foo"}`},
|
||||
}),
|
||||
}.file(),
|
||||
mockPlugin{
|
||||
t: t,
|
||||
Name: "testPluginReporterOnly",
|
||||
Handler: mapStringHandler(testResponseMap{
|
||||
"/report": {http.StatusOK, `{"Host": {"label":"host","controls": {"ctrl1":{"id": "ctrl1","human":"Ctrl 1","icon":"fa-at","rank":1}},"nodes":{"node1":{"id":"node1","adjacency":[], "controls":{"timestamp":"2006-01-02 15:04:05.999999999 -0700 MST","controls":["ctrl1", "ctrl2"]}}}},"Plugins":[{"id":"testPluginReporterOnly","label":"testPluginReporterOnly","interfaces":["reporter"],"api_version":"1"}]}`},
|
||||
}),
|
||||
}.file(),
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
r := testRegistry(t, "1")
|
||||
defer r.Close()
|
||||
|
||||
rpt, err := r.Report()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// in a Pod topology, ctrl1 should be faked, ctrl2 should be left intact
|
||||
expectedPodControls := []string{fakeControlID("testPlugin", "ctrl1")}
|
||||
expectedPodNodeControls := []string{fakeControlID("testPlugin", "ctrl1"), "ctrl2"}
|
||||
checkControls(t, rpt.Pod, expectedPodControls, expectedPodNodeControls, "node1")
|
||||
// in a Host topology, controls should be kept untouched
|
||||
expectedHostControls := []string{"ctrl1"}
|
||||
expectedHostNodeControls := []string{"ctrl1", "ctrl2"}
|
||||
checkControls(t, rpt.Host, expectedHostControls, expectedHostNodeControls, "node1")
|
||||
}
|
||||
|
||||
func TestRegistryRegistersHandlers(t *testing.T) {
|
||||
setup(
|
||||
t,
|
||||
mockPlugin{
|
||||
t: t,
|
||||
Name: "testPlugin",
|
||||
Handler: mapStringHandler(testResponseMap{
|
||||
"/report": {http.StatusOK, `{"Pod": {"label":"pod","controls": {"ctrl1":{"id": "ctrl1","human":"Ctrl 1","icon":"fa-at","rank":1}},"nodes":{"node1":{"id":"node1","adjacency":[], "controls":{"timestamp":"2006-01-02 15:04:05.999999999 -0700 MST","controls":["ctrl1", "ctrl2"]}}}},"Plugins":[{"id":"testPlugin","label":"testPlugin","interfaces":["reporter", "controller"],"api_version":"1"}]}`},
|
||||
"/control": {http.StatusOK, `{"value":"foo"}`},
|
||||
}),
|
||||
}.file(),
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
testBackend := newTestHandlerRegistryBackend(t)
|
||||
handlerRegistry := controls.NewHandlerRegistry(testBackend)
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "1", nil)
|
||||
r, err := NewRegistry(root, "1", nil, handlerRegistry)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
checkLoadedPluginIDs(t, r.ForEach, []string{"P-L-U-G-I-N", "another-testPlugin", "testPlugin"})
|
||||
if len(testBackend.handlers) != 1 {
|
||||
t.Fatalf("Expected only one registered handler, got %d", len(testBackend.handlers))
|
||||
}
|
||||
fakeID := fakeControlID("testPlugin", "ctrl1")
|
||||
if _, found := testBackend.Handler(fakeID); !found {
|
||||
t.Fatalf("Expected to have a handler for %s", fakeID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryHandlersCallPlugins(t *testing.T) {
|
||||
setup(
|
||||
t,
|
||||
mockPlugin{
|
||||
t: t,
|
||||
Name: "testPlugin",
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/report":
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"Pod": {"label":"pod","controls": {"ctrl1":{"id": "ctrl1","human":"Ctrl 1","icon":"fa-at","rank":1}},"nodes":{"node1":{"id":"node1","adjacency":[], "controls":{"timestamp":"2006-01-02 15:04:05.999999999 -0700 MST","controls":["ctrl1", "ctrl2"]}}}},"Plugins":[{"id":"testPlugin","label":"testPlugin","interfaces":["reporter", "controller"],"api_version":"1"}]}`)
|
||||
case "/control":
|
||||
xreq := xfer.Request{}
|
||||
err := codec.NewDecoder(r.Body, &codec.JsonHandle{}).Decode(&xreq)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintf(w, `{"value":"%s,%s"}`, xreq.NodeID, xreq.Control)
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}),
|
||||
}.file(),
|
||||
)
|
||||
defer restore(t)
|
||||
|
||||
handlerRegistry := controls.NewDefaultHandlerRegistry()
|
||||
root := "/plugins"
|
||||
r, err := NewRegistry(root, "1", nil, handlerRegistry)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
r.Report()
|
||||
fakeID := fakeControlID("testPlugin", "ctrl1")
|
||||
req := xfer.Request{NodeID: "node1", Control: fakeID}
|
||||
res := handlerRegistry.HandleControlRequest(req)
|
||||
if res.Value != "node1,ctrl1" {
|
||||
t.Fatalf("Got unexpected response: %#v", res)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,6 +206,7 @@ func probeMain(flags probeFlags) {
|
||||
"probe_id": probeID,
|
||||
"api_version": pluginAPIVersion,
|
||||
},
|
||||
handlerRegistry,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("plugins: problem loading: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user