kubernetes probe: Collect info on cronjobs and statefulsets

Most of the time you only care about cronjobs, not the jobs that make them up,
so we only collect full cronjob data. We associate pods of jobs with the parent cronjob
This commit is contained in:
Mike Lang
2017-07-13 17:46:30 -07:00
parent 17fffb32e1
commit 481258d8fc
5 changed files with 289 additions and 12 deletions

View File

@@ -13,7 +13,10 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
apiv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1"
apiappsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
apibatchv1 "k8s.io/client-go/pkg/apis/batch/v1"
apibatchv2alpha1 "k8s.io/client-go/pkg/apis/batch/v2alpha1"
apiextensionsv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
@@ -28,6 +31,8 @@ type Client interface {
WalkDeployments(f func(Deployment) error) error
WalkReplicaSets(f func(ReplicaSet) error) error
WalkDaemonSets(f func(DaemonSet) error) error
WalkStatefulSets(f func(StatefulSet) error) error
WalkCronJobs(f func(CronJob) error) error
WalkReplicationControllers(f func(ReplicationController) error) error
WalkNodes(f func(*apiv1.Node) error) error
@@ -48,6 +53,9 @@ type client struct {
deploymentStore cache.Store
replicaSetStore cache.Store
daemonSetStore cache.Store
statefulSetStore cache.Store
jobStore cache.Store
cronJobStore cache.Store
replicationControllerStore cache.Store
nodeStore cache.Store
@@ -157,9 +165,21 @@ func NewClient(config ClientConfig) (Client, error) {
if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("Deployments, ReplicaSets and DaemonSets are not supported by this Kubernetes version: %v", err)
} else {
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiv1beta1.Deployment{}, nil)
result.replicaSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "replicasets", &apiv1beta1.ReplicaSet{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiv1beta1.DaemonSet{}, nil)
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.replicaSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "replicasets", &apiextensionsv1beta1.ReplicaSet{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
}
// CronJobs and StatefulSets were introduced later. Easiest to use the same technique.
if _, err := c.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("CronJobs are not supported by this Kubernetes version: %v", err)
} else {
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)
}
if _, err := c.Apps().StatefulSets(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("StatefulSets are not supported by this Kubernetes version: %v", err)
} else {
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
}
return result, nil
@@ -214,7 +234,7 @@ func (c *client) WalkDeployments(f func(Deployment) error) error {
return nil
}
for _, m := range c.deploymentStore.List() {
d := m.(*apiv1beta1.Deployment)
d := m.(*apiextensionsv1beta1.Deployment)
if err := f(NewDeployment(d)); err != nil {
return err
}
@@ -228,7 +248,7 @@ func (c *client) WalkReplicaSets(f func(ReplicaSet) error) error {
return nil
}
for _, m := range c.replicaSetStore.List() {
rs := m.(*apiv1beta1.ReplicaSet)
rs := m.(*apiextensionsv1beta1.ReplicaSet)
if err := f(NewReplicaSet(rs)); err != nil {
return err
}
@@ -254,7 +274,7 @@ func (c *client) WalkDaemonSets(f func(DaemonSet) error) error {
return nil
}
for _, m := range c.daemonSetStore.List() {
ds := m.(*apiv1beta1.DaemonSet)
ds := m.(*apiextensionsv1beta1.DaemonSet)
if err := f(NewDaemonSet(ds)); err != nil {
return err
}
@@ -262,6 +282,39 @@ func (c *client) WalkDaemonSets(f func(DaemonSet) error) error {
return nil
}
// WalkStatefulSets calls f for each statefulset
func (c *client) WalkStatefulSets(f func(StatefulSet) error) error {
if c.statefulSetStore == nil {
return nil
}
for _, m := range c.statefulSetStore.List() {
s := m.(*apiappsv1beta1.StatefulSet)
if err := f(NewStatefulSet(s)); err != nil {
return err
}
}
return nil
}
// WalkCronJobs calls f for each cronjob
func (c *client) WalkCronJobs(f func(CronJob) error) error {
if c.cronJobStore == nil {
return nil
}
jobs := []*apibatchv1.Job{}
for _, m := range c.jobStore.List() {
j := m.(*apibatchv1.Job)
jobs = append(jobs, j)
}
for _, m := range c.cronJobStore.List() {
cj := m.(*apibatchv2alpha1.CronJob)
if err := f(NewCronJob(cj, jobs)); err != nil {
return err
}
}
return nil
}
func (c *client) WalkNodes(f func(*apiv1.Node) error) error {
for _, m := range c.nodeStore.List() {
node := m.(*apiv1.Node)
@@ -288,18 +341,18 @@ func (c *client) DeletePod(namespaceID, podID string) error {
}
func (c *client) ScaleUp(resource, namespaceID, id string) error {
return c.modifyScale(resource, namespaceID, id, func(scale *apiv1beta1.Scale) {
return c.modifyScale(resource, namespaceID, id, func(scale *apiextensionsv1beta1.Scale) {
scale.Spec.Replicas++
})
}
func (c *client) ScaleDown(resource, namespaceID, id string) error {
return c.modifyScale(resource, namespaceID, id, func(scale *apiv1beta1.Scale) {
return c.modifyScale(resource, namespaceID, id, func(scale *apiextensionsv1beta1.Scale) {
scale.Spec.Replicas--
})
}
func (c *client) modifyScale(resource, namespace, id string, f func(*apiv1beta1.Scale)) error {
func (c *client) modifyScale(resource, namespace, id string, f func(*apiextensionsv1beta1.Scale)) error {
scaler := c.client.Extensions().Scales(namespace)
scale, err := scaler.Get(resource, id)
if err != nil {

View File

@@ -0,0 +1,75 @@
package kubernetes
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
batchv2alpha1 "k8s.io/client-go/pkg/apis/batch/v2alpha1"
"github.com/weaveworks/scope/report"
)
// These constants are keys used in node metadata
const (
Schedule = "kubernetes_schedule"
Suspended = "kubernetes_suspended"
LastScheduled = "kubernetes_last_scheduled"
ActiveJobs = "kubernetes_active_jobs"
)
// CronJob represents a Kubernetes cron job
type CronJob interface {
Meta
Selectors() ([]labels.Selector, error)
GetNode() report.Node
}
type cronJob struct {
*batchv2alpha1.CronJob
Meta
jobs []*batchv1.Job
}
// NewCronJob creates a new cron job. jobs should be all jobs, which will be filtered
// for those matching this cron job.
func NewCronJob(cj *batchv2alpha1.CronJob, jobs []*batchv1.Job) CronJob {
myJobs := []*batchv1.Job{}
for _, j := range jobs {
for _, o := range cj.Status.Active {
if j.UID == o.UID {
myJobs = append(myJobs, j)
break
}
}
}
return &cronJob{
CronJob: cj,
Meta: meta{cj.ObjectMeta},
jobs: myJobs,
}
}
func (cj *cronJob) Selectors() ([]labels.Selector, error) {
selectors := []labels.Selector{}
for _, j := range cj.jobs {
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
if err != nil {
return nil, err
}
selectors = append(selectors, selector)
}
return selectors, nil
}
func (cj *cronJob) GetNode() report.Node {
return cj.MetaNode(report.MakeCronJobNodeID(cj.UID())).WithLatests(map[string]string{
NodeType: "Cron Job",
Schedule: cj.Spec.Schedule,
Suspended: fmt.Sprint(cj.Spec.Suspend != nil && *cj.Spec.Suspend), // nil -> false
LastScheduled: cj.Status.LastScheduleTime.Format(time.RFC3339Nano),
ActiveJobs: fmt.Sprint(len(cj.jobs)),
})
}

View File

@@ -78,6 +78,30 @@ var (
DaemonSetMetricTemplates = PodMetricTemplates
StatefulSetMetadataTemplates = report.MetadataTemplates{
NodeType: {ID: NodeType, Label: "Type", From: report.FromLatest, Priority: 1},
Namespace: {ID: Namespace, Label: "Namespace", From: report.FromLatest, Priority: 2},
Created: {ID: Created, Label: "Created", From: report.FromLatest, Datatype: "datetime", Priority: 3},
ObservedGeneration: {ID: ObservedGeneration, Label: "Observed Gen.", From: report.FromLatest, Datatype: "number", Priority: 4},
DesiredReplicas: {ID: DesiredReplicas, Label: "Desired Replicas", From: report.FromLatest, Datatype: "number", Priority: 5},
report.Pod: {ID: report.Pod, Label: "# Pods", From: report.FromCounters, Datatype: "number", Priority: 6},
}
StatefulSetMetricTemplates = PodMetricTemplates
CronJobMetadataTemplates = report.MetadataTemplates{
NodeType: {ID: NodeType, Label: "Type", From: report.FromLatest, Priority: 1},
Namespace: {ID: Namespace, Label: "Namespace", From: report.FromLatest, Priority: 2},
Created: {ID: Created, Label: "Created", From: report.FromLatest, Datatype: "datetime", Priority: 3},
Schedule: {ID: Schedule, Label: "Schedule", From: report.FromLatest, Priority: 4},
LastScheduled: {ID: LastScheduled, Label: "Last Scheduled", From: report.FromLatest, Datatype: "datetime", Priority: 5},
Suspended: {ID: Suspended, Label: "Suspended", From: report.FromLatest, Priority: 6},
ActiveJobs: {ID: ActiveJobs, Label: "# Jobs", From: report.FromLatest, Datatype: "number", Priority: 7},
report.Pod: {ID: report.Pod, Label: "# Pods", From: report.FromCounters, Datatype: "number", Priority: 8},
}
CronJobMetricTemplates = PodMetricTemplates
TableTemplates = report.TableTemplates{
LabelPrefix: {
ID: LabelPrefix,
@@ -222,6 +246,14 @@ func (r *Reporter) Report() (report.Report, error) {
if err != nil {
return result, err
}
statefulSetTopology, statefulSets, err := r.statefulSetTopology()
if err != nil {
return result, err
}
cronJobTopology, cronJobs, err := r.cronJobTopology()
if err != nil {
return result, err
}
deploymentTopology, deployments, err := r.deploymentTopology(r.probeID)
if err != nil {
return result, err
@@ -230,7 +262,7 @@ func (r *Reporter) Report() (report.Report, error) {
if err != nil {
return result, err
}
podTopology, err := r.podTopology(services, replicaSets, daemonSets)
podTopology, err := r.podTopology(services, replicaSets, daemonSets, statefulSets, cronJobs)
if err != nil {
return result, err
}
@@ -238,6 +270,8 @@ func (r *Reporter) Report() (report.Report, error) {
result.Service = result.Service.Merge(serviceTopology)
result.Host = result.Host.Merge(hostTopology)
result.DaemonSet = result.DaemonSet.Merge(daemonSetTopology)
result.StatefulSet = result.StatefulSet.Merge(statefulSetTopology)
result.CronJob = result.CronJob.Merge(cronJobTopology)
result.Deployment = result.Deployment.Merge(deploymentTopology)
result.ReplicaSet = result.ReplicaSet.Merge(replicaSetTopology)
return result, nil
@@ -310,6 +344,34 @@ func (r *Reporter) daemonSetTopology() (report.Topology, []DaemonSet, error) {
return result, daemonSets, err
}
func (r *Reporter) statefulSetTopology() (report.Topology, []StatefulSet, error) {
statefulSets := []StatefulSet{}
result := report.MakeTopology().
WithMetadataTemplates(StatefulSetMetadataTemplates).
WithMetricTemplates(StatefulSetMetricTemplates).
WithTableTemplates(TableTemplates)
err := r.client.WalkStatefulSets(func(s StatefulSet) error {
result = result.AddNode(s.GetNode())
statefulSets = append(statefulSets, s)
return nil
})
return result, statefulSets, err
}
func (r *Reporter) cronJobTopology() (report.Topology, []CronJob, error) {
cronJobs := []CronJob{}
result := report.MakeTopology().
WithMetadataTemplates(CronJobMetadataTemplates).
WithMetricTemplates(CronJobMetricTemplates).
WithTableTemplates(TableTemplates)
err := r.client.WalkCronJobs(func(c CronJob) error {
result = result.AddNode(c.GetNode())
cronJobs = append(cronJobs, c)
return nil
})
return result, cronJobs, err
}
func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment) (report.Topology, []ReplicaSet, error) {
var (
result = report.MakeTopology().
@@ -373,7 +435,7 @@ func match(namespace string, selector labels.Selector, topology, id string) func
}
}
func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, daemonSets []DaemonSet) (report.Topology, error) {
func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, daemonSets []DaemonSet, statefulSets []StatefulSet, cronJobs []CronJob) (report.Topology, error) {
var (
pods = report.MakeTopology().
WithMetadataTemplates(PodMetadataTemplates).
@@ -425,6 +487,32 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, dae
report.MakeDaemonSetNodeID(daemonSet.UID()),
))
}
for _, statefulSet := range statefulSets {
selector, err := statefulSet.Selector()
if err != nil {
return pods, err
}
selectors = append(selectors, match(
statefulSet.Namespace(),
selector,
report.StatefulSet,
report.MakeStatefulSetNodeID(statefulSet.UID()),
))
}
for _, cronJob := range cronJobs {
cronJobSelectors, err := cronJob.Selectors()
if err != nil {
return pods, err
}
for _, selector := range cronJobSelectors {
selectors = append(selectors, match(
cronJob.Namespace(),
selector,
report.CronJob,
report.MakeCronJobNodeID(cronJob.UID()),
))
}
}
var localPodUIDs map[string]struct{}
if r.nodeName == "" {

View File

@@ -136,6 +136,12 @@ func (c *mockClient) WalkServices(f func(kubernetes.Service) error) error {
func (c *mockClient) WalkDaemonSets(f func(kubernetes.DaemonSet) error) error {
return nil
}
func (c *mockClient) WalkStatefulSets(f func(kubernetes.StatefulSet) error) error {
return nil
}
func (c *mockClient) WalkCronJobs(f func(kubernetes.CronJob) error) error {
return nil
}
func (c *mockClient) WalkDeployments(f func(kubernetes.Deployment) error) error {
return nil
}

View File

@@ -0,0 +1,55 @@
package kubernetes
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"github.com/weaveworks/scope/report"
)
// StatefulSet represents a Kubernetes statefulset
type StatefulSet interface {
Meta
Selector() (labels.Selector, error)
GetNode() report.Node
}
type statefulSet struct {
*v1beta1.StatefulSet
Meta
}
// NewStatefulSet creates a new statefulset
func NewStatefulSet(s *v1beta1.StatefulSet) StatefulSet {
return &statefulSet{
StatefulSet: s,
Meta: meta{s.ObjectMeta},
}
}
func (s *statefulSet) Selector() (labels.Selector, error) {
selector, err := metav1.LabelSelectorAsSelector(s.Spec.Selector)
if err != nil {
return nil, err
}
return selector, nil
}
func (s *statefulSet) GetNode() report.Node {
desiredReplicas := 1
if s.Spec.Replicas != nil {
desiredReplicas = int(*s.Spec.Replicas)
}
latests := map[string]string{
NodeType: "Stateful Set",
DesiredReplicas: fmt.Sprint(desiredReplicas),
Replicas: fmt.Sprint(s.Status.Replicas),
}
if s.Status.ObservedGeneration != nil {
latests[ObservedGeneration] = fmt.Sprint(*s.Status.ObservedGeneration)
}
return s.MetaNode(report.MakeStatefulSetNodeID(s.UID())).WithLatests(latests)
}