From 481258d8fc7c1b340579abb8b9a40444e82bc85b Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 13 Jul 2017 17:46:30 -0700 Subject: [PATCH] kubernetes probe: Collect info on cronjobs and statefulsets Most of the time you only care about cronjobs, not the jobs that make them up, so we only collect full cronjob data. We associate pods of jobs with the parent cronjob --- probe/kubernetes/client.go | 73 ++++++++++++++++++++---- probe/kubernetes/cronjob.go | 75 +++++++++++++++++++++++++ probe/kubernetes/reporter.go | 92 ++++++++++++++++++++++++++++++- probe/kubernetes/reporter_test.go | 6 ++ probe/kubernetes/statefulset.go | 55 ++++++++++++++++++ 5 files changed, 289 insertions(+), 12 deletions(-) create mode 100644 probe/kubernetes/cronjob.go create mode 100644 probe/kubernetes/statefulset.go diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 0fabde108..15bcefc57 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -13,7 +13,10 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" apiv1 "k8s.io/client-go/pkg/api/v1" - apiv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1" + apiappsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1" + apibatchv1 "k8s.io/client-go/pkg/apis/batch/v1" + apibatchv2alpha1 "k8s.io/client-go/pkg/apis/batch/v2alpha1" + apiextensionsv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" @@ -28,6 +31,8 @@ type Client interface { WalkDeployments(f func(Deployment) error) error WalkReplicaSets(f func(ReplicaSet) error) error WalkDaemonSets(f func(DaemonSet) error) error + WalkStatefulSets(f func(StatefulSet) error) error + WalkCronJobs(f func(CronJob) error) error WalkReplicationControllers(f func(ReplicationController) error) error WalkNodes(f func(*apiv1.Node) error) error @@ -48,6 +53,9 @@ type client struct { deploymentStore cache.Store replicaSetStore cache.Store daemonSetStore cache.Store + statefulSetStore cache.Store + jobStore cache.Store + cronJobStore cache.Store replicationControllerStore cache.Store nodeStore cache.Store @@ -157,9 +165,21 @@ func NewClient(config ClientConfig) (Client, error) { if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil { log.Infof("Deployments, ReplicaSets and DaemonSets are not supported by this Kubernetes version: %v", err) } else { - result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiv1beta1.Deployment{}, nil) - result.replicaSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "replicasets", &apiv1beta1.ReplicaSet{}, nil) - result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiv1beta1.DaemonSet{}, nil) + result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil) + result.replicaSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "replicasets", &apiextensionsv1beta1.ReplicaSet{}, nil) + result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil) + } + // CronJobs and StatefulSets were introduced later. Easiest to use the same technique. + if _, err := c.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil { + log.Infof("CronJobs are not supported by this Kubernetes version: %v", err) + } else { + result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil) + result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil) + } + if _, err := c.Apps().StatefulSets(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil { + log.Infof("StatefulSets are not supported by this Kubernetes version: %v", err) + } else { + result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil) } return result, nil @@ -214,7 +234,7 @@ func (c *client) WalkDeployments(f func(Deployment) error) error { return nil } for _, m := range c.deploymentStore.List() { - d := m.(*apiv1beta1.Deployment) + d := m.(*apiextensionsv1beta1.Deployment) if err := f(NewDeployment(d)); err != nil { return err } @@ -228,7 +248,7 @@ func (c *client) WalkReplicaSets(f func(ReplicaSet) error) error { return nil } for _, m := range c.replicaSetStore.List() { - rs := m.(*apiv1beta1.ReplicaSet) + rs := m.(*apiextensionsv1beta1.ReplicaSet) if err := f(NewReplicaSet(rs)); err != nil { return err } @@ -254,7 +274,7 @@ func (c *client) WalkDaemonSets(f func(DaemonSet) error) error { return nil } for _, m := range c.daemonSetStore.List() { - ds := m.(*apiv1beta1.DaemonSet) + ds := m.(*apiextensionsv1beta1.DaemonSet) if err := f(NewDaemonSet(ds)); err != nil { return err } @@ -262,6 +282,39 @@ func (c *client) WalkDaemonSets(f func(DaemonSet) error) error { return nil } +// WalkStatefulSets calls f for each statefulset +func (c *client) WalkStatefulSets(f func(StatefulSet) error) error { + if c.statefulSetStore == nil { + return nil + } + for _, m := range c.statefulSetStore.List() { + s := m.(*apiappsv1beta1.StatefulSet) + if err := f(NewStatefulSet(s)); err != nil { + return err + } + } + return nil +} + +// WalkCronJobs calls f for each cronjob +func (c *client) WalkCronJobs(f func(CronJob) error) error { + if c.cronJobStore == nil { + return nil + } + jobs := []*apibatchv1.Job{} + for _, m := range c.jobStore.List() { + j := m.(*apibatchv1.Job) + jobs = append(jobs, j) + } + for _, m := range c.cronJobStore.List() { + cj := m.(*apibatchv2alpha1.CronJob) + if err := f(NewCronJob(cj, jobs)); err != nil { + return err + } + } + return nil +} + func (c *client) WalkNodes(f func(*apiv1.Node) error) error { for _, m := range c.nodeStore.List() { node := m.(*apiv1.Node) @@ -288,18 +341,18 @@ func (c *client) DeletePod(namespaceID, podID string) error { } func (c *client) ScaleUp(resource, namespaceID, id string) error { - return c.modifyScale(resource, namespaceID, id, func(scale *apiv1beta1.Scale) { + return c.modifyScale(resource, namespaceID, id, func(scale *apiextensionsv1beta1.Scale) { scale.Spec.Replicas++ }) } func (c *client) ScaleDown(resource, namespaceID, id string) error { - return c.modifyScale(resource, namespaceID, id, func(scale *apiv1beta1.Scale) { + return c.modifyScale(resource, namespaceID, id, func(scale *apiextensionsv1beta1.Scale) { scale.Spec.Replicas-- }) } -func (c *client) modifyScale(resource, namespace, id string, f func(*apiv1beta1.Scale)) error { +func (c *client) modifyScale(resource, namespace, id string, f func(*apiextensionsv1beta1.Scale)) error { scaler := c.client.Extensions().Scales(namespace) scale, err := scaler.Get(resource, id) if err != nil { diff --git a/probe/kubernetes/cronjob.go b/probe/kubernetes/cronjob.go new file mode 100644 index 000000000..c188a7d1f --- /dev/null +++ b/probe/kubernetes/cronjob.go @@ -0,0 +1,75 @@ +package kubernetes + +import ( + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + batchv1 "k8s.io/client-go/pkg/apis/batch/v1" + batchv2alpha1 "k8s.io/client-go/pkg/apis/batch/v2alpha1" + + "github.com/weaveworks/scope/report" +) + +// These constants are keys used in node metadata +const ( + Schedule = "kubernetes_schedule" + Suspended = "kubernetes_suspended" + LastScheduled = "kubernetes_last_scheduled" + ActiveJobs = "kubernetes_active_jobs" +) + +// CronJob represents a Kubernetes cron job +type CronJob interface { + Meta + Selectors() ([]labels.Selector, error) + GetNode() report.Node +} + +type cronJob struct { + *batchv2alpha1.CronJob + Meta + jobs []*batchv1.Job +} + +// NewCronJob creates a new cron job. jobs should be all jobs, which will be filtered +// for those matching this cron job. +func NewCronJob(cj *batchv2alpha1.CronJob, jobs []*batchv1.Job) CronJob { + myJobs := []*batchv1.Job{} + for _, j := range jobs { + for _, o := range cj.Status.Active { + if j.UID == o.UID { + myJobs = append(myJobs, j) + break + } + } + } + return &cronJob{ + CronJob: cj, + Meta: meta{cj.ObjectMeta}, + jobs: myJobs, + } +} + +func (cj *cronJob) Selectors() ([]labels.Selector, error) { + selectors := []labels.Selector{} + for _, j := range cj.jobs { + selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) + if err != nil { + return nil, err + } + selectors = append(selectors, selector) + } + return selectors, nil +} + +func (cj *cronJob) GetNode() report.Node { + return cj.MetaNode(report.MakeCronJobNodeID(cj.UID())).WithLatests(map[string]string{ + NodeType: "Cron Job", + Schedule: cj.Spec.Schedule, + Suspended: fmt.Sprint(cj.Spec.Suspend != nil && *cj.Spec.Suspend), // nil -> false + LastScheduled: cj.Status.LastScheduleTime.Format(time.RFC3339Nano), + ActiveJobs: fmt.Sprint(len(cj.jobs)), + }) +} diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index e93a6ccd1..369dc1038 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -78,6 +78,30 @@ var ( DaemonSetMetricTemplates = PodMetricTemplates + StatefulSetMetadataTemplates = report.MetadataTemplates{ + NodeType: {ID: NodeType, Label: "Type", From: report.FromLatest, Priority: 1}, + Namespace: {ID: Namespace, Label: "Namespace", From: report.FromLatest, Priority: 2}, + Created: {ID: Created, Label: "Created", From: report.FromLatest, Datatype: "datetime", Priority: 3}, + ObservedGeneration: {ID: ObservedGeneration, Label: "Observed Gen.", From: report.FromLatest, Datatype: "number", Priority: 4}, + DesiredReplicas: {ID: DesiredReplicas, Label: "Desired Replicas", From: report.FromLatest, Datatype: "number", Priority: 5}, + report.Pod: {ID: report.Pod, Label: "# Pods", From: report.FromCounters, Datatype: "number", Priority: 6}, + } + + StatefulSetMetricTemplates = PodMetricTemplates + + CronJobMetadataTemplates = report.MetadataTemplates{ + NodeType: {ID: NodeType, Label: "Type", From: report.FromLatest, Priority: 1}, + Namespace: {ID: Namespace, Label: "Namespace", From: report.FromLatest, Priority: 2}, + Created: {ID: Created, Label: "Created", From: report.FromLatest, Datatype: "datetime", Priority: 3}, + Schedule: {ID: Schedule, Label: "Schedule", From: report.FromLatest, Priority: 4}, + LastScheduled: {ID: LastScheduled, Label: "Last Scheduled", From: report.FromLatest, Datatype: "datetime", Priority: 5}, + Suspended: {ID: Suspended, Label: "Suspended", From: report.FromLatest, Priority: 6}, + ActiveJobs: {ID: ActiveJobs, Label: "# Jobs", From: report.FromLatest, Datatype: "number", Priority: 7}, + report.Pod: {ID: report.Pod, Label: "# Pods", From: report.FromCounters, Datatype: "number", Priority: 8}, + } + + CronJobMetricTemplates = PodMetricTemplates + TableTemplates = report.TableTemplates{ LabelPrefix: { ID: LabelPrefix, @@ -222,6 +246,14 @@ func (r *Reporter) Report() (report.Report, error) { if err != nil { return result, err } + statefulSetTopology, statefulSets, err := r.statefulSetTopology() + if err != nil { + return result, err + } + cronJobTopology, cronJobs, err := r.cronJobTopology() + if err != nil { + return result, err + } deploymentTopology, deployments, err := r.deploymentTopology(r.probeID) if err != nil { return result, err @@ -230,7 +262,7 @@ func (r *Reporter) Report() (report.Report, error) { if err != nil { return result, err } - podTopology, err := r.podTopology(services, replicaSets, daemonSets) + podTopology, err := r.podTopology(services, replicaSets, daemonSets, statefulSets, cronJobs) if err != nil { return result, err } @@ -238,6 +270,8 @@ func (r *Reporter) Report() (report.Report, error) { result.Service = result.Service.Merge(serviceTopology) result.Host = result.Host.Merge(hostTopology) result.DaemonSet = result.DaemonSet.Merge(daemonSetTopology) + result.StatefulSet = result.StatefulSet.Merge(statefulSetTopology) + result.CronJob = result.CronJob.Merge(cronJobTopology) result.Deployment = result.Deployment.Merge(deploymentTopology) result.ReplicaSet = result.ReplicaSet.Merge(replicaSetTopology) return result, nil @@ -310,6 +344,34 @@ func (r *Reporter) daemonSetTopology() (report.Topology, []DaemonSet, error) { return result, daemonSets, err } +func (r *Reporter) statefulSetTopology() (report.Topology, []StatefulSet, error) { + statefulSets := []StatefulSet{} + result := report.MakeTopology(). + WithMetadataTemplates(StatefulSetMetadataTemplates). + WithMetricTemplates(StatefulSetMetricTemplates). + WithTableTemplates(TableTemplates) + err := r.client.WalkStatefulSets(func(s StatefulSet) error { + result = result.AddNode(s.GetNode()) + statefulSets = append(statefulSets, s) + return nil + }) + return result, statefulSets, err +} + +func (r *Reporter) cronJobTopology() (report.Topology, []CronJob, error) { + cronJobs := []CronJob{} + result := report.MakeTopology(). + WithMetadataTemplates(CronJobMetadataTemplates). + WithMetricTemplates(CronJobMetricTemplates). + WithTableTemplates(TableTemplates) + err := r.client.WalkCronJobs(func(c CronJob) error { + result = result.AddNode(c.GetNode()) + cronJobs = append(cronJobs, c) + return nil + }) + return result, cronJobs, err +} + func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment) (report.Topology, []ReplicaSet, error) { var ( result = report.MakeTopology(). @@ -373,7 +435,7 @@ func match(namespace string, selector labels.Selector, topology, id string) func } } -func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, daemonSets []DaemonSet) (report.Topology, error) { +func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, daemonSets []DaemonSet, statefulSets []StatefulSet, cronJobs []CronJob) (report.Topology, error) { var ( pods = report.MakeTopology(). WithMetadataTemplates(PodMetadataTemplates). @@ -425,6 +487,32 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, dae report.MakeDaemonSetNodeID(daemonSet.UID()), )) } + for _, statefulSet := range statefulSets { + selector, err := statefulSet.Selector() + if err != nil { + return pods, err + } + selectors = append(selectors, match( + statefulSet.Namespace(), + selector, + report.StatefulSet, + report.MakeStatefulSetNodeID(statefulSet.UID()), + )) + } + for _, cronJob := range cronJobs { + cronJobSelectors, err := cronJob.Selectors() + if err != nil { + return pods, err + } + for _, selector := range cronJobSelectors { + selectors = append(selectors, match( + cronJob.Namespace(), + selector, + report.CronJob, + report.MakeCronJobNodeID(cronJob.UID()), + )) + } + } var localPodUIDs map[string]struct{} if r.nodeName == "" { diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index f9de8945e..f1bd1d6e5 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -136,6 +136,12 @@ func (c *mockClient) WalkServices(f func(kubernetes.Service) error) error { func (c *mockClient) WalkDaemonSets(f func(kubernetes.DaemonSet) error) error { return nil } +func (c *mockClient) WalkStatefulSets(f func(kubernetes.StatefulSet) error) error { + return nil +} +func (c *mockClient) WalkCronJobs(f func(kubernetes.CronJob) error) error { + return nil +} func (c *mockClient) WalkDeployments(f func(kubernetes.Deployment) error) error { return nil } diff --git a/probe/kubernetes/statefulset.go b/probe/kubernetes/statefulset.go new file mode 100644 index 000000000..a60cad822 --- /dev/null +++ b/probe/kubernetes/statefulset.go @@ -0,0 +1,55 @@ +package kubernetes + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/pkg/apis/apps/v1beta1" + + "github.com/weaveworks/scope/report" +) + +// StatefulSet represents a Kubernetes statefulset +type StatefulSet interface { + Meta + Selector() (labels.Selector, error) + GetNode() report.Node +} + +type statefulSet struct { + *v1beta1.StatefulSet + Meta +} + +// NewStatefulSet creates a new statefulset +func NewStatefulSet(s *v1beta1.StatefulSet) StatefulSet { + return &statefulSet{ + StatefulSet: s, + Meta: meta{s.ObjectMeta}, + } +} + +func (s *statefulSet) Selector() (labels.Selector, error) { + selector, err := metav1.LabelSelectorAsSelector(s.Spec.Selector) + if err != nil { + return nil, err + } + return selector, nil +} + +func (s *statefulSet) GetNode() report.Node { + desiredReplicas := 1 + if s.Spec.Replicas != nil { + desiredReplicas = int(*s.Spec.Replicas) + } + latests := map[string]string{ + NodeType: "Stateful Set", + DesiredReplicas: fmt.Sprint(desiredReplicas), + Replicas: fmt.Sprint(s.Status.Replicas), + } + if s.Status.ObservedGeneration != nil { + latests[ObservedGeneration] = fmt.Sprint(*s.Status.ObservedGeneration) + } + return s.MetaNode(report.MakeStatefulSetNodeID(s.UID())).WithLatests(latests) +}