Files
weave-scope/probe/plugins/registry.go
Bryan Boreham a47cf0a2aa Remove copying Merge() on Report
It was only used in a few places, and all of those were better off
using the Unsafe variant.
2020-03-06 15:03:43 +00:00

515 lines
15 KiB
Go

package plugins
import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"syscall"
"time"
"context"
log "github.com/sirupsen/logrus"
"github.com/ugorji/go/codec"
"golang.org/x/net/context/ctxhttp"
"github.com/weaveworks/common/backoff"
"github.com/weaveworks/common/fs"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/report"
)
// Exposed for testing
var (
transport = makeUnixRoundTripper
maxResponseBytes int64 = 50 * 1024 * 1024
errResponseTooLarge = fmt.Errorf("response must be shorter than 50MB")
validPluginName = regexp.MustCompile("^[A-Za-z0-9]+([-][A-Za-z0-9]+)*$")
)
const (
pluginTimeout = 500 * time.Millisecond
scanningInterval = 5 * time.Second
)
// ReportPublisher is an interface for publishing reports immediately
type ReportPublisher interface {
Publish(rpt report.Report)
}
// Registry maintains a list of available plugins by name.
type Registry struct {
rootPath string
apiVersion string
handshakeMetadata map[string]string
pluginsBySocket map[string]*Plugin
lock sync.RWMutex
context context.Context
cancel context.CancelFunc
controlsByPlugin map[string]report.StringSet
pluginsByID map[string]*Plugin
handlerRegistry *controls.HandlerRegistry
publisher ReportPublisher
}
// NewRegistry creates a new registry which watches the given dir root for new
// plugins, and adds them.
func NewRegistry(rootPath, apiVersion string, handshakeMetadata map[string]string, handlerRegistry *controls.HandlerRegistry, publisher ReportPublisher) (*Registry, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &Registry{
rootPath: rootPath,
apiVersion: apiVersion,
handshakeMetadata: handshakeMetadata,
pluginsBySocket: map[string]*Plugin{},
context: ctx,
cancel: cancel,
controlsByPlugin: map[string]report.StringSet{},
pluginsByID: map[string]*Plugin{},
handlerRegistry: handlerRegistry,
publisher: publisher,
}
if err := r.scan(); err != nil {
r.Close()
return nil, err
}
go r.loop()
return r, nil
}
// loop periodically rescans for plugins
func (r *Registry) loop() {
ticker := time.NewTicker(scanningInterval)
defer ticker.Stop()
for {
select {
case <-r.context.Done():
return
case <-ticker.C:
log.Debugf("plugins: scanning...")
if err := r.scan(); err != nil {
log.Warningf("plugins: error: %v", err)
}
}
}
}
// Rescan the plugins directory, load new plugins, and remove missing plugins
func (r *Registry) scan() error {
sockets, err := r.sockets(r.rootPath)
if err != nil {
return err
}
r.lock.Lock()
defer r.lock.Unlock()
plugins := map[string]*Plugin{}
pluginsByID := map[string]*Plugin{}
// add (or keep) plugins which were found
for _, path := range sockets {
if plugin, ok := r.pluginsBySocket[path]; ok {
plugins[path] = plugin
pluginsByID[plugin.PluginSpec.ID] = plugin
continue
}
tr, err := transport(path, pluginTimeout)
if err != nil {
log.Warningf("plugins: error loading plugin %s: %v", path, err)
continue
}
client := &http.Client{Transport: tr, Timeout: pluginTimeout}
plugin, err := NewPlugin(r.context, path, client, r.apiVersion, r.handshakeMetadata)
if err != nil {
log.Warningf("plugins: error loading plugin %s: %v", path, err)
continue
}
plugins[path] = plugin
pluginsByID[plugin.PluginSpec.ID] = plugin
log.Infof("plugins: added plugin %s", path)
}
// remove plugins which weren't found
pluginsToClose := map[string]*Plugin{}
for path, plugin := range r.pluginsBySocket {
if _, ok := plugins[path]; !ok {
pluginsToClose[plugin.PluginSpec.ID] = plugin
log.Infof("plugins: removed plugin %s", plugin.socket)
}
}
r.closePlugins(pluginsToClose)
r.pluginsBySocket = plugins
r.pluginsByID = pluginsByID
return nil
}
// sockets recursively finds all unix sockets under the path provided
func (r *Registry) sockets(path string) ([]string, error) {
var (
result []string
statT syscall.Stat_t
)
// TODO: use of fs.Stat (which is syscall.Stat) here makes this linux specific.
if err := fs.Stat(path, &statT); err != nil {
return nil, err
}
switch statT.Mode & syscall.S_IFMT {
case syscall.S_IFDIR:
files, err := fs.ReadDir(path)
if err != nil {
return nil, err
}
for _, file := range files {
fpath := filepath.Join(path, file.Name())
s, err := r.sockets(fpath)
if err != nil {
log.Warningf("plugins: error loading path %s: %v", fpath, err)
}
result = append(result, s...)
}
case syscall.S_IFSOCK:
result = append(result, path)
}
return result, nil
}
// forEach walks through all the plugins running f for each one.
func (r *Registry) forEach(lock sync.Locker, f func(p *Plugin)) {
lock.Lock()
defer lock.Unlock()
paths := []string{}
for path := range r.pluginsBySocket {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
f(r.pluginsBySocket[path])
}
}
// ForEach walks through all the plugins running f for each one.
func (r *Registry) ForEach(f func(p *Plugin)) {
r.forEach(r.lock.RLocker(), f)
}
// Implementers walks the available plugins fulfilling the given interface
func (r *Registry) Implementers(iface string, f func(p *Plugin)) {
r.ForEach(func(p *Plugin) {
for _, piface := range p.Interfaces {
if piface == iface {
f(p)
}
}
})
}
// Name implements the Reporter interface
func (r *Registry) Name() string { return "plugins" }
// Report implements the Reporter interface
func (r *Registry) Report() (report.Report, error) {
rpt := report.MakeReport()
// All plugins are assumed to (and must) implement reporter
r.forEach(&r.lock, func(plugin *Plugin) {
pluginReport, err := plugin.Report()
if err != nil {
log.Errorf("plugins: %s: /report error: %v", plugin.socket, err)
}
if plugin.Implements("controller") {
r.updateAndRegisterControlsInReport(&pluginReport)
}
rpt.UnsafeMerge(pluginReport)
})
return rpt, nil
}
func (r *Registry) updateAndRegisterControlsInReport(rpt *report.Report) {
key := rpt.Plugins.Keys()[0]
spec, _ := rpt.Plugins.Lookup(key)
pluginID := spec.ID
var newPluginControls []string
rpt.WalkTopologies(func(topology *report.Topology) {
newPluginControls = append(newPluginControls, r.updateAndGetControlsInTopology(pluginID, topology)...)
})
r.updatePluginControls(pluginID, report.MakeStringSet(newPluginControls...))
}
func (r *Registry) updateAndGetControlsInTopology(pluginID string, topology *report.Topology) []string {
var pluginControls []string
newControls := report.Controls{}
for controlID, control := range topology.Controls {
fakeID := fakeControlID(pluginID, controlID)
log.Debugf("plugins: replacing control %s with %s", controlID, fakeID)
control.ID = fakeID
newControls.AddControl(control)
pluginControls = append(pluginControls, controlID)
}
newNodes := report.Nodes{}
for name, node := range topology.Nodes {
log.Debugf("plugins: checking node controls in node %s of %s", name, topology.Label)
newNode := node.WithID(name)
newLatestControls := []string{}
for _, controlID := range node.ActiveControls() {
log.Debugf("plugins: got node control %s", controlID)
newControlID := ""
if _, found := topology.Controls[controlID]; !found {
log.Debugf("plugins: node control %s does not exist in topology controls", controlID)
newControlID = controlID
} else {
newControlID = fakeControlID(pluginID, controlID)
log.Debugf("plugins: will replace node control %s with %s", controlID, newControlID)
}
newLatestControls = append(newLatestControls, newControlID)
}
newNode = newNode.WithLatestActiveControls(newLatestControls...)
newNodes[newNode.ID] = newNode
}
topology.Controls = newControls
topology.Nodes = newNodes
return pluginControls
}
func (r *Registry) updatePluginControls(pluginID string, newPluginControls report.StringSet) {
oldFakePluginControls := r.fakePluginControls(pluginID)
newFakePluginControls := map[string]xfer.ControlHandlerFunc{}
for _, controlID := range newPluginControls {
newFakePluginControls[fakeControlID(pluginID, controlID)] = r.pluginControlHandler
}
r.handlerRegistry.Batch(oldFakePluginControls, newFakePluginControls)
r.controlsByPlugin[pluginID] = newPluginControls
}
// PluginResponse is an extension of xfer.Response that allows plugins
// to send the shortcut reports
type PluginResponse struct {
xfer.Response
ShortcutReport *report.Report `json:"shortcutReport,omitempty"`
}
func (r *Registry) pluginControlHandler(req xfer.Request) xfer.Response {
pluginID, controlID := realPluginAndControlID(req.Control)
req.Control = controlID
r.lock.RLock()
defer r.lock.RUnlock()
if plugin, found := r.pluginsByID[pluginID]; found {
response := plugin.Control(req)
if response.ShortcutReport != nil {
r.updateAndRegisterControlsInReport(response.ShortcutReport)
response.ShortcutReport.Shortcut = true
r.publisher.Publish(*response.ShortcutReport)
}
return response.Response
}
return xfer.ResponseErrorf("plugin %s not found", pluginID)
}
func realPluginAndControlID(fakeID string) (string, string) {
parts := strings.SplitN(fakeID, "~", 2)
if len(parts) != 2 {
return "", fakeID
}
return parts[0], parts[1]
}
// Close shuts down the registry. It can still be used after this, but will be
// out of date.
func (r *Registry) Close() {
r.cancel()
r.lock.Lock()
defer r.lock.Unlock()
r.closePlugins(r.pluginsByID)
}
func (r *Registry) closePlugins(plugins map[string]*Plugin) {
var toRemove []string
for pluginID, plugin := range plugins {
toRemove = append(toRemove, r.fakePluginControls(pluginID)...)
delete(r.controlsByPlugin, pluginID)
plugin.Close()
}
r.handlerRegistry.Batch(toRemove, nil)
}
func (r *Registry) fakePluginControls(pluginID string) []string {
oldPluginControls := r.controlsByPlugin[pluginID]
var oldFakePluginControls []string
for _, controlID := range oldPluginControls {
oldFakePluginControls = append(oldFakePluginControls, fakeControlID(pluginID, controlID))
}
return oldFakePluginControls
}
func fakeControlID(pluginID, controlID string) string {
return fmt.Sprintf("%s~%s", pluginID, controlID)
}
// Plugin is the implementation of a plugin. It is responsible for doing the
// plugin handshake, gathering reports, etc.
type Plugin struct {
xfer.PluginSpec
context context.Context
socket string
expectedAPIVersion string
handshakeMetadata url.Values
client *http.Client
cancel context.CancelFunc
backoff backoff.Interface
}
// NewPlugin loads and initializes a new plugin. If client is nil,
// http.DefaultClient will be used.
func NewPlugin(ctx context.Context, socket string, client *http.Client, expectedAPIVersion string, handshakeMetadata map[string]string) (*Plugin, error) {
id := strings.TrimSuffix(filepath.Base(socket), filepath.Ext(socket))
if !validPluginName.MatchString(id) {
return nil, fmt.Errorf("invalid plugin id %q", id)
}
params := url.Values{}
for k, v := range handshakeMetadata {
params.Add(k, v)
}
ctx, cancel := context.WithCancel(ctx)
plugin := &Plugin{
PluginSpec: xfer.PluginSpec{ID: id, Label: id},
context: ctx,
socket: socket,
expectedAPIVersion: expectedAPIVersion,
handshakeMetadata: params,
client: client,
cancel: cancel,
}
return plugin, nil
}
// Report gets the latest report from the plugin
func (p *Plugin) Report() (result report.Report, err error) {
result = report.MakeReport()
defer func() {
p.setStatus(err)
result.Plugins = result.Plugins.Add(p.PluginSpec)
if err != nil {
result = report.MakeReport()
result.Plugins = xfer.MakePluginSpecs(p.PluginSpec)
}
}()
if err := p.get("/report", p.handshakeMetadata, &result); err != nil {
return result, err
}
if result.Plugins.Size() != 1 {
return result, fmt.Errorf("report must contain exactly one plugin (found %d)", result.Plugins.Size())
}
key := result.Plugins.Keys()[0]
spec, _ := result.Plugins.Lookup(key)
if spec.ID != p.PluginSpec.ID {
return result, fmt.Errorf("plugin must not change its id (is %q, should be %q)", spec.ID, p.PluginSpec.ID)
}
p.PluginSpec = spec
switch {
case spec.APIVersion != p.expectedAPIVersion:
err = fmt.Errorf("incorrect API version: expected %q, got %q", p.expectedAPIVersion, spec.APIVersion)
case spec.Label == "":
err = fmt.Errorf("spec must contain a label")
case !p.Implements("reporter"):
err = fmt.Errorf("spec must implement the \"reporter\" interface")
}
return result, err
}
// Control sends a control message to a plugin
func (p *Plugin) Control(request xfer.Request) (res PluginResponse) {
var err error
defer func() {
p.setStatus(err)
if err != nil {
res = PluginResponse{Response: xfer.ResponseError(err)}
}
}()
if p.Implements("controller") {
err = p.post("/control", p.handshakeMetadata, request, &res)
} else {
err = fmt.Errorf("the %s plugin does not implement the controller interface", p.PluginSpec.Label)
}
return res
}
// Implements checks if the plugin implements the given interface
func (p *Plugin) Implements(iface string) bool {
for _, i := range p.PluginSpec.Interfaces {
if i == iface {
return true
}
}
return false
}
func (p *Plugin) setStatus(err error) {
if err == nil {
p.Status = "ok"
} else {
p.Status = fmt.Sprintf("error: %v", err)
}
}
func (p *Plugin) get(path string, params url.Values, result interface{}) error {
// Context here lets us either timeout req. or cancel it in Plugin.Close
ctx, cancel := context.WithTimeout(p.context, pluginTimeout)
defer cancel()
resp, err := ctxhttp.Get(ctx, p.client, fmt.Sprintf("http://plugin%s?%s", path, params.Encode()))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("plugin returned non-200 status code: %s", resp.Status)
}
return getResult(resp.Body, result)
}
func (p *Plugin) post(path string, params url.Values, data interface{}, result interface{}) error {
// Context here lets us either timeout req. or cancel it in Plugin.Close
ctx, cancel := context.WithTimeout(p.context, pluginTimeout)
defer cancel()
buf := &bytes.Buffer{}
if err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(data); err != nil {
return fmt.Errorf("encoding error: %s", err)
}
resp, err := ctxhttp.Post(ctx, p.client, fmt.Sprintf("http://plugin%s?%s", path, params.Encode()), "application/json", buf)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("plugin returned non-200 status code: %s", resp.Status)
}
return getResult(resp.Body, result)
}
func getResult(body io.ReadCloser, result interface{}) error {
err := codec.NewDecoder(MaxBytesReader(body, maxResponseBytes, errResponseTooLarge), &codec.JsonHandle{}).Decode(&result)
if err == errResponseTooLarge {
return err
}
if err != nil {
return fmt.Errorf("decoding error: %s", err)
}
return nil
}
// Close closes the client
func (p *Plugin) Close() {
if p.backoff != nil {
p.backoff.Stop()
}
p.cancel()
}