Make control handlers registry an object and extend its functionality

It is not a singleton anymore. Instead it is an object with a registry
backend. The default registry backend is provided, which is equivalent
to what used to be before. Custom backend can be provided for testing
purposes.

The registry also supports batch operations to remove and add handlers
as an atomic step.
This commit is contained in:
Krzesimir Nowak
2016-07-12 23:17:02 +02:00
parent c6b5d98699
commit 41193b428e
13 changed files with 234 additions and 105 deletions

View File

@@ -6,33 +6,126 @@ import (
"github.com/weaveworks/scope/common/xfer"
)
var (
mtx = sync.Mutex{}
handlers = map[string]xfer.ControlHandlerFunc{}
)
// HandlerRegistryBackend is an interface for storing control request
// handlers.
type HandlerRegistryBackend interface {
// Lock locks the backend, so the batch insertions or
// removals can be performed.
Lock()
// Unlock unlocks the registry.
Unlock()
// Register a new control handler under a given
// id. Implementations should not call Lock() or Unlock()
// here, it will be done by HandlerRegistry.
Register(control string, f xfer.ControlHandlerFunc)
// Rm deletes the handler for a given name. Implementations
// should not call Lock() or Unlock() here, it will be done by
// HandlerRegistry.
Rm(control string)
// Handler gets the handler for a control. Implementations
// should not call Lock() or Unlock() here, it will be done by
// HandlerRegistry.
Handler(control string) (xfer.ControlHandlerFunc, bool)
}
type defaultBackend struct {
handlers map[string]xfer.ControlHandlerFunc
mtx sync.Mutex
}
// NewDefaultHandlerRegistryBackend creates a default backend for
// handler registry.
func NewDefaultHandlerRegistryBackend() HandlerRegistryBackend {
return &defaultBackend{
handlers: map[string]xfer.ControlHandlerFunc{},
}
}
// Lock locks the registry, so the batch insertions or
// removals can be performed.
func (b *defaultBackend) Lock() {
b.mtx.Lock()
}
// Unlock unlocks the registry.
func (b *defaultBackend) Unlock() {
b.mtx.Unlock()
}
// Register a new control handler under a given id.
func (b *defaultBackend) Register(control string, f xfer.ControlHandlerFunc) {
b.handlers[control] = f
}
// Rm deletes the handler for a given name.
func (b *defaultBackend) Rm(control string) {
delete(b.handlers, control)
}
// Handler gets the handler for a control.
func (b *defaultBackend) Handler(control string) (xfer.ControlHandlerFunc, bool) {
handler, ok := b.handlers[control]
return handler, ok
}
// HandlerRegistry uses backend for storing and retrieving control
// requests handlers.
type HandlerRegistry struct {
backend HandlerRegistryBackend
}
// NewDefaultHandlerRegistry creates a registry with a default
// backend.
func NewDefaultHandlerRegistry() *HandlerRegistry {
return NewHandlerRegistry(NewDefaultHandlerRegistryBackend())
}
// NewHandlerRegistry creates a registry with a custom backend.
func NewHandlerRegistry(backend HandlerRegistryBackend) *HandlerRegistry {
return &HandlerRegistry{
backend: backend,
}
}
// Register registers a new control handler under a given name.
func (r *HandlerRegistry) Register(control string, f xfer.ControlHandlerFunc) {
r.backend.Lock()
defer r.backend.Unlock()
r.backend.Register(control, f)
}
// Rm deletes the handler for a given name.
func (r *HandlerRegistry) Rm(control string) {
r.backend.Lock()
defer r.backend.Unlock()
r.backend.Rm(control)
}
// Batch first deletes handlers for given names in toRemove then
// registers new handlers for given names in toAdd.
func (r *HandlerRegistry) Batch(toRemove []string, toAdd map[string]xfer.ControlHandlerFunc) {
r.backend.Lock()
defer r.backend.Unlock()
for _, control := range toRemove {
r.backend.Rm(control)
}
for control, handler := range toAdd {
r.backend.Register(control, handler)
}
}
// HandleControlRequest performs a control request.
func HandleControlRequest(req xfer.Request) xfer.Response {
mtx.Lock()
handler, ok := handlers[req.Control]
mtx.Unlock()
func (r *HandlerRegistry) HandleControlRequest(req xfer.Request) xfer.Response {
h, ok := r.handler(req.Control)
if !ok {
return xfer.ResponseErrorf("Control %q not recognised", req.Control)
}
return handler(req)
return h(req)
}
// Register a new control handler under a given id.
func Register(control string, f xfer.ControlHandlerFunc) {
mtx.Lock()
defer mtx.Unlock()
handlers[control] = f
}
// Rm deletes the handler for a given name
func Rm(control string) {
mtx.Lock()
defer mtx.Unlock()
delete(handlers, control)
func (r *HandlerRegistry) handler(control string) (xfer.ControlHandlerFunc, bool) {
r.backend.Lock()
defer r.backend.Unlock()
return r.backend.Handler(control)
}

View File

@@ -10,17 +10,18 @@ import (
)
func TestControls(t *testing.T) {
controls.Register("foo", func(req xfer.Request) xfer.Response {
registry := controls.NewDefaultHandlerRegistry()
registry.Register("foo", func(req xfer.Request) xfer.Response {
return xfer.Response{
Value: "bar",
}
})
defer controls.Rm("foo")
defer registry.Rm("foo")
want := xfer.Response{
Value: "bar",
}
have := controls.HandleControlRequest(xfer.Request{
have := registry.HandleControlRequest(xfer.Request{
Control: "foo",
})
if !reflect.DeepEqual(want, have) {
@@ -29,10 +30,11 @@ func TestControls(t *testing.T) {
}
func TestControlsNotFound(t *testing.T) {
registry := controls.NewDefaultHandlerRegistry()
want := xfer.Response{
Error: "Control \"baz\" not recognised",
}
have := controls.HandleControlRequest(xfer.Request{
have := registry.HandleControlRequest(xfer.Request{
Control: "baz",
})
if !reflect.DeepEqual(want, have) {

View File

@@ -162,23 +162,29 @@ func captureContainerID(f func(string, xfer.Request) xfer.Response) func(xfer.Re
}
func (r *registry) registerControls() {
controls.Register(StopContainer, captureContainerID(r.stopContainer))
controls.Register(StartContainer, captureContainerID(r.startContainer))
controls.Register(RestartContainer, captureContainerID(r.restartContainer))
controls.Register(PauseContainer, captureContainerID(r.pauseContainer))
controls.Register(UnpauseContainer, captureContainerID(r.unpauseContainer))
controls.Register(RemoveContainer, captureContainerID(r.removeContainer))
controls.Register(AttachContainer, captureContainerID(r.attachContainer))
controls.Register(ExecContainer, captureContainerID(r.execContainer))
controls := map[string]xfer.ControlHandlerFunc{
StopContainer: captureContainerID(r.stopContainer),
StartContainer: captureContainerID(r.startContainer),
RestartContainer: captureContainerID(r.restartContainer),
PauseContainer: captureContainerID(r.pauseContainer),
UnpauseContainer: captureContainerID(r.unpauseContainer),
RemoveContainer: captureContainerID(r.removeContainer),
AttachContainer: captureContainerID(r.attachContainer),
ExecContainer: captureContainerID(r.execContainer),
}
r.handlerRegistry.Batch(nil, controls)
}
func (r *registry) deregisterControls() {
controls.Rm(StopContainer)
controls.Rm(StartContainer)
controls.Rm(RestartContainer)
controls.Rm(PauseContainer)
controls.Rm(UnpauseContainer)
controls.Rm(RemoveContainer)
controls.Rm(AttachContainer)
controls.Rm(ExecContainer)
controls := []string{
StopContainer,
StartContainer,
RestartContainer,
PauseContainer,
UnpauseContainer,
RemoveContainer,
AttachContainer,
ExecContainer,
}
r.handlerRegistry.Batch(controls, nil)
}

View File

@@ -16,7 +16,8 @@ import (
func TestControls(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, false, "")
hr := controls.NewDefaultHandlerRegistry()
registry, _ := docker.NewRegistry(10*time.Second, nil, false, "", hr)
defer registry.Stop()
for _, tc := range []struct{ command, result string }{
@@ -26,7 +27,7 @@ func TestControls(t *testing.T) {
{docker.PauseContainer, "paused"},
{docker.UnpauseContainer, "unpaused"},
} {
result := controls.HandleControlRequest(xfer.Request{
result := hr.HandleControlRequest(xfer.Request{
Control: tc.command,
NodeID: report.MakeContainerNodeID("a1b2c3d4e5"),
})
@@ -56,7 +57,8 @@ func TestPipes(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, false, "")
hr := controls.NewDefaultHandlerRegistry()
registry, _ := docker.NewRegistry(10*time.Second, nil, false, "", hr)
defer registry.Stop()
test.Poll(t, 100*time.Millisecond, true, func() interface{} {
@@ -68,7 +70,7 @@ func TestPipes(t *testing.T) {
docker.AttachContainer,
docker.ExecContainer,
} {
result := controls.HandleControlRequest(xfer.Request{
result := hr.HandleControlRequest(xfer.Request{
Control: tc,
NodeID: report.MakeContainerNodeID("ping"),
})

View File

@@ -52,12 +52,13 @@ type ContainerUpdateWatcher func(report.Node)
type registry struct {
sync.RWMutex
quit chan chan struct{}
interval time.Duration
collectStats bool
client Client
pipes controls.PipeClient
hostID string
quit chan chan struct{}
interval time.Duration
collectStats bool
client Client
pipes controls.PipeClient
hostID string
handlerRegistry *controls.HandlerRegistry
watchers []ContainerUpdateWatcher
containers *radix.Tree
@@ -91,7 +92,7 @@ func newDockerClient(endpoint string) (Client, error) {
}
// NewRegistry returns a usable Registry. Don't forget to Stop it.
func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool, hostID string) (Registry, error) {
func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool, hostID string, handlerRegistry *controls.HandlerRegistry) (Registry, error) {
client, err := NewDockerClientStub(endpoint)
if err != nil {
return nil, err
@@ -102,12 +103,13 @@ func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats
containersByPID: map[int]Container{},
images: map[string]docker_client.APIImages{},
client: client,
pipes: pipes,
interval: interval,
collectStats: collectStats,
hostID: hostID,
quit: make(chan chan struct{}),
client: client,
pipes: pipes,
interval: interval,
collectStats: collectStats,
hostID: hostID,
handlerRegistry: handlerRegistry,
quit: make(chan chan struct{}),
}
r.registerControls()

View File

@@ -12,12 +12,19 @@ import (
client "github.com/fsouza/go-dockerclient"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/reflect"
)
func testRegistry() docker.Registry {
hr := controls.NewDefaultHandlerRegistry()
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "", hr)
return registry
}
type mockContainer struct {
c *client.Container
}
@@ -319,7 +326,7 @@ func allNetworks(r docker.Registry) []client.Network {
func TestRegistry(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
registry := testRegistry()
defer registry.Stop()
runtime.Gosched()
@@ -350,7 +357,7 @@ func TestRegistry(t *testing.T) {
func TestLookupByPID(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
registry := testRegistry()
defer registry.Stop()
want := docker.Container(&mockContainer{container1})
@@ -367,7 +374,7 @@ func TestLookupByPID(t *testing.T) {
func TestRegistryEvents(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
registry := testRegistry()
defer registry.Stop()
runtime.Gosched()
@@ -441,7 +448,7 @@ func TestRegistryDelete(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
registry := testRegistry()
defer registry.Stop()
runtime.Gosched()

View File

@@ -16,11 +16,11 @@ const (
)
func (r *Reporter) registerControls() {
controls.Register(ExecHost, r.execHost)
r.handlerRegistry.Register(ExecHost, r.execHost)
}
func (*Reporter) deregisterControls() {
controls.Rm(ExecHost)
func (r *Reporter) deregisterControls() {
r.handlerRegistry.Rm(ExecHost)
}
func (r *Reporter) execHost(req xfer.Request) xfer.Response {

View File

@@ -52,24 +52,26 @@ var (
// Reporter generates Reports containing the host topology.
type Reporter struct {
hostID string
hostName string
probeID string
version string
pipes controls.PipeClient
hostShellCmd []string
hostID string
hostName string
probeID string
version string
pipes controls.PipeClient
hostShellCmd []string
handlerRegistry *controls.HandlerRegistry
}
// NewReporter returns a Reporter which produces a report containing host
// topology for this host.
func NewReporter(hostID, hostName, probeID, version string, pipes controls.PipeClient) *Reporter {
func NewReporter(hostID, hostName, probeID, version string, pipes controls.PipeClient, handlerRegistry *controls.HandlerRegistry) *Reporter {
r := &Reporter{
hostID: hostID,
hostName: hostName,
probeID: probeID,
pipes: pipes,
version: version,
hostShellCmd: getHostShellCmd(),
hostID: hostID,
hostName: hostName,
probeID: probeID,
pipes: pipes,
version: version,
hostShellCmd: getHostShellCmd(),
handlerRegistry: handlerRegistry,
}
r.registerControls()
return r

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/report"
)
@@ -55,7 +56,8 @@ func TestReporter(t *testing.T) {
host.GetMemoryUsageBytes = func() (float64, float64) { return 60.0, 100.0 }
host.GetLocalNetworks = func() ([]*net.IPNet, error) { return []*net.IPNet{ipnet}, nil }
rpt, err := host.NewReporter(hostID, hostname, "", "", nil).Report()
hr := controls.NewDefaultHandlerRegistry()
rpt, err := host.NewReporter(hostID, hostname, "", "", nil, hr).Report()
if err != nil {
t.Fatal(err)
}

View File

@@ -144,15 +144,21 @@ func (r *Reporter) ScaleDown(req xfer.Request, resource, namespace, id string) x
}
func (r *Reporter) registerControls() {
controls.Register(GetLogs, r.CapturePod(r.GetLogs))
controls.Register(DeletePod, r.CapturePod(r.deletePod))
controls.Register(ScaleUp, r.CaptureResource(r.ScaleUp))
controls.Register(ScaleDown, r.CaptureResource(r.ScaleDown))
controls := map[string]xfer.ControlHandlerFunc{
GetLogs: r.CapturePod(r.GetLogs),
DeletePod: r.CapturePod(r.deletePod),
ScaleUp: r.CaptureResource(r.ScaleUp),
ScaleDown: r.CaptureResource(r.ScaleDown),
}
r.handlerRegistry.Batch(nil, controls)
}
func (r *Reporter) deregisterControls() {
controls.Rm(GetLogs)
controls.Rm(DeletePod)
controls.Rm(ScaleUp)
controls.Rm(ScaleDown)
controls := []string{
GetLogs,
DeletePod,
ScaleUp,
ScaleDown,
}
r.handlerRegistry.Batch(controls, nil)
}

View File

@@ -89,21 +89,23 @@ var (
// Reporter generate Reports containing Container and ContainerImage topologies
type Reporter struct {
client Client
pipes controls.PipeClient
probeID string
probe *probe.Probe
hostID string
client Client
pipes controls.PipeClient
probeID string
probe *probe.Probe
hostID string
handlerRegistry *controls.HandlerRegistry
}
// NewReporter makes a new Reporter
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe) *Reporter {
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry) *Reporter {
reporter := &Reporter{
client: client,
pipes: pipes,
probeID: probeID,
probe: probe,
hostID: hostID,
client: client,
pipes: pipes,
probeID: probeID,
probe: probe,
hostID: hostID,
handlerRegistry: handlerRegistry,
}
reporter.registerControls()
client.WatchPods(reporter.podEvent)

View File

@@ -12,6 +12,7 @@ import (
"k8s.io/kubernetes/pkg/types"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/report"
@@ -184,7 +185,8 @@ func TestReporter(t *testing.T) {
pod1ID := report.MakePodNodeID(pod1UID)
pod2ID := report.MakePodNodeID(pod2UID)
serviceID := report.MakeServiceNodeID(serviceUID)
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil).Report()
hr := controls.NewDefaultHandlerRegistry()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil, hr).Report()
// Reporter should have added the following pods
for _, pod := range []struct {
@@ -244,7 +246,8 @@ func TestTagger(t *testing.T) {
docker.LabelPrefix + "io.kubernetes.pod.uid": "123456",
}))
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil).Tag(rpt)
hr := controls.NewDefaultHandlerRegistry()
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr).Tag(rpt)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
@@ -272,7 +275,8 @@ func TestReporterGetLogs(t *testing.T) {
client := newMockClient()
pipes := mockPipeClient{}
reporter := kubernetes.NewReporter(client, pipes, "", "", nil)
hr := controls.NewDefaultHandlerRegistry()
reporter := kubernetes.NewReporter(client, pipes, "", "", nil, hr)
// Should error on invalid IDs
{

View File

@@ -113,10 +113,11 @@ func probeMain(flags probeFlags) {
ProbeID: probeID,
Insecure: flags.insecure,
}
handlerRegistry := controls.NewDefaultHandlerRegistry()
clientFactory := func(hostname, endpoint string) (appclient.AppClient, error) {
return appclient.NewAppClient(
probeConfig, hostname, endpoint,
xfer.ControlHandlerFunc(controls.HandleControlRequest),
xfer.ControlHandlerFunc(handlerRegistry.HandleControlRequest),
)
}
clients := appclient.NewMultiAppClient(clientFactory, flags.noControls)
@@ -131,7 +132,7 @@ func probeMain(flags probeFlags) {
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls)
hostReporter := host.NewReporter(hostID, hostName, probeID, version, clients)
hostReporter := host.NewReporter(hostID, hostName, probeID, version, clients, handlerRegistry)
defer hostReporter.Stop()
p.AddReporter(hostReporter)
p.AddTagger(probe.NewTopologyTagger(), host.NewTagger(hostID))
@@ -157,7 +158,7 @@ func probeMain(flags probeFlags) {
log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err)
}
}
if registry, err := docker.NewRegistry(flags.dockerInterval, clients, true, hostID); err == nil {
if registry, err := docker.NewRegistry(flags.dockerInterval, clients, true, hostID, handlerRegistry); err == nil {
defer registry.Stop()
if flags.procEnabled {
p.AddTagger(docker.NewTagger(registry, processCache))
@@ -171,7 +172,7 @@ func probeMain(flags probeFlags) {
if flags.kubernetesEnabled {
if client, err := kubernetes.NewClient(flags.kubernetesAPI, flags.kubernetesInterval); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p)
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)