Merge pull request #1368 from weaveworks/k8s-delete-pod-service

Add pod delete controls
This commit is contained in:
Paul Bellamy
2016-04-29 11:02:55 +01:00
11 changed files with 223 additions and 39 deletions

View File

@@ -75,7 +75,7 @@ func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlH
TLSClientConfig: httpTransport.TLSClientConfig,
},
conns: map[string]xfer.Websocket{},
readers: make(chan io.Reader),
readers: make(chan io.Reader, 2),
control: control,
}, nil
}
@@ -273,6 +273,7 @@ func (c *appClient) Publish(r io.Reader) error {
select {
case c.readers <- r:
default:
log.Errorf("Dropping report to %s", c.target)
}
return nil
}

View File

@@ -49,7 +49,7 @@ func (r *registry) unpauseContainer(containerID string, _ xfer.Request) xfer.Res
return xfer.ResponseError(r.client.UnpauseContainer(containerID))
}
func (r *registry) removeContainer(containerID string, _ xfer.Request) xfer.Response {
func (r *registry) removeContainer(containerID string, req xfer.Request) xfer.Response {
log.Infof("Removing container %s", containerID)
if err := r.client.RemoveContainer(docker_client.RemoveContainerOptions{
ID: containerID,
@@ -57,7 +57,7 @@ func (r *registry) removeContainer(containerID string, _ xfer.Request) xfer.Resp
return xfer.ResponseError(err)
}
return xfer.Response{
RemovedNode: containerID,
RemovedNode: req.NodeID,
}
}

View File

@@ -3,6 +3,7 @@ package kubernetes
import (
"io"
"strconv"
"sync"
"time"
log "github.com/Sirupsen/logrus"
@@ -26,7 +27,11 @@ type Client interface {
WalkPods(f func(Pod) error) error
WalkServices(f func(Service) error) error
WalkNodes(f func(*api.Node) error) error
WatchPods(f func(Event, Pod))
GetLogs(namespaceID, podID string) (io.ReadCloser, error)
DeletePod(namespaceID, podID string) error
}
type client struct {
@@ -38,6 +43,9 @@ type client struct {
podStore *cache.StoreToPodLister
serviceStore *cache.StoreToServiceLister
nodeStore *cache.StoreToNodeLister
podWatchesMutex sync.Mutex
podWatches []func(Event, Pod)
}
// runReflectorUntil is equivalent to cache.Reflector.RunUntil, but it also logs
@@ -72,33 +80,44 @@ func NewClient(addr string, resyncPeriod time.Duration) (Client, error) {
return nil, err
}
result := &client{
quit: make(chan struct{}),
client: c,
}
podListWatch := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.Everything())
podStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
podReflector := cache.NewReflector(podListWatch, &api.Pod{}, podStore, resyncPeriod)
podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = &cache.StoreToPodLister{Store: podStore}
result.podReflector = cache.NewReflector(podListWatch, &api.Pod{}, podStore, resyncPeriod)
serviceListWatch := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
serviceReflector := cache.NewReflector(serviceListWatch, &api.Service{}, serviceStore, resyncPeriod)
result.serviceStore = &cache.StoreToServiceLister{Store: serviceStore}
result.serviceReflector = cache.NewReflector(serviceListWatch, &api.Service{}, serviceStore, resyncPeriod)
nodeListWatch := cache.NewListWatchFromClient(c, "nodes", api.NamespaceAll, fields.Everything())
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
nodeReflector := cache.NewReflector(nodeListWatch, &api.Node{}, nodeStore, resyncPeriod)
result.nodeStore = &cache.StoreToNodeLister{Store: nodeStore}
result.nodeReflector = cache.NewReflector(nodeListWatch, &api.Node{}, nodeStore, resyncPeriod)
quit := make(chan struct{})
runReflectorUntil(podReflector, resyncPeriod, quit)
runReflectorUntil(serviceReflector, resyncPeriod, quit)
runReflectorUntil(nodeReflector, resyncPeriod, quit)
runReflectorUntil(result.podReflector, resyncPeriod, result.quit)
runReflectorUntil(result.serviceReflector, resyncPeriod, result.quit)
runReflectorUntil(result.nodeReflector, resyncPeriod, result.quit)
return result, nil
}
return &client{
quit: quit,
client: c,
podReflector: podReflector,
podStore: &cache.StoreToPodLister{Store: podStore},
serviceReflector: serviceReflector,
serviceStore: &cache.StoreToServiceLister{Store: serviceStore},
nodeReflector: nodeReflector,
nodeStore: &cache.StoreToNodeLister{Store: nodeStore},
}, nil
// WatchPods registers f to be invoked for every pod event (ADD, UPDATE,
// DELETE) observed by this client. Safe for concurrent registration.
func (c *client) WatchPods(f func(Event, Pod)) {
c.podWatchesMutex.Lock()
defer c.podWatchesMutex.Unlock()
c.podWatches = append(c.podWatches, f)
}
// triggerPodWatches fans an event out to every watcher registered via
// WatchPods. pod must be a *api.Pod (the type assertion panics
// otherwise); it is wrapped into the probe's Pod type before dispatch.
// Callbacks run synchronously while the watch mutex is held, so they
// must not call WatchPods or block for long.
func (c *client) triggerPodWatches(e Event, pod interface{}) {
c.podWatchesMutex.Lock()
defer c.podWatchesMutex.Unlock()
for _, watch := range c.podWatches {
watch(e, NewPod(pod.(*api.Pod)))
}
}
func (c *client) WalkPods(f func(Pod) error) error {
@@ -152,6 +171,13 @@ func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
Stream()
}
// DeletePod asks the Kubernetes API server to delete the named pod in
// the given namespace, returning any error the request produced.
func (c *client) DeletePod(namespaceID, podID string) error {
	req := c.client.RESTClient.Delete().
		Resource("pods").
		Namespace(namespaceID).
		Name(podID)
	return req.Do().Error()
}
// Stop closes the quit channel, terminating the background reflector
// loops started in NewClient.
func (c *client) Stop() {
close(c.quit)
}

View File

@@ -11,16 +11,12 @@ import (
// Control IDs used by the kubernetes integration.
const (
GetLogs = "kubernetes_get_logs"
GetLogs = "kubernetes_get_logs"
DeletePod = "kubernetes_delete_pod"
)
// GetLogs is the control to get the logs for a kubernetes pod
func (r *Reporter) GetLogs(req xfer.Request) xfer.Response {
namespaceID, podID, ok := report.ParsePodNodeID(req.NodeID)
if !ok {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID)
if err != nil {
return xfer.ResponseError(err)
@@ -45,10 +41,32 @@ func (r *Reporter) GetLogs(req xfer.Request) xfer.Response {
}
}
// deletePod handles the kubernetes_delete_pod control: it deletes the
// pod through the Kubernetes API and, on success, reports the node ID
// back so the app can remove it from the view.
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response {
	err := r.client.DeletePod(namespaceID, podID)
	if err != nil {
		return xfer.ResponseError(err)
	}
	return xfer.Response{RemovedNode: req.NodeID}
}
// CapturePod adapts a pod-scoped control handler into a generic control
// handler by parsing the namespace and pod name out of the request's
// node ID. Requests whose node ID does not parse as a pod node get an
// error response. It is exported for testing.
func CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
	return func(req xfer.Request) xfer.Response {
		if namespaceID, podID, ok := report.ParsePodNodeID(req.NodeID); ok {
			return f(req, namespaceID, podID)
		}
		return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
	}
}
func (r *Reporter) registerControls() {
controls.Register(GetLogs, r.GetLogs)
controls.Register(GetLogs, CapturePod(r.GetLogs))
controls.Register(DeletePod, CapturePod(r.deletePod))
}
func (r *Reporter) deregisterControls() {
controls.Rm(GetLogs)
controls.Rm(DeletePod)
}

View File

@@ -18,6 +18,8 @@ const (
PodState = "kubernetes_pod_state"
PodLabelPrefix = "kubernetes_pod_labels_"
ServiceIDs = "kubernetes_service_ids"
StateDeleted = "deleted"
)
// Pod represents a Kubernetes pod
@@ -107,6 +109,6 @@ func (p *pod) GetNode(probeID string) report.Node {
)
}
n = n.AddTable(PodLabelPrefix, p.ObjectMeta.Labels)
n = n.WithControls(GetLogs)
n = n.WithControls(GetLogs, DeletePod)
return n
}

View File

@@ -8,6 +8,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
@@ -44,16 +45,19 @@ type Reporter struct {
client Client
pipes controls.PipeClient
probeID string
probe *probe.Probe
}
// NewReporter makes a new Reporter
func NewReporter(client Client, pipes controls.PipeClient, probeID string) *Reporter {
func NewReporter(client Client, pipes controls.PipeClient, probeID string, probe *probe.Probe) *Reporter {
reporter := &Reporter{
client: client,
pipes: pipes,
probeID: probeID,
probe: probe,
}
reporter.registerControls()
client.WatchPods(reporter.podEvent)
return reporter
}
@@ -65,6 +69,26 @@ func (r *Reporter) Stop() {
// Name of this reporter, for metrics gathering
func (Reporter) Name() string { return "K8s" }
// podEvent publishes a "shortcut" report on pod lifecycle changes so
// the UI updates without waiting for the next full report cycle.
// UPDATE events are intentionally not handled here; the periodic full
// report covers them.
func (r *Reporter) podEvent(e Event, pod Pod) {
switch e {
case ADD:
// Publish the new pod's node immediately.
rpt := report.MakeReport()
rpt.Shortcut = true
rpt.Pod.AddNode(pod.GetNode(r.probeID))
r.probe.Publish(rpt)
case DELETE:
// Publish a node carrying only the deleted state so renderers
// can filter the pod out of the view.
rpt := report.MakeReport()
rpt.Shortcut = true
rpt.Pod.AddNode(
report.MakeNodeWith(
report.MakePodNodeID(pod.Namespace(), pod.Name()),
map[string]string{PodState: StateDeleted},
),
)
r.probe.Publish(rpt)
}
}
// Report generates a Report containing Container and ContainerImage topologies
func (r *Reporter) Report() (report.Report, error) {
result := report.MakeReport()
@@ -132,6 +156,12 @@ func (r *Reporter) podTopology(services []Service) (report.Topology, report.Topo
Icon: "fa-desktop",
Rank: 0,
})
pods.Controls.AddControl(report.Control{
ID: DeletePod,
Human: "Delete",
Icon: "fa-trash-o",
Rank: 1,
})
for _, service := range services {
selectors[service.ID()] = service.Selector()
}

View File

@@ -125,6 +125,7 @@ func (c *mockClient) WalkServices(f func(kubernetes.Service) error) error {
func (*mockClient) WalkNodes(f func(*api.Node) error) error {
return nil
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) {
r, ok := c.logs[report.MakePodNodeID(namespaceID, podName)]
if !ok {
@@ -132,6 +133,9 @@ func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error)
}
return r, nil
}
// DeletePod satisfies the kubernetes.Client interface; the mock always
// reports success and has no side effects.
func (c *mockClient) DeletePod(namespaceID, podID string) error {
return nil
}
type mockPipeClient map[string]xfer.Pipe
@@ -156,7 +160,7 @@ func TestReporter(t *testing.T) {
pod1ID := report.MakePodNodeID("ping", "pong-a")
pod2ID := report.MakePodNodeID("ping", "pong-b")
serviceID := report.MakeServiceNodeID("ping", "pongservice")
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "").Report()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", nil).Report()
// Reporter should have added the following pods
for _, pod := range []struct {
@@ -261,11 +265,11 @@ func TestReporterGetLogs(t *testing.T) {
client := newMockClient()
pipes := mockPipeClient{}
reporter := kubernetes.NewReporter(client, pipes, "")
reporter := kubernetes.NewReporter(client, pipes, "", nil)
// Should error on invalid IDs
{
resp := reporter.GetLogs(xfer.Request{
resp := kubernetes.CapturePod(reporter.GetLogs)(xfer.Request{
NodeID: "invalidID",
Control: kubernetes.GetLogs,
})
@@ -276,7 +280,7 @@ func TestReporterGetLogs(t *testing.T) {
// Should pass through errors from k8s (e.g if pod does not exist)
{
resp := reporter.GetLogs(xfer.Request{
resp := kubernetes.CapturePod(reporter.GetLogs)(xfer.Request{
AppID: "appID",
NodeID: report.MakePodNodeID("not", "found"),
Control: kubernetes.GetLogs,
@@ -302,7 +306,7 @@ func TestReporterGetLogs(t *testing.T) {
}}
// Should create a new pipe for the stream
resp := reporter.GetLogs(pod1Request)
resp := kubernetes.CapturePod(reporter.GetLogs)(pod1Request)
if resp.Pipe == "" {
t.Errorf("Expected pipe id to be returned, but got %#v", resp)
}

View File

@@ -66,5 +66,9 @@ func (s *service) GetNode() report.Node {
if s.Spec.LoadBalancerIP != "" {
latest[ServicePublicIP] = s.Spec.LoadBalancerIP
}
return report.MakeNodeWith(report.MakeServiceNodeID(s.Namespace(), s.Name()), latest).AddTable(ServiceLabelPrefix, s.Labels)
return report.MakeNodeWith(
report.MakeServiceNodeID(s.Namespace(), s.Name()),
latest,
).
AddTable(ServiceLabelPrefix, s.Labels)
}

94
probe/kubernetes/store.go Normal file
View File

@@ -0,0 +1,94 @@
package kubernetes
import (
"sync"
"k8s.io/kubernetes/pkg/client/cache"
)
// Event type is an enum of ADD, UPDATE and DELETE.
type Event int
// Watch type is for callbacks when something happens to the store.
type Watch func(Event, interface{})
// Event enum values.
const (
ADD Event = iota
UPDATE
DELETE
)
// eventStore wraps a cache.Store, invoking the watch callback on every
// mutation. The mutex serializes mutations so watchers observe a
// consistent event ordering.
type eventStore struct {
mtx sync.Mutex
watch Watch
keyFunc cache.KeyFunc
cache.Store
}
// NewEventStore creates a new Store which triggers watch whenever an
// object is added, removed or updated.
func NewEventStore(watch Watch, keyFunc cache.KeyFunc) cache.Store {
	s := &eventStore{
		Store:   cache.NewStore(keyFunc),
		keyFunc: keyFunc,
		watch:   watch,
	}
	return s
}
// Add fires an ADD event, then delegates to the wrapped store. The
// watch callback runs under the mutex, before the store is mutated.
func (e *eventStore) Add(o interface{}) error {
e.mtx.Lock()
defer e.mtx.Unlock()
e.watch(ADD, o)
return e.Store.Add(o)
}
// Update fires an UPDATE event, then delegates to the wrapped store.
func (e *eventStore) Update(o interface{}) error {
e.mtx.Lock()
defer e.mtx.Unlock()
e.watch(UPDATE, o)
return e.Store.Update(o)
}
// Delete fires a DELETE event, then delegates to the wrapped store.
func (e *eventStore) Delete(o interface{}) error {
e.mtx.Lock()
defer e.mtx.Unlock()
e.watch(DELETE, o)
return e.Store.Delete(o)
}
// Replace swaps the store contents for os, emitting a synthetic event
// stream describing the transition: DELETE for objects no longer
// present, ADD for brand-new objects, and UPDATE for every key present
// both before and after (even if the object is unchanged).
func (e *eventStore) Replace(os []interface{}, ver string) error {
e.mtx.Lock()
defer e.mtx.Unlock()
// Index the incoming objects by key for membership tests below.
indexed := map[string]interface{}{}
for _, o := range os {
key, err := e.keyFunc(o)
if err != nil {
return err
}
indexed[key] = o
}
// Walk the current contents: anything missing from the new set is a
// DELETE. NOTE(review): a keyFunc error here returns after some
// DELETE events have already fired — confirm watchers tolerate a
// partial event stream.
existing := map[string]interface{}{}
for _, o := range e.Store.List() {
key, err := e.keyFunc(o)
if err != nil {
return err
}
existing[key] = o
if _, ok := indexed[key]; !ok {
e.watch(DELETE, o)
}
}
// New keys are ADDs; keys that survived are UPDATEs.
for key, o := range indexed {
if _, ok := existing[key]; !ok {
e.watch(ADD, o)
} else {
e.watch(UPDATE, o)
}
}
return e.Store.Replace(os, ver)
}

View File

@@ -149,7 +149,7 @@ func probeMain(flags probeFlags) {
if flags.kubernetesEnabled {
if client, err := kubernetes.NewClient(flags.kubernetesAPI, flags.kubernetesInterval); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID)
reporter := kubernetes.NewReporter(client, clients, probeID, p)
defer reporter.Stop()
p.AddReporter(reporter)
} else {

View File

@@ -15,7 +15,12 @@ const (
// PodRenderer is a Renderer which produces a renderable kubernetes
// graph by merging the container graph and the pods topology.
var PodRenderer = FilterEmpty(report.Container,
var PodRenderer = MakeFilter(
func(n report.Node) bool {
// Drop deleted or empty pods
state, ok := n.Latest.Lookup(kubernetes.PodState)
return HasChildren(report.Container)(n) && (!ok || state != kubernetes.StateDeleted)
},
MakeReduce(
MakeFilter(
func(n report.Node) bool {