support recreating k8s jobs when configmap/secret changed (#808)

* support recreating k8s jobs when configmap/secret changed

Co-authored-by: Mohamed Sekour <mohamed.sekour@exfo.com>

* add unit tests

* fix tests
This commit is contained in:
Mohamed Sekour
2025-02-05 10:50:25 +01:00
committed by GitHub
parent 919d0cc3ca
commit 574129d637
11 changed files with 627 additions and 12 deletions

View File

@@ -92,6 +92,26 @@ func GetCronJobItems(clients kube.Clients, namespace string) []runtime.Object {
return items
}
// GetJobItems returns the jobs in given namespace
func GetJobItems(clients kube.Clients, namespace string) []runtime.Object {
jobs, err := clients.KubernetesClient.BatchV1().Jobs(namespace).List(context.TODO(), meta_v1.ListOptions{})
if err != nil {
logrus.Errorf("Failed to list jobs %v", err)
}
items := make([]runtime.Object, len(jobs.Items))
// Ensure we always have pod annotations to add to
for i, v := range jobs.Items {
if v.Spec.Template.ObjectMeta.Annotations == nil {
annotations := make(map[string]string)
jobs.Items[i].Spec.Template.ObjectMeta.Annotations = annotations
}
items[i] = &jobs.Items[i]
}
return items
}
// GetDaemonSetItems returns the daemonSets in given namespace
func GetDaemonSetItems(clients kube.Clients, namespace string) []runtime.Object {
daemonSets, err := clients.KubernetesClient.AppsV1().DaemonSets(namespace).List(context.TODO(), meta_v1.ListOptions{})
@@ -178,6 +198,11 @@ func GetCronJobAnnotations(item runtime.Object) map[string]string {
return item.(*batchv1.CronJob).ObjectMeta.Annotations
}
// GetJobAnnotations returns the annotations of given job
func GetJobAnnotations(item runtime.Object) map[string]string {
return item.(*batchv1.Job).ObjectMeta.Annotations
}
// GetDaemonSetAnnotations returns the annotations of given daemonSet
func GetDaemonSetAnnotations(item runtime.Object) map[string]string {
return item.(*appsv1.DaemonSet).ObjectMeta.Annotations
@@ -208,6 +233,11 @@ func GetCronJobPodAnnotations(item runtime.Object) map[string]string {
return item.(*batchv1.CronJob).Spec.JobTemplate.Spec.Template.ObjectMeta.Annotations
}
// GetJobPodAnnotations returns the pod's annotations of given job
func GetJobPodAnnotations(item runtime.Object) map[string]string {
return item.(*batchv1.Job).Spec.Template.ObjectMeta.Annotations
}
// GetDaemonSetPodAnnotations returns the pod's annotations of given daemonSet
func GetDaemonSetPodAnnotations(item runtime.Object) map[string]string {
return item.(*appsv1.DaemonSet).Spec.Template.ObjectMeta.Annotations
@@ -238,6 +268,11 @@ func GetCronJobContainers(item runtime.Object) []v1.Container {
return item.(*batchv1.CronJob).Spec.JobTemplate.Spec.Template.Spec.Containers
}
// GetJobContainers returns the containers of given job
func GetJobContainers(item runtime.Object) []v1.Container {
return item.(*batchv1.Job).Spec.Template.Spec.Containers
}
// GetDaemonSetContainers returns the containers of given daemonSet
func GetDaemonSetContainers(item runtime.Object) []v1.Container {
return item.(*appsv1.DaemonSet).Spec.Template.Spec.Containers
@@ -268,6 +303,11 @@ func GetCronJobInitContainers(item runtime.Object) []v1.Container {
return item.(*batchv1.CronJob).Spec.JobTemplate.Spec.Template.Spec.InitContainers
}
// GetJobInitContainers returns the containers of given job
func GetJobInitContainers(item runtime.Object) []v1.Container {
return item.(*batchv1.Job).Spec.Template.Spec.InitContainers
}
// GetDaemonSetInitContainers returns the containers of given daemonSet
func GetDaemonSetInitContainers(item runtime.Object) []v1.Container {
return item.(*appsv1.DaemonSet).Spec.Template.Spec.InitContainers
@@ -307,6 +347,38 @@ func CreateJobFromCronjob(clients kube.Clients, namespace string, resource runti
return err
}
// ReCreateJobFromjob performs rolling upgrade on job
func ReCreateJobFromjob(clients kube.Clients, namespace string, resource runtime.Object) error {
oldJob := resource.(*batchv1.Job)
job := oldJob.DeepCopy()
// Delete the old job
policy := meta_v1.DeletePropagationBackground
err := clients.KubernetesClient.BatchV1().Jobs(namespace).Delete(context.TODO(), job.Name, meta_v1.DeleteOptions{PropagationPolicy: &policy})
if err != nil {
return err
}
// Remove fields that should not be specified when creating a new Job
job.ObjectMeta.ResourceVersion = ""
job.ObjectMeta.UID = ""
job.ObjectMeta.CreationTimestamp = meta_v1.Time{}
job.Status = batchv1.JobStatus{}
// Remove problematic labels
delete(job.Spec.Template.Labels, "controller-uid")
delete(job.Spec.Template.Labels, batchv1.ControllerUidLabel)
delete(job.Spec.Template.Labels, batchv1.JobNameLabel)
delete(job.Spec.Template.Labels, "job-name")
// Remove the selector to allow it to be auto-generated
job.Spec.Selector = nil
// Create the new job with same spec
_, err = clients.KubernetesClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, meta_v1.CreateOptions{FieldManager: "Reloader"})
return err
}
// UpdateDaemonSet performs rolling upgrade on daemonSet
func UpdateDaemonSet(clients kube.Clients, namespace string, resource runtime.Object) error {
daemonSet := resource.(*appsv1.DaemonSet)
@@ -352,6 +424,11 @@ func GetCronJobVolumes(item runtime.Object) []v1.Volume {
return item.(*batchv1.CronJob).Spec.JobTemplate.Spec.Template.Spec.Volumes
}
// GetJobVolumes returns the Volumes of given job
func GetJobVolumes(item runtime.Object) []v1.Volume {
return item.(*batchv1.Job).Spec.Template.Spec.Volumes
}
// GetDaemonSetVolumes returns the Volumes of given daemonSet
func GetDaemonSetVolumes(item runtime.Object) []v1.Volume {
return item.(*appsv1.DaemonSet).Spec.Template.Spec.Volumes

View File

@@ -2,13 +2,22 @@ package callbacks_test
import (
"context"
"fmt"
"testing"
"time"
argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
argorollouts "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
fakeargoclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
"github.com/stakater/Reloader/internal/pkg/callbacks"
"github.com/stakater/Reloader/internal/pkg/options"
@@ -17,9 +26,32 @@ import (
)
var (
clients = kube.Clients{ArgoRolloutClient: argorollouts.NewSimpleClientset()}
clients = setupTestClients()
)
type testFixtures struct {
defaultContainers []v1.Container
defaultInitContainers []v1.Container
defaultVolumes []v1.Volume
namespace string
}
func newTestFixtures() testFixtures {
return testFixtures{
defaultContainers: []v1.Container{{Name: "container1"}, {Name: "container2"}},
defaultInitContainers: []v1.Container{{Name: "init-container1"}, {Name: "init-container2"}},
defaultVolumes: []v1.Volume{{Name: "volume1"}, {Name: "volume2"}},
namespace: "default",
}
}
func setupTestClients() kube.Clients {
return kube.Clients{
KubernetesClient: fake.NewSimpleClientset(),
ArgoRolloutClient: fakeargoclientset.NewSimpleClientset(),
}
}
// TestUpdateRollout test update rollout strategy annotation
func TestUpdateRollout(t *testing.T) {
namespace := "test-ns"
@@ -79,6 +111,218 @@ func TestUpdateRollout(t *testing.T) {
}
}
func TestResourceItems(t *testing.T) {
fixtures := newTestFixtures()
tests := []struct {
name string
createFunc func(kube.Clients, string) error
getItemsFunc func(kube.Clients, string) []runtime.Object
expectedCount int
}{
{
name: "Deployments",
createFunc: createTestDeployments,
getItemsFunc: callbacks.GetDeploymentItems,
expectedCount: 2,
},
{
name: "CronJobs",
createFunc: createTestCronJobs,
getItemsFunc: callbacks.GetCronJobItems,
expectedCount: 2,
},
{
name: "Jobs",
createFunc: createTestJobs,
getItemsFunc: callbacks.GetJobItems,
expectedCount: 2,
},
{
name: "DaemonSets",
createFunc: createTestDaemonSets,
getItemsFunc: callbacks.GetDaemonSetItems,
expectedCount: 2,
},
{
name: "StatefulSets",
createFunc: createTestStatefulSets,
getItemsFunc: callbacks.GetStatefulSetItems,
expectedCount: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.createFunc(clients, fixtures.namespace)
assert.NoError(t, err)
items := tt.getItemsFunc(clients, fixtures.namespace)
assert.Equal(t, tt.expectedCount, len(items))
})
}
}
func TestGetAnnotations(t *testing.T) {
testAnnotations := map[string]string{"version": "1"}
tests := []struct {
name string
resource runtime.Object
getFunc func(runtime.Object) map[string]string
}{
{"Deployment", &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Annotations: testAnnotations}}, callbacks.GetDeploymentAnnotations},
{"CronJob", &batchv1.CronJob{ObjectMeta: metav1.ObjectMeta{Annotations: testAnnotations}}, callbacks.GetCronJobAnnotations},
{"Job", &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Annotations: testAnnotations}}, callbacks.GetJobAnnotations},
{"DaemonSet", &appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Annotations: testAnnotations}}, callbacks.GetDaemonSetAnnotations},
{"StatefulSet", &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Annotations: testAnnotations}}, callbacks.GetStatefulSetAnnotations},
{"Rollout", &argorolloutv1alpha1.Rollout{ObjectMeta: metav1.ObjectMeta{Annotations: testAnnotations}}, callbacks.GetRolloutAnnotations},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, testAnnotations, tt.getFunc(tt.resource))
})
}
}
func TestGetPodAnnotations(t *testing.T) {
testAnnotations := map[string]string{"version": "1"}
tests := []struct {
name string
resource runtime.Object
getFunc func(runtime.Object) map[string]string
}{
{"Deployment", createResourceWithPodAnnotations(&appsv1.Deployment{}, testAnnotations), callbacks.GetDeploymentPodAnnotations},
{"CronJob", createResourceWithPodAnnotations(&batchv1.CronJob{}, testAnnotations), callbacks.GetCronJobPodAnnotations},
{"Job", createResourceWithPodAnnotations(&batchv1.Job{}, testAnnotations), callbacks.GetJobPodAnnotations},
{"DaemonSet", createResourceWithPodAnnotations(&appsv1.DaemonSet{}, testAnnotations), callbacks.GetDaemonSetPodAnnotations},
{"StatefulSet", createResourceWithPodAnnotations(&appsv1.StatefulSet{}, testAnnotations), callbacks.GetStatefulSetPodAnnotations},
{"Rollout", createResourceWithPodAnnotations(&argorolloutv1alpha1.Rollout{}, testAnnotations), callbacks.GetRolloutPodAnnotations},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, testAnnotations, tt.getFunc(tt.resource))
})
}
}
func TestGetContainers(t *testing.T) {
fixtures := newTestFixtures()
tests := []struct {
name string
resource runtime.Object
getFunc func(runtime.Object) []v1.Container
}{
{"Deployment", createResourceWithContainers(&appsv1.Deployment{}, fixtures.defaultContainers), callbacks.GetDeploymentContainers},
{"DaemonSet", createResourceWithContainers(&appsv1.DaemonSet{}, fixtures.defaultContainers), callbacks.GetDaemonSetContainers},
{"StatefulSet", createResourceWithContainers(&appsv1.StatefulSet{}, fixtures.defaultContainers), callbacks.GetStatefulSetContainers},
{"CronJob", createResourceWithContainers(&batchv1.CronJob{}, fixtures.defaultContainers), callbacks.GetCronJobContainers},
{"Job", createResourceWithContainers(&batchv1.Job{}, fixtures.defaultContainers), callbacks.GetJobContainers},
{"Rollout", createResourceWithContainers(&argorolloutv1alpha1.Rollout{}, fixtures.defaultContainers), callbacks.GetRolloutContainers},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, fixtures.defaultContainers, tt.getFunc(tt.resource))
})
}
}
func TestGetInitContainers(t *testing.T) {
fixtures := newTestFixtures()
tests := []struct {
name string
resource runtime.Object
getFunc func(runtime.Object) []v1.Container
}{
{"Deployment", createResourceWithInitContainers(&appsv1.Deployment{}, fixtures.defaultInitContainers), callbacks.GetDeploymentInitContainers},
{"DaemonSet", createResourceWithInitContainers(&appsv1.DaemonSet{}, fixtures.defaultInitContainers), callbacks.GetDaemonSetInitContainers},
{"StatefulSet", createResourceWithInitContainers(&appsv1.StatefulSet{}, fixtures.defaultInitContainers), callbacks.GetStatefulSetInitContainers},
{"CronJob", createResourceWithInitContainers(&batchv1.CronJob{}, fixtures.defaultInitContainers), callbacks.GetCronJobInitContainers},
{"Job", createResourceWithInitContainers(&batchv1.Job{}, fixtures.defaultInitContainers), callbacks.GetJobInitContainers},
{"Rollout", createResourceWithInitContainers(&argorolloutv1alpha1.Rollout{}, fixtures.defaultInitContainers), callbacks.GetRolloutInitContainers},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, fixtures.defaultInitContainers, tt.getFunc(tt.resource))
})
}
}
func TestUpdateResources(t *testing.T) {
fixtures := newTestFixtures()
tests := []struct {
name string
createFunc func(kube.Clients, string, string) (runtime.Object, error)
updateFunc func(kube.Clients, string, runtime.Object) error
}{
{"Deployment", createTestDeploymentWithAnnotations, callbacks.UpdateDeployment},
{"DaemonSet", createTestDaemonSetWithAnnotations, callbacks.UpdateDaemonSet},
{"StatefulSet", createTestStatefulSetWithAnnotations, callbacks.UpdateStatefulSet},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resource, err := tt.createFunc(clients, fixtures.namespace, "1")
assert.NoError(t, err)
err = tt.updateFunc(clients, fixtures.namespace, resource)
assert.NoError(t, err)
})
}
}
func TestCreateJobFromCronjob(t *testing.T) {
fixtures := newTestFixtures()
cronJob, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1")
assert.NoError(t, err)
err = callbacks.CreateJobFromCronjob(clients, fixtures.namespace, cronJob.(*batchv1.CronJob))
assert.NoError(t, err)
}
func TestReCreateJobFromJob(t *testing.T) {
fixtures := newTestFixtures()
job, err := createTestJobWithAnnotations(clients, fixtures.namespace, "1")
assert.NoError(t, err)
err = callbacks.ReCreateJobFromjob(clients, fixtures.namespace, job.(*batchv1.Job))
assert.NoError(t, err)
}
func TestGetVolumes(t *testing.T) {
fixtures := newTestFixtures()
tests := []struct {
name string
resource runtime.Object
getFunc func(runtime.Object) []v1.Volume
}{
{"Deployment", createResourceWithVolumes(&appsv1.Deployment{}, fixtures.defaultVolumes), callbacks.GetDeploymentVolumes},
{"CronJob", createResourceWithVolumes(&batchv1.CronJob{}, fixtures.defaultVolumes), callbacks.GetCronJobVolumes},
{"Job", createResourceWithVolumes(&batchv1.Job{}, fixtures.defaultVolumes), callbacks.GetJobVolumes},
{"DaemonSet", createResourceWithVolumes(&appsv1.DaemonSet{}, fixtures.defaultVolumes), callbacks.GetDaemonSetVolumes},
{"StatefulSet", createResourceWithVolumes(&appsv1.StatefulSet{}, fixtures.defaultVolumes), callbacks.GetStatefulSetVolumes},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, fixtures.defaultVolumes, tt.getFunc(tt.resource))
})
}
}
// Helper functions
func isRestartStrategy(rollout *argorolloutv1alpha1.Rollout) bool {
return rollout.Spec.RestartAt == nil
}
@@ -103,3 +347,178 @@ func watchModified(watcher watch.Interface, name string, modifiedChan chan inter
}
}
}
func createTestDeployments(clients kube.Clients, namespace string) error {
for i := 1; i <= 2; i++ {
_, err := testutil.CreateDeployment(clients.KubernetesClient, fmt.Sprintf("test-deployment-%d", i), namespace, false)
if err != nil {
return err
}
}
return nil
}
func createTestCronJobs(clients kube.Clients, namespace string) error {
for i := 1; i <= 2; i++ {
_, err := testutil.CreateCronJob(clients.KubernetesClient, fmt.Sprintf("test-cron-%d", i), namespace, false)
if err != nil {
return err
}
}
return nil
}
func createTestJobs(clients kube.Clients, namespace string) error {
for i := 1; i <= 2; i++ {
_, err := testutil.CreateJob(clients.KubernetesClient, fmt.Sprintf("test-job-%d", i), namespace, false)
if err != nil {
return err
}
}
return nil
}
func createTestDaemonSets(clients kube.Clients, namespace string) error {
for i := 1; i <= 2; i++ {
_, err := testutil.CreateDaemonSet(clients.KubernetesClient, fmt.Sprintf("test-daemonset-%d", i), namespace, false)
if err != nil {
return err
}
}
return nil
}
func createTestStatefulSets(clients kube.Clients, namespace string) error {
for i := 1; i <= 2; i++ {
_, err := testutil.CreateStatefulSet(clients.KubernetesClient, fmt.Sprintf("test-statefulset-%d", i), namespace, false)
if err != nil {
return err
}
}
return nil
}
func createResourceWithPodAnnotations(obj runtime.Object, annotations map[string]string) runtime.Object {
switch v := obj.(type) {
case *appsv1.Deployment:
v.Spec.Template.ObjectMeta.Annotations = annotations
case *appsv1.DaemonSet:
v.Spec.Template.ObjectMeta.Annotations = annotations
case *appsv1.StatefulSet:
v.Spec.Template.ObjectMeta.Annotations = annotations
case *batchv1.CronJob:
v.Spec.JobTemplate.Spec.Template.ObjectMeta.Annotations = annotations
case *batchv1.Job:
v.Spec.Template.ObjectMeta.Annotations = annotations
case *argorolloutv1alpha1.Rollout:
v.Spec.Template.ObjectMeta.Annotations = annotations
}
return obj
}
func createResourceWithContainers(obj runtime.Object, containers []v1.Container) runtime.Object {
switch v := obj.(type) {
case *appsv1.Deployment:
v.Spec.Template.Spec.Containers = containers
case *appsv1.DaemonSet:
v.Spec.Template.Spec.Containers = containers
case *appsv1.StatefulSet:
v.Spec.Template.Spec.Containers = containers
case *batchv1.CronJob:
v.Spec.JobTemplate.Spec.Template.Spec.Containers = containers
case *batchv1.Job:
v.Spec.Template.Spec.Containers = containers
case *argorolloutv1alpha1.Rollout:
v.Spec.Template.Spec.Containers = containers
}
return obj
}
func createResourceWithInitContainers(obj runtime.Object, initContainers []v1.Container) runtime.Object {
switch v := obj.(type) {
case *appsv1.Deployment:
v.Spec.Template.Spec.InitContainers = initContainers
case *appsv1.DaemonSet:
v.Spec.Template.Spec.InitContainers = initContainers
case *appsv1.StatefulSet:
v.Spec.Template.Spec.InitContainers = initContainers
case *batchv1.CronJob:
v.Spec.JobTemplate.Spec.Template.Spec.InitContainers = initContainers
case *batchv1.Job:
v.Spec.Template.Spec.InitContainers = initContainers
case *argorolloutv1alpha1.Rollout:
v.Spec.Template.Spec.InitContainers = initContainers
}
return obj
}
func createResourceWithVolumes(obj runtime.Object, volumes []v1.Volume) runtime.Object {
switch v := obj.(type) {
case *appsv1.Deployment:
v.Spec.Template.Spec.Volumes = volumes
case *batchv1.CronJob:
v.Spec.JobTemplate.Spec.Template.Spec.Volumes = volumes
case *batchv1.Job:
v.Spec.Template.Spec.Volumes = volumes
case *appsv1.DaemonSet:
v.Spec.Template.Spec.Volumes = volumes
case *appsv1.StatefulSet:
v.Spec.Template.Spec.Volumes = volumes
}
return obj
}
func createTestDeploymentWithAnnotations(clients kube.Clients, namespace, version string) (runtime.Object, error) {
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
Namespace: namespace,
Annotations: map[string]string{"version": version},
},
}
return clients.KubernetesClient.AppsV1().Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
}
func createTestDaemonSetWithAnnotations(clients kube.Clients, namespace, version string) (runtime.Object, error) {
daemonSet := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-daemonset",
Namespace: namespace,
Annotations: map[string]string{"version": version},
},
}
return clients.KubernetesClient.AppsV1().DaemonSets(namespace).Create(context.TODO(), daemonSet, metav1.CreateOptions{})
}
func createTestStatefulSetWithAnnotations(clients kube.Clients, namespace, version string) (runtime.Object, error) {
statefulSet := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-statefulset",
Namespace: namespace,
Annotations: map[string]string{"version": version},
},
}
return clients.KubernetesClient.AppsV1().StatefulSets(namespace).Create(context.TODO(), statefulSet, metav1.CreateOptions{})
}
func createTestCronJobWithAnnotations(clients kube.Clients, namespace, version string) (runtime.Object, error) {
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cronjob",
Namespace: namespace,
Annotations: map[string]string{"version": version},
},
}
return clients.KubernetesClient.BatchV1().CronJobs(namespace).Create(context.TODO(), cronJob, metav1.CreateOptions{})
}
func createTestJobWithAnnotations(clients kube.Clients, namespace, version string) (runtime.Object, error) {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: namespace,
Annotations: map[string]string{"version": version},
},
}
return clients.KubernetesClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, metav1.CreateOptions{})
}

View File

@@ -55,6 +55,20 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
}
}
// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob
func GetJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
return callbacks.RollingUpgradeFuncs{
ItemsFunc: callbacks.GetJobItems,
AnnotationsFunc: callbacks.GetJobAnnotations,
PodAnnotationsFunc: callbacks.GetJobPodAnnotations,
ContainersFunc: callbacks.GetJobContainers,
InitContainersFunc: callbacks.GetJobInitContainers,
UpdateFunc: callbacks.ReCreateJobFromjob,
VolumesFunc: callbacks.GetJobVolumes,
ResourceType: "Job",
}
}
// GetDaemonSetRollingUpgradeFuncs returns all callback funcs for a daemonset
func GetDaemonSetRollingUpgradeFuncs() callbacks.RollingUpgradeFuncs {
return callbacks.RollingUpgradeFuncs{
@@ -153,6 +167,10 @@ func doRollingUpgrade(config util.Config, collectors metrics.Collectors, recorde
if err != nil {
return err
}
err = rollingUpgrade(clients, config, GetJobCreateJobFuncs(), collectors, recorder, invoke)
if err != nil {
return err
}
err = rollingUpgrade(clients, config, GetDaemonSetRollingUpgradeFuncs(), collectors, recorder, invoke)
if err != nil {
return err

View File

@@ -23,6 +23,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -177,7 +178,7 @@ func getVolumes(name string) []v1.Volume {
}
}
func getVolumeMounts(name string) []v1.VolumeMount {
func getVolumeMounts() []v1.VolumeMount {
return []v1.VolumeMount{
{
MountPath: "etc/config",
@@ -275,7 +276,7 @@ func getPodTemplateSpecWithVolumes(name string) v1.PodTemplateSpec {
Value: "test",
},
},
VolumeMounts: getVolumeMounts(name),
VolumeMounts: getVolumeMounts(),
},
},
Volumes: getVolumes(name),
@@ -293,7 +294,7 @@ func getPodTemplateSpecWithInitContainer(name string) v1.PodTemplateSpec {
{
Image: "busybox",
Name: "busyBox",
VolumeMounts: getVolumeMounts(name),
VolumeMounts: getVolumeMounts(),
},
},
Containers: []v1.Container{
@@ -637,6 +638,64 @@ func GetSecret(namespace string, secretName string, data string) *v1.Secret {
}
}
func GetCronJob(namespace string, cronJobName string) *batchv1.CronJob {
return &batchv1.CronJob{
ObjectMeta: getObjectMeta(namespace, cronJobName, false, false, false, map[string]string{}),
Spec: batchv1.CronJobSpec{
Schedule: "*/5 * * * *", // Run every 5 minutes
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"secondLabel": "temp"},
},
Template: getPodTemplateSpecWithVolumes(cronJobName),
},
},
},
}
}
func GetJob(namespace string, jobName string) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: getObjectMeta(namespace, jobName, false, false, false, map[string]string{}),
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"secondLabel": "temp"},
},
Template: getPodTemplateSpecWithVolumes(jobName),
},
}
}
func GetCronJobWithEnvVar(namespace string, cronJobName string) *batchv1.CronJob {
return &batchv1.CronJob{
ObjectMeta: getObjectMeta(namespace, cronJobName, true, false, false, map[string]string{}),
Spec: batchv1.CronJobSpec{
Schedule: "*/5 * * * *", // Run every 5 minutes
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"secondLabel": "temp"},
},
Template: getPodTemplateSpecWithEnvVars(cronJobName),
},
},
},
}
}
func GetJobWithEnvVar(namespace string, jobName string) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: getObjectMeta(namespace, jobName, true, false, false, map[string]string{}),
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"secondLabel": "temp"},
},
Template: getPodTemplateSpecWithEnvVars(jobName),
},
}
}
// GetSecretWithUpdatedLabel provides secret for testing
func GetSecretWithUpdatedLabel(namespace string, secretName string, label string, data string) *v1.Secret {
return &v1.Secret{
@@ -847,6 +906,36 @@ func CreateStatefulSet(client kubernetes.Interface, statefulsetName string, name
return statefulset, err
}
// CreateCronJob creates a cronjob in given namespace and returns the CronJob
func CreateCronJob(client kubernetes.Interface, cronJobName string, namespace string, volumeMount bool) (*batchv1.CronJob, error) {
logrus.Infof("Creating CronJob")
cronJobClient := client.BatchV1().CronJobs(namespace)
var cronJobObj *batchv1.CronJob
if volumeMount {
cronJobObj = GetCronJob(namespace, cronJobName)
} else {
cronJobObj = GetCronJobWithEnvVar(namespace, cronJobName)
}
cronJob, err := cronJobClient.Create(context.TODO(), cronJobObj, metav1.CreateOptions{})
time.Sleep(3 * time.Second)
return cronJob, err
}
// CreateJob creates a job in given namespace and returns the Job
func CreateJob(client kubernetes.Interface, jobName string, namespace string, volumeMount bool) (*batchv1.Job, error) {
logrus.Infof("Creating Job")
jobClient := client.BatchV1().Jobs(namespace)
var jobObj *batchv1.Job
if volumeMount {
jobObj = GetJob(namespace, jobName)
} else {
jobObj = GetJobWithEnvVar(namespace, jobName)
}
job, err := jobClient.Create(context.TODO(), jobObj, metav1.CreateOptions{})
time.Sleep(3 * time.Second)
return job, err
}
// DeleteDeployment creates a deployment in given namespace and returns the error if any
func DeleteDeployment(client kubernetes.Interface, namespace string, deploymentName string) error {
logrus.Infof("Deleting Deployment")