diff --git a/probe/appclient/app_client.go b/probe/appclient/app_client.go index c2fec8b2c..31837093b 100644 --- a/probe/appclient/app_client.go +++ b/probe/appclient/app_client.go @@ -75,7 +75,7 @@ func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlH TLSClientConfig: httpTransport.TLSClientConfig, }, conns: map[string]xfer.Websocket{}, - readers: make(chan io.Reader), + readers: make(chan io.Reader, 2), control: control, }, nil } @@ -273,6 +273,7 @@ func (c *appClient) Publish(r io.Reader) error { select { case c.readers <- r: default: + log.Errorf("Dropping report to %s", c.target) } return nil } diff --git a/probe/docker/controls.go b/probe/docker/controls.go index de2f4fe30..8d681b3e6 100644 --- a/probe/docker/controls.go +++ b/probe/docker/controls.go @@ -49,7 +49,7 @@ func (r *registry) unpauseContainer(containerID string, _ xfer.Request) xfer.Res return xfer.ResponseError(r.client.UnpauseContainer(containerID)) } -func (r *registry) removeContainer(containerID string, _ xfer.Request) xfer.Response { +func (r *registry) removeContainer(containerID string, req xfer.Request) xfer.Response { log.Infof("Removing container %s", containerID) if err := r.client.RemoveContainer(docker_client.RemoveContainerOptions{ ID: containerID, @@ -57,7 +57,7 @@ func (r *registry) removeContainer(containerID string, _ xfer.Request) xfer.Resp return xfer.ResponseError(err) } return xfer.Response{ - RemovedNode: containerID, + RemovedNode: req.NodeID, } } diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 7bcafa5b1..2b6129f0c 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -3,6 +3,7 @@ package kubernetes import ( "io" "strconv" + "sync" "time" log "github.com/Sirupsen/logrus" @@ -26,7 +27,11 @@ type Client interface { WalkPods(f func(Pod) error) error WalkServices(f func(Service) error) error WalkNodes(f func(*api.Node) error) error + + WatchPods(f func(Event, Pod)) + GetLogs(namespaceID, podID string) 
(io.ReadCloser, error) + DeletePod(namespaceID, podID string) error } type client struct { @@ -38,6 +43,9 @@ type client struct { podStore *cache.StoreToPodLister serviceStore *cache.StoreToServiceLister nodeStore *cache.StoreToNodeLister + + podWatchesMutex sync.Mutex + podWatches []func(Event, Pod) } // runReflectorUntil is equivalent to cache.Reflector.RunUntil, but it also logs @@ -72,33 +80,44 @@ func NewClient(addr string, resyncPeriod time.Duration) (Client, error) { return nil, err } + result := &client{ + quit: make(chan struct{}), + client: c, + } + podListWatch := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.Everything()) - podStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - podReflector := cache.NewReflector(podListWatch, &api.Pod{}, podStore, resyncPeriod) + podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc) + result.podStore = &cache.StoreToPodLister{Store: podStore} + result.podReflector = cache.NewReflector(podListWatch, &api.Pod{}, podStore, resyncPeriod) serviceListWatch := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - serviceReflector := cache.NewReflector(serviceListWatch, &api.Service{}, serviceStore, resyncPeriod) + result.serviceStore = &cache.StoreToServiceLister{Store: serviceStore} + result.serviceReflector = cache.NewReflector(serviceListWatch, &api.Service{}, serviceStore, resyncPeriod) nodeListWatch := cache.NewListWatchFromClient(c, "nodes", api.NamespaceAll, fields.Everything()) nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - nodeReflector := cache.NewReflector(nodeListWatch, &api.Node{}, nodeStore, resyncPeriod) + result.nodeStore = &cache.StoreToNodeLister{Store: nodeStore} + result.nodeReflector = cache.NewReflector(nodeListWatch, &api.Node{}, nodeStore, resyncPeriod) - quit := make(chan struct{}) - runReflectorUntil(podReflector, resyncPeriod, quit) - 
runReflectorUntil(serviceReflector, resyncPeriod, quit) - runReflectorUntil(nodeReflector, resyncPeriod, quit) + runReflectorUntil(result.podReflector, resyncPeriod, result.quit) + runReflectorUntil(result.serviceReflector, resyncPeriod, result.quit) + runReflectorUntil(result.nodeReflector, resyncPeriod, result.quit) + return result, nil +} - return &client{ - quit: quit, - client: c, - podReflector: podReflector, - podStore: &cache.StoreToPodLister{Store: podStore}, - serviceReflector: serviceReflector, - serviceStore: &cache.StoreToServiceLister{Store: serviceStore}, - nodeReflector: nodeReflector, - nodeStore: &cache.StoreToNodeLister{Store: nodeStore}, - }, nil +func (c *client) WatchPods(f func(Event, Pod)) { + c.podWatchesMutex.Lock() + defer c.podWatchesMutex.Unlock() + c.podWatches = append(c.podWatches, f) +} + +func (c *client) triggerPodWatches(e Event, pod interface{}) { + c.podWatchesMutex.Lock() + defer c.podWatchesMutex.Unlock() + for _, watch := range c.podWatches { + watch(e, NewPod(pod.(*api.Pod))) + } } func (c *client) WalkPods(f func(Pod) error) error { @@ -152,6 +171,13 @@ func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) { Stream() } +func (c *client) DeletePod(namespaceID, podID string) error { + return c.client.RESTClient.Delete(). + Namespace(namespaceID). + Name(podID). + Resource("pods").Do().Error() +} + func (c *client) Stop() { close(c.quit) } diff --git a/probe/kubernetes/controls.go b/probe/kubernetes/controls.go index ef1b9ad6f..df139bf42 100644 --- a/probe/kubernetes/controls.go +++ b/probe/kubernetes/controls.go @@ -11,16 +11,12 @@ import ( // Control IDs used by the kubernetes integration. 
const ( - GetLogs = "kubernetes_get_logs" + GetLogs = "kubernetes_get_logs" + DeletePod = "kubernetes_delete_pod" ) // GetLogs is the control to get the logs for a kubernetes pod -func (r *Reporter) GetLogs(req xfer.Request) xfer.Response { - namespaceID, podID, ok := report.ParsePodNodeID(req.NodeID) - if !ok { - return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) - } - +func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response { readCloser, err := r.client.GetLogs(namespaceID, podID) if err != nil { return xfer.ResponseError(err) @@ -45,10 +41,32 @@ func (r *Reporter) GetLogs(req xfer.Request) xfer.Response { } } +func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response { + if err := r.client.DeletePod(namespaceID, podID); err != nil { + return xfer.ResponseError(err) + } + return xfer.Response{ + RemovedNode: req.NodeID, + } +} + +// CapturePod is exported for testing +func CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response { + return func(req xfer.Request) xfer.Response { + namespaceID, podID, ok := report.ParsePodNodeID(req.NodeID) + if !ok { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + return f(req, namespaceID, podID) + } +} + func (r *Reporter) registerControls() { - controls.Register(GetLogs, r.GetLogs) + controls.Register(GetLogs, CapturePod(r.GetLogs)) + controls.Register(DeletePod, CapturePod(r.deletePod)) } func (r *Reporter) deregisterControls() { controls.Rm(GetLogs) + controls.Rm(DeletePod) } diff --git a/probe/kubernetes/pod.go b/probe/kubernetes/pod.go index 09a57f382..badea86c7 100644 --- a/probe/kubernetes/pod.go +++ b/probe/kubernetes/pod.go @@ -18,6 +18,8 @@ const ( PodState = "kubernetes_pod_state" PodLabelPrefix = "kubernetes_pod_labels_" ServiceIDs = "kubernetes_service_ids" + + StateDeleted = "deleted" ) // Pod represents a Kubernetes pod @@ -107,6 +109,6 @@ func (p *pod) GetNode(probeID string) report.Node { ) } 
n = n.AddTable(PodLabelPrefix, p.ObjectMeta.Labels) - n = n.WithControls(GetLogs) + n = n.WithControls(GetLogs, DeletePod) return n } diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index 7010070e1..ad47c0530 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -8,6 +8,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" + "github.com/weaveworks/scope/probe" "github.com/weaveworks/scope/probe/controls" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" @@ -44,16 +45,19 @@ type Reporter struct { client Client pipes controls.PipeClient probeID string + probe *probe.Probe } // NewReporter makes a new Reporter -func NewReporter(client Client, pipes controls.PipeClient, probeID string) *Reporter { +func NewReporter(client Client, pipes controls.PipeClient, probeID string, probe *probe.Probe) *Reporter { reporter := &Reporter{ client: client, pipes: pipes, probeID: probeID, + probe: probe, } reporter.registerControls() + client.WatchPods(reporter.podEvent) return reporter } @@ -65,6 +69,26 @@ func (r *Reporter) Stop() { // Name of this reporter, for metrics gathering func (Reporter) Name() string { return "K8s" } +func (r *Reporter) podEvent(e Event, pod Pod) { + switch e { + case ADD: + rpt := report.MakeReport() + rpt.Shortcut = true + rpt.Pod.AddNode(pod.GetNode(r.probeID)) + r.probe.Publish(rpt) + case DELETE: + rpt := report.MakeReport() + rpt.Shortcut = true + rpt.Pod.AddNode( + report.MakeNodeWith( + report.MakePodNodeID(pod.Namespace(), pod.Name()), + map[string]string{PodState: StateDeleted}, + ), + ) + r.probe.Publish(rpt) + } +} + // Report generates a Report containing Container and ContainerImage topologies func (r *Reporter) Report() (report.Report, error) { result := report.MakeReport() @@ -132,6 +156,12 @@ func (r *Reporter) podTopology(services []Service) (report.Topology, report.Topo Icon: "fa-desktop", Rank: 0, }) + 
pods.Controls.AddControl(report.Control{ + ID: DeletePod, + Human: "Delete", + Icon: "fa-trash-o", + Rank: 1, + }) for _, service := range services { selectors[service.ID()] = service.Selector() } diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index a3610e744..ec6d26d01 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -125,6 +125,7 @@ func (c *mockClient) WalkServices(f func(kubernetes.Service) error) error { func (*mockClient) WalkNodes(f func(*api.Node) error) error { return nil } +func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {} func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) { r, ok := c.logs[report.MakePodNodeID(namespaceID, podName)] if !ok { @@ -132,6 +133,9 @@ func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) } return r, nil } +func (c *mockClient) DeletePod(namespaceID, podID string) error { + return nil +} type mockPipeClient map[string]xfer.Pipe @@ -156,7 +160,7 @@ func TestReporter(t *testing.T) { pod1ID := report.MakePodNodeID("ping", "pong-a") pod2ID := report.MakePodNodeID("ping", "pong-b") serviceID := report.MakeServiceNodeID("ping", "pongservice") - rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "").Report() + rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", nil).Report() // Reporter should have added the following pods for _, pod := range []struct { @@ -261,11 +265,11 @@ func TestReporterGetLogs(t *testing.T) { client := newMockClient() pipes := mockPipeClient{} - reporter := kubernetes.NewReporter(client, pipes, "") + reporter := kubernetes.NewReporter(client, pipes, "", nil) // Should error on invalid IDs { - resp := reporter.GetLogs(xfer.Request{ + resp := kubernetes.CapturePod(reporter.GetLogs)(xfer.Request{ NodeID: "invalidID", Control: kubernetes.GetLogs, }) @@ -276,7 +280,7 @@ func TestReporterGetLogs(t *testing.T) { // Should pass through errors from k8s (e.g 
if pod does not exist) { - resp := reporter.GetLogs(xfer.Request{ + resp := kubernetes.CapturePod(reporter.GetLogs)(xfer.Request{ AppID: "appID", NodeID: report.MakePodNodeID("not", "found"), Control: kubernetes.GetLogs, @@ -302,7 +306,7 @@ func TestReporterGetLogs(t *testing.T) { }} // Should create a new pipe for the stream - resp := reporter.GetLogs(pod1Request) + resp := kubernetes.CapturePod(reporter.GetLogs)(pod1Request) if resp.Pipe == "" { t.Errorf("Expected pipe id to be returned, but got %#v", resp) } diff --git a/probe/kubernetes/service.go b/probe/kubernetes/service.go index d2bd38dfa..99886303d 100644 --- a/probe/kubernetes/service.go +++ b/probe/kubernetes/service.go @@ -66,5 +66,9 @@ func (s *service) GetNode() report.Node { if s.Spec.LoadBalancerIP != "" { latest[ServicePublicIP] = s.Spec.LoadBalancerIP } - return report.MakeNodeWith(report.MakeServiceNodeID(s.Namespace(), s.Name()), latest).AddTable(ServiceLabelPrefix, s.Labels) + return report.MakeNodeWith( + report.MakeServiceNodeID(s.Namespace(), s.Name()), + latest, + ). + AddTable(ServiceLabelPrefix, s.Labels) } diff --git a/probe/kubernetes/store.go b/probe/kubernetes/store.go new file mode 100644 index 000000000..56f5ae609 --- /dev/null +++ b/probe/kubernetes/store.go @@ -0,0 +1,94 @@ +package kubernetes + +import ( + "sync" + + "k8s.io/kubernetes/pkg/client/cache" +) + +// Event type is an enum of ADD, UPDATE and DELETE +type Event int + +// Watch type is for callbacks when something happens to the store. +type Watch func(Event, interface{}) + +// Event enum values. +const ( + ADD Event = iota + UPDATE + DELETE +) + +type eventStore struct { + mtx sync.Mutex + watch Watch + keyFunc cache.KeyFunc + cache.Store +} + +// NewEventStore creates a new Store which triggers watch whenever +// an object is added, removed or updated. 
+func NewEventStore(watch Watch, keyFunc cache.KeyFunc) cache.Store { + return &eventStore{ + keyFunc: keyFunc, + watch: watch, + Store: cache.NewStore(keyFunc), + } +} + +func (e *eventStore) Add(o interface{}) error { + e.mtx.Lock() + defer e.mtx.Unlock() + e.watch(ADD, o) + return e.Store.Add(o) +} + +func (e *eventStore) Update(o interface{}) error { + e.mtx.Lock() + defer e.mtx.Unlock() + e.watch(UPDATE, o) + return e.Store.Update(o) +} + +func (e *eventStore) Delete(o interface{}) error { + e.mtx.Lock() + defer e.mtx.Unlock() + e.watch(DELETE, o) + return e.Store.Delete(o) +} + +func (e *eventStore) Replace(os []interface{}, ver string) error { + e.mtx.Lock() + defer e.mtx.Unlock() + + indexed := map[string]interface{}{} + for _, o := range os { + key, err := e.keyFunc(o) + if err != nil { + return err + } + indexed[key] = o + } + + existing := map[string]interface{}{} + for _, o := range e.Store.List() { + key, err := e.keyFunc(o) + if err != nil { + return err + } + existing[key] = o + if _, ok := indexed[key]; !ok { + e.watch(DELETE, o) + } + } + + for key, o := range indexed { + if _, ok := existing[key]; !ok { + e.watch(ADD, o) + } else { + e.watch(UPDATE, o) + } + } + + return e.Store.Replace(os, ver) +} diff --git a/prog/probe.go b/prog/probe.go index f18768d55..fdf987458 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -149,7 +149,7 @@ func probeMain(flags probeFlags) { if flags.kubernetesEnabled { if client, err := kubernetes.NewClient(flags.kubernetesAPI, flags.kubernetesInterval); err == nil { defer client.Stop() - reporter := kubernetes.NewReporter(client, clients, probeID) + reporter := kubernetes.NewReporter(client, clients, probeID, p) defer reporter.Stop() p.AddReporter(reporter) } else { diff --git a/render/pod.go b/render/pod.go index 623eea80e..c3cc7e77f 100644 --- a/render/pod.go +++ b/render/pod.go @@ -15,7 +15,12 @@ const ( // PodRenderer is a Renderer which produces a renderable kubernetes // graph by merging the container graph and 
the pods topology. -var PodRenderer = FilterEmpty(report.Container, +var PodRenderer = MakeFilter( + func(n report.Node) bool { + // Drop deleted or empty pods + state, ok := n.Latest.Lookup(kubernetes.PodState) + return HasChildren(report.Container)(n) && (!ok || state != kubernetes.StateDeleted) + }, MakeReduce( MakeFilter( func(n report.Node) bool {