package utils import ( "context" "errors" "time" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" ) // JobAdapter implements WorkloadAdapter for Kubernetes Jobs. type JobAdapter struct { client kubernetes.Interface } // NewJobAdapter creates a new JobAdapter. func NewJobAdapter(client kubernetes.Interface) *JobAdapter { return &JobAdapter{client: client} } // Type returns the workload type. func (a *JobAdapter) Type() WorkloadType { return WorkloadJob } // Create creates a Job with the given config. func (a *JobAdapter) Create(ctx context.Context, namespace, name string, cfg WorkloadConfig) error { opts := buildJobOptions(cfg) _, err := CreateJob(ctx, a.client, namespace, name, opts...) return err } // Delete removes the Job. func (a *JobAdapter) Delete(ctx context.Context, namespace, name string) error { return DeleteJob(ctx, a.client, namespace, name) } // WaitReady waits for the Job to be ready (has active or succeeded pods) using watches. func (a *JobAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error { watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return a.client.BatchV1().Jobs(namespace).Watch(ctx, opts) } _, err := WatchUntil(ctx, watchFunc, name, IsReady(JobIsReady), timeout) return err } // WaitReloaded returns an error because Jobs are recreated, not updated. // Use the Recreatable interface (GetOriginalUID + WaitRecreated) instead. func (a *JobAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) { return false, ErrUnsupportedOperation } // WaitEnvVar returns an error because Jobs don't support env var reload strategy. func (a *JobAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) { return false, ErrUnsupportedOperation } // WaitRecreated waits for the Job to be recreated with a different UID using watches. func (a *JobAdapter) WaitRecreated(ctx context.Context, namespace, name, originalUID string, timeout time.Duration) (string, bool, error) { watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return a.client.BatchV1().Jobs(namespace).Watch(ctx, opts) } job, err := WatchUntil(ctx, watchFunc, name, HasDifferentUID(JobUID, types.UID(originalUID)), timeout) if errors.Is(err, ErrWatchTimeout) { return "", false, nil } if err != nil { return "", false, err } return string(job.UID), true, nil } // SupportsEnvVarStrategy returns false as Jobs don't support env var reload strategy. func (a *JobAdapter) SupportsEnvVarStrategy() bool { return false } // RequiresSpecialHandling returns true as Jobs are recreated by Reloader. func (a *JobAdapter) RequiresSpecialHandling() bool { return true } // GetOriginalUID retrieves the current UID of the Job for recreation verification. func (a *JobAdapter) GetOriginalUID(ctx context.Context, namespace, name string) (string, error) { job, err := a.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return "", err } return string(job.UID), nil } // GetPodTemplateAnnotation returns the value of a pod template annotation. func (a *JobAdapter) GetPodTemplateAnnotation(ctx context.Context, namespace, name, annotationKey string) (string, error) { job, err := a.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return "", err } return job.Spec.Template.Annotations[annotationKey], nil } // buildJobOptions converts WorkloadConfig to JobOption slice. func buildJobOptions(cfg WorkloadConfig) []JobOption { return []JobOption{ func(job *batchv1.Job) { if len(cfg.Annotations) > 0 { if job.Annotations == nil { job.Annotations = make(map[string]string) } for k, v := range cfg.Annotations { job.Annotations[k] = v } } ApplyWorkloadConfig(&job.Spec.Template, cfg) }, } }