mirror of
https://github.com/stakater/Reloader.git
synced 2026-05-17 06:06:39 +00:00
feat: Switch to using watches instead of manual sleeps
This commit is contained in:
171
test/e2e/utils/accessors.go
Normal file
171
test/e2e/utils/accessors.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
csiv1 "sigs.k8s.io/secrets-store-csi-driver/apis/v1"
|
||||
|
||||
rolloutsv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
|
||||
openshiftappsv1 "github.com/openshift/api/apps/v1"
|
||||
)
|
||||
|
||||
// Deployment accessors
|
||||
var (
|
||||
DeploymentPodTemplate PodTemplateAccessor[*appsv1.Deployment] = func(d *appsv1.Deployment) *corev1.PodTemplateSpec {
|
||||
return &d.Spec.Template
|
||||
}
|
||||
DeploymentAnnotations AnnotationAccessor[*appsv1.Deployment] = func(d *appsv1.Deployment) map[string]string {
|
||||
return d.Annotations
|
||||
}
|
||||
DeploymentContainers ContainerAccessor[*appsv1.Deployment] = func(d *appsv1.Deployment) []corev1.Container {
|
||||
return d.Spec.Template.Spec.Containers
|
||||
}
|
||||
DeploymentIsReady StatusAccessor[*appsv1.Deployment] = func(d *appsv1.Deployment) bool {
|
||||
if d.Spec.Replicas == nil {
|
||||
return false
|
||||
}
|
||||
return d.Status.ReadyReplicas == *d.Spec.Replicas &&
|
||||
d.Status.UpdatedReplicas == *d.Spec.Replicas &&
|
||||
d.Status.AvailableReplicas == *d.Spec.Replicas
|
||||
}
|
||||
)
|
||||
|
||||
// DaemonSet accessors
|
||||
var (
|
||||
DaemonSetPodTemplate PodTemplateAccessor[*appsv1.DaemonSet] = func(d *appsv1.DaemonSet) *corev1.PodTemplateSpec {
|
||||
return &d.Spec.Template
|
||||
}
|
||||
DaemonSetAnnotations AnnotationAccessor[*appsv1.DaemonSet] = func(d *appsv1.DaemonSet) map[string]string {
|
||||
return d.Annotations
|
||||
}
|
||||
DaemonSetContainers ContainerAccessor[*appsv1.DaemonSet] = func(d *appsv1.DaemonSet) []corev1.Container {
|
||||
return d.Spec.Template.Spec.Containers
|
||||
}
|
||||
DaemonSetIsReady StatusAccessor[*appsv1.DaemonSet] = func(d *appsv1.DaemonSet) bool {
|
||||
return d.Status.DesiredNumberScheduled > 0 &&
|
||||
d.Status.NumberReady == d.Status.DesiredNumberScheduled
|
||||
}
|
||||
)
|
||||
|
||||
// StatefulSet accessors
|
||||
var (
|
||||
StatefulSetPodTemplate PodTemplateAccessor[*appsv1.StatefulSet] = func(s *appsv1.StatefulSet) *corev1.PodTemplateSpec {
|
||||
return &s.Spec.Template
|
||||
}
|
||||
StatefulSetAnnotations AnnotationAccessor[*appsv1.StatefulSet] = func(s *appsv1.StatefulSet) map[string]string {
|
||||
return s.Annotations
|
||||
}
|
||||
StatefulSetContainers ContainerAccessor[*appsv1.StatefulSet] = func(s *appsv1.StatefulSet) []corev1.Container {
|
||||
return s.Spec.Template.Spec.Containers
|
||||
}
|
||||
StatefulSetIsReady StatusAccessor[*appsv1.StatefulSet] = func(s *appsv1.StatefulSet) bool {
|
||||
if s.Spec.Replicas == nil {
|
||||
return false
|
||||
}
|
||||
return s.Status.ReadyReplicas == *s.Spec.Replicas
|
||||
}
|
||||
)
|
||||
|
||||
// Job accessors
|
||||
var (
|
||||
JobPodTemplate PodTemplateAccessor[*batchv1.Job] = func(j *batchv1.Job) *corev1.PodTemplateSpec {
|
||||
return &j.Spec.Template
|
||||
}
|
||||
JobAnnotations AnnotationAccessor[*batchv1.Job] = func(j *batchv1.Job) map[string]string {
|
||||
return j.Annotations
|
||||
}
|
||||
JobContainers ContainerAccessor[*batchv1.Job] = func(j *batchv1.Job) []corev1.Container {
|
||||
return j.Spec.Template.Spec.Containers
|
||||
}
|
||||
JobIsReady StatusAccessor[*batchv1.Job] = func(j *batchv1.Job) bool {
|
||||
return j.Status.Active > 0 || j.Status.Succeeded > 0
|
||||
}
|
||||
JobUID UIDAccessor[*batchv1.Job] = func(j *batchv1.Job) types.UID {
|
||||
return j.UID
|
||||
}
|
||||
)
|
||||
|
||||
// CronJob accessors
|
||||
var (
|
||||
CronJobPodTemplate PodTemplateAccessor[*batchv1.CronJob] = func(c *batchv1.CronJob) *corev1.PodTemplateSpec {
|
||||
return &c.Spec.JobTemplate.Spec.Template
|
||||
}
|
||||
CronJobAnnotations AnnotationAccessor[*batchv1.CronJob] = func(c *batchv1.CronJob) map[string]string {
|
||||
return c.Annotations
|
||||
}
|
||||
CronJobContainers ContainerAccessor[*batchv1.CronJob] = func(c *batchv1.CronJob) []corev1.Container {
|
||||
return c.Spec.JobTemplate.Spec.Template.Spec.Containers
|
||||
}
|
||||
CronJobExists StatusAccessor[*batchv1.CronJob] = func(c *batchv1.CronJob) bool {
|
||||
return true // Just existence check
|
||||
}
|
||||
)
|
||||
|
||||
// Argo Rollout accessors
|
||||
var (
|
||||
RolloutPodTemplate PodTemplateAccessor[*rolloutsv1alpha1.Rollout] = func(r *rolloutsv1alpha1.Rollout) *corev1.PodTemplateSpec {
|
||||
return &r.Spec.Template
|
||||
}
|
||||
RolloutAnnotations AnnotationAccessor[*rolloutsv1alpha1.Rollout] = func(r *rolloutsv1alpha1.Rollout) map[string]string {
|
||||
return r.Annotations
|
||||
}
|
||||
RolloutContainers ContainerAccessor[*rolloutsv1alpha1.Rollout] = func(r *rolloutsv1alpha1.Rollout) []corev1.Container {
|
||||
return r.Spec.Template.Spec.Containers
|
||||
}
|
||||
RolloutIsReady StatusAccessor[*rolloutsv1alpha1.Rollout] = func(r *rolloutsv1alpha1.Rollout) bool {
|
||||
if r.Spec.Replicas == nil {
|
||||
return false
|
||||
}
|
||||
return r.Status.ReadyReplicas == *r.Spec.Replicas
|
||||
}
|
||||
RolloutHasRestartAt StatusAccessor[*rolloutsv1alpha1.Rollout] = func(r *rolloutsv1alpha1.Rollout) bool {
|
||||
return r.Spec.RestartAt != nil
|
||||
}
|
||||
)
|
||||
|
||||
// OpenShift DeploymentConfig accessors
|
||||
var (
|
||||
DeploymentConfigPodTemplate PodTemplateAccessor[*openshiftappsv1.DeploymentConfig] = func(d *openshiftappsv1.DeploymentConfig) *corev1.PodTemplateSpec {
|
||||
return d.Spec.Template
|
||||
}
|
||||
DeploymentConfigAnnotations AnnotationAccessor[*openshiftappsv1.DeploymentConfig] = func(d *openshiftappsv1.DeploymentConfig) map[string]string {
|
||||
return d.Annotations
|
||||
}
|
||||
DeploymentConfigContainers ContainerAccessor[*openshiftappsv1.DeploymentConfig] = func(d *openshiftappsv1.DeploymentConfig) []corev1.Container {
|
||||
if d.Spec.Template == nil {
|
||||
return nil
|
||||
}
|
||||
return d.Spec.Template.Spec.Containers
|
||||
}
|
||||
DeploymentConfigIsReady StatusAccessor[*openshiftappsv1.DeploymentConfig] = func(d *openshiftappsv1.DeploymentConfig) bool {
|
||||
return d.Status.ReadyReplicas == d.Spec.Replicas
|
||||
}
|
||||
)
|
||||
|
||||
// SecretProviderClassPodStatus accessors
|
||||
var (
|
||||
SPCPSIsMounted StatusAccessor[*csiv1.SecretProviderClassPodStatus] = func(s *csiv1.SecretProviderClassPodStatus) bool {
|
||||
return s.Status.Mounted
|
||||
}
|
||||
SPCPSClassName ValueAccessor[*csiv1.SecretProviderClassPodStatus, string] = func(s *csiv1.SecretProviderClassPodStatus) string {
|
||||
return s.Status.SecretProviderClassName
|
||||
}
|
||||
SPCPSPodName ValueAccessor[*csiv1.SecretProviderClassPodStatus, string] = func(s *csiv1.SecretProviderClassPodStatus) string {
|
||||
return s.Status.PodName
|
||||
}
|
||||
// SPCPSVersions returns concatenated versions of all objects for change detection.
|
||||
SPCPSVersions ValueAccessor[*csiv1.SecretProviderClassPodStatus, string] = func(s *csiv1.SecretProviderClassPodStatus) string {
|
||||
if len(s.Status.Objects) == 0 {
|
||||
return ""
|
||||
}
|
||||
var versions []string
|
||||
for _, obj := range s.Status.Objects {
|
||||
versions = append(versions, obj.Version)
|
||||
}
|
||||
return strings.Join(versions, ",")
|
||||
}
|
||||
)
|
||||
188
test/e2e/utils/conditions.go
Normal file
188
test/e2e/utils/conditions.go
Normal file
@@ -0,0 +1,188 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
csiv1 "sigs.k8s.io/secrets-store-csi-driver/apis/v1"
|
||||
)
|
||||
|
||||
// Generic accessor types used by the watch-based wait utilities. Each
// accessor adapts a concrete workload type to the piece of state a
// Condition needs to inspect.

// PodTemplateAccessor extracts PodTemplateSpec from a workload.
type PodTemplateAccessor[T any] func(T) *corev1.PodTemplateSpec

// AnnotationAccessor extracts annotations from a resource.
type AnnotationAccessor[T any] func(T) map[string]string

// ContainerAccessor extracts containers from a resource.
type ContainerAccessor[T any] func(T) []corev1.Container

// StatusAccessor extracts ready status from a resource.
type StatusAccessor[T any] func(T) bool

// UIDAccessor extracts UID from a resource.
type UIDAccessor[T any] func(T) types.UID

// ValueAccessor extracts a comparable value from a resource.
type ValueAccessor[T any, V comparable] func(T) V
|
||||
|
||||
// HasPodTemplateAnnotation returns a condition that checks for an annotation on the pod template.
|
||||
func HasPodTemplateAnnotation[T any](accessor PodTemplateAccessor[T], key string) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
template := accessor(obj)
|
||||
if template == nil || template.Annotations == nil {
|
||||
return false
|
||||
}
|
||||
_, ok := template.Annotations[key]
|
||||
return ok
|
||||
}
|
||||
}
|
||||
|
||||
// HasAnnotation returns a condition that checks for an annotation on the resource.
|
||||
func HasAnnotation[T any](accessor AnnotationAccessor[T], key string) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
annotations := accessor(obj)
|
||||
if annotations == nil {
|
||||
return false
|
||||
}
|
||||
_, ok := annotations[key]
|
||||
return ok
|
||||
}
|
||||
}
|
||||
|
||||
// NoAnnotation returns a condition that checks an annotation is absent.
|
||||
func NoAnnotation[T any](accessor AnnotationAccessor[T], key string) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
annotations := accessor(obj)
|
||||
if annotations == nil {
|
||||
return true
|
||||
}
|
||||
_, ok := annotations[key]
|
||||
return !ok
|
||||
}
|
||||
}
|
||||
|
||||
// HasEnvVarPrefix returns a condition that checks for an env var with the given prefix.
|
||||
func HasEnvVarPrefix[T any](accessor ContainerAccessor[T], prefix string) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
containers := accessor(obj)
|
||||
for _, container := range containers {
|
||||
for _, env := range container.Env {
|
||||
if strings.HasPrefix(env.Name, prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsReady returns a condition that checks if the resource is ready.
|
||||
func IsReady[T any](accessor StatusAccessor[T]) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
return accessor(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// HasDifferentUID returns a condition that checks if the UID differs from original.
|
||||
func HasDifferentUID[T any](accessor UIDAccessor[T], originalUID types.UID) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
return accessor(obj) != originalUID
|
||||
}
|
||||
}
|
||||
|
||||
// HasDifferentValue returns a condition that checks if a value differs from original.
|
||||
func HasDifferentValue[T any, V comparable](accessor ValueAccessor[T, V], original V) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
return accessor(obj) != original
|
||||
}
|
||||
}
|
||||
|
||||
// And combines multiple conditions with AND logic.
|
||||
func And[T any](conditions ...Condition[T]) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
for _, cond := range conditions {
|
||||
if !cond(obj) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Or combines multiple conditions with OR logic.
|
||||
func Or[T any](conditions ...Condition[T]) Condition[T] {
|
||||
return func(obj T) bool {
|
||||
for _, cond := range conditions {
|
||||
if cond(obj) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Always returns a condition that always returns true (for existence checks).
|
||||
func Always[T any]() Condition[T] {
|
||||
return func(obj T) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// IsTriggeredJobForCronJob returns a condition that checks if a Job was triggered
|
||||
// by Reloader for the specified CronJob (has owner reference and instantiate annotation).
|
||||
func IsTriggeredJobForCronJob(cronJobName string) Condition[*batchv1.Job] {
|
||||
return func(job *batchv1.Job) bool {
|
||||
for _, ownerRef := range job.OwnerReferences {
|
||||
if ownerRef.Kind == "CronJob" && ownerRef.Name == cronJobName {
|
||||
if job.Annotations != nil {
|
||||
if _, ok := job.Annotations["cronjob.kubernetes.io/instantiate"]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// SPCPSVersionChanged returns a condition that checks if the SPCPS version has changed
|
||||
// from the initial version and the SPCPS is mounted.
|
||||
func SPCPSVersionChanged(initialVersion string) Condition[*csiv1.SecretProviderClassPodStatus] {
|
||||
return func(spcps *csiv1.SecretProviderClassPodStatus) bool {
|
||||
if !spcps.Status.Mounted || len(spcps.Status.Objects) == 0 {
|
||||
return false
|
||||
}
|
||||
for _, obj := range spcps.Status.Objects {
|
||||
if obj.Version != initialVersion {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// SPCPSForSPC returns a condition that checks if the SPCPS references a specific
|
||||
// SecretProviderClass and is mounted.
|
||||
func SPCPSForSPC(spcName string) Condition[*csiv1.SecretProviderClassPodStatus] {
|
||||
return func(spcps *csiv1.SecretProviderClassPodStatus) bool {
|
||||
return spcps.Status.SecretProviderClassName == spcName && spcps.Status.Mounted
|
||||
}
|
||||
}
|
||||
|
||||
// SPCPSForPod returns a condition that checks if the SPCPS references a specific
|
||||
// pod and is mounted.
|
||||
func SPCPSForPod(podName string) Condition[*csiv1.SecretProviderClassPodStatus] {
|
||||
return func(spcps *csiv1.SecretProviderClassPodStatus) bool {
|
||||
return spcps.Status.PodName == podName && spcps.Status.Mounted
|
||||
}
|
||||
}
|
||||
|
||||
// SPCPSForPods returns a condition that checks if the SPCPS references any of the
|
||||
// specified pods and is mounted.
|
||||
func SPCPSForPods(podNames map[string]bool) Condition[*csiv1.SecretProviderClassPodStatus] {
|
||||
return func(spcps *csiv1.SecretProviderClassPodStatus) bool {
|
||||
return podNames[spcps.Status.PodName] && spcps.Status.Mounted
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,14 @@ package utils
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -259,112 +261,72 @@ func execInVaultPod(ctx context.Context, kubeClient kubernetes.Interface, restCo
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForSPCPSVersionChange waits for the SecretProviderClassPodStatus objects to change
|
||||
// from the initial version. This is used after updating a Vault secret to wait for CSI
|
||||
// driver to sync the new version.
|
||||
// WaitForSPCPSVersionChange waits for the SecretProviderClassPodStatus version to change
|
||||
// from the initial version using watches. This is used after updating a Vault secret to
|
||||
// wait for CSI driver to sync the new version.
|
||||
func WaitForSPCPSVersionChange(ctx context.Context, client csiclient.Interface, namespace, spcpsName, initialVersion string, timeout time.Duration) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
spcps, err := client.SecretsstoreV1().SecretProviderClassPodStatuses(namespace).Get(ctx, spcpsName, metav1.GetOptions{})
|
||||
if err == nil && spcps.Status.Mounted && len(spcps.Status.Objects) > 0 {
|
||||
// Check if any object version has changed
|
||||
for _, obj := range spcps.Status.Objects {
|
||||
if obj.Version != initialVersion {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return client.SecretsstoreV1().SecretProviderClassPodStatuses(namespace).Watch(ctx, opts)
|
||||
}
|
||||
return fmt.Errorf("timeout waiting for SecretProviderClassPodStatus %s/%s version to change from %s", namespace, spcpsName, initialVersion)
|
||||
|
||||
_, err := WatchUntil(ctx, watchFunc, spcpsName, SPCPSVersionChanged(initialVersion), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return fmt.Errorf("timeout waiting for SecretProviderClassPodStatus %s/%s version to change from %s", namespace, spcpsName, initialVersion)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// FindSPCPSForDeployment finds the SecretProviderClassPodStatus created by CSI driver
|
||||
// for pods of a given deployment. Returns the first matching SPCPS name.
|
||||
// for pods of a given deployment using watches. Returns the first matching SPCPS name.
|
||||
func FindSPCPSForDeployment(ctx context.Context, csiClient csiclient.Interface, kubeClient kubernetes.Interface, namespace, deploymentName string, timeout time.Duration) (
|
||||
string, error,
|
||||
) {
|
||||
deadline := time.Now().Add(timeout)
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
// Get pods for the deployment
|
||||
pods, err := kubeClient.CoreV1().Pods(namespace).List(
|
||||
ctx, metav1.ListOptions{
|
||||
LabelSelector: fmt.Sprintf("app=%s", deploymentName),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case <-time.After(1 * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Look for SPCPS that references any of these pods
|
||||
spcpsList, err := csiClient.SecretsstoreV1().SecretProviderClassPodStatuses(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case <-time.After(1 * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
for _, spcps := range spcpsList.Items {
|
||||
if spcps.Status.PodName == pod.Name && spcps.Status.Mounted {
|
||||
return spcps.Name, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
// Get pods for the deployment
|
||||
pods, err := kubeClient.CoreV1().Pods(namespace).List(
|
||||
ctx, metav1.ListOptions{
|
||||
LabelSelector: fmt.Sprintf("app=%s", deploymentName),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("listing pods for deployment %s: %w", deploymentName, err)
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("timeout finding SecretProviderClassPodStatus for deployment %s/%s", namespace, deploymentName)
|
||||
podNames := make(map[string]bool)
|
||||
for _, pod := range pods.Items {
|
||||
podNames[pod.Name] = true
|
||||
}
|
||||
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return csiClient.SecretsstoreV1().SecretProviderClassPodStatuses(namespace).Watch(ctx, opts)
|
||||
}
|
||||
|
||||
// Watch all SPCPS (empty name) and find one that matches any pod
|
||||
spcps, err := WatchUntil(ctx, watchFunc, "", SPCPSForPods(podNames), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return "", fmt.Errorf("timeout finding SecretProviderClassPodStatus for deployment %s/%s", namespace, deploymentName)
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return spcps.Name, nil
|
||||
}
|
||||
|
||||
// FindSPCPSForSPC finds the SecretProviderClassPodStatus created by CSI driver
|
||||
// that references a specific SecretProviderClass. Returns the first matching SPCPS name.
|
||||
// that references a specific SecretProviderClass using watches. Returns the first matching SPCPS name.
|
||||
func FindSPCPSForSPC(ctx context.Context, csiClient csiclient.Interface, namespace, spcName string, timeout time.Duration) (string, error) {
|
||||
deadline := time.Now().Add(timeout)
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
spcpsList, err := csiClient.SecretsstoreV1().SecretProviderClassPodStatuses(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case <-time.After(1 * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for _, spcps := range spcpsList.Items {
|
||||
if spcps.Status.SecretProviderClassName == spcName && spcps.Status.Mounted {
|
||||
return spcps.Name, nil
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return csiClient.SecretsstoreV1().SecretProviderClassPodStatuses(namespace).Watch(ctx, opts)
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("timeout finding SecretProviderClassPodStatus for SPC %s/%s", namespace, spcName)
|
||||
// Watch all SPCPS (empty name) and find one that matches the SPC
|
||||
spcps, err := WatchUntil(ctx, watchFunc, "", SPCPSForSPC(spcName), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return "", fmt.Errorf("timeout finding SecretProviderClassPodStatus for SPC %s/%s", namespace, spcName)
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return spcps.Name, nil
|
||||
}
|
||||
|
||||
// GetSPCPSVersion gets the current version string from a SecretProviderClassPodStatus.
|
||||
|
||||
@@ -3,6 +3,7 @@ package utils
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
@@ -969,3 +970,38 @@ func csiVolumeName(spcName string) string {
|
||||
func csiMountPath(spcName string) string {
|
||||
return fmt.Sprintf("/mnt/secrets-store/%s", spcName)
|
||||
}
|
||||
|
||||
// GetDeployment retrieves a deployment by name. It is a thin convenience
// wrapper around the typed AppsV1 client Get call; API errors (including
// NotFound) are returned unchanged.
func GetDeployment(ctx context.Context, client kubernetes.Interface, namespace, name string) (*appsv1.Deployment, error) {
	return client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
}
|
||||
|
||||
// GetPodLogs retrieves logs from pods matching the given label selector.
|
||||
func GetPodLogs(ctx context.Context, client kubernetes.Interface, namespace, labelSelector string) (string, error) {
|
||||
pods, err := client.CoreV1().Pods(namespace).List(
|
||||
ctx, metav1.ListOptions{
|
||||
LabelSelector: labelSelector,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to list pods: %w", err)
|
||||
}
|
||||
|
||||
var allLogs strings.Builder
|
||||
for _, pod := range pods.Items {
|
||||
for _, container := range pod.Spec.Containers {
|
||||
logs, err := client.CoreV1().Pods(namespace).GetLogs(
|
||||
pod.Name, &corev1.PodLogOptions{
|
||||
Container: container.Name,
|
||||
},
|
||||
).Do(ctx).Raw()
|
||||
if err != nil {
|
||||
allLogs.WriteString(fmt.Sprintf("Error getting logs for %s/%s: %v\n", pod.Name, container.Name, err))
|
||||
continue
|
||||
}
|
||||
allLogs.WriteString(fmt.Sprintf("=== %s/%s ===\n%s\n", pod.Name, container.Name, string(logs)))
|
||||
}
|
||||
}
|
||||
|
||||
return allLogs.String(), nil
|
||||
}
|
||||
|
||||
@@ -158,8 +158,8 @@ func (e *TestEnvironment) DeployReloaderWithValues(values map[string]string) err
|
||||
// WaitForReloader waits for the Reloader deployment to be ready.
|
||||
func (e *TestEnvironment) WaitForReloader() error {
|
||||
ginkgo.GinkgoWriter.Println("Waiting for Reloader to be ready...")
|
||||
return WaitForDeploymentReady(e.Ctx, e.KubeClient, e.Namespace, ReloaderDeploymentName(e.ReleaseName),
|
||||
DeploymentReady)
|
||||
adapter := NewDeploymentAdapter(e.KubeClient)
|
||||
return adapter.WaitReady(e.Ctx, e.Namespace, ReloaderDeploymentName(e.ReleaseName), DeploymentReady)
|
||||
}
|
||||
|
||||
// DeployAndWait deploys Reloader with the given values and waits for it to be ready.
|
||||
|
||||
@@ -1,339 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// Timeout and interval constants for polling operations.
|
||||
const (
|
||||
DefaultInterval = 1 * time.Second // Polling interval (faster feedback)
|
||||
ShortTimeout = 5 * time.Second // Quick checks
|
||||
NegativeTestWait = 3 * time.Second // Wait before checking negative conditions
|
||||
DeploymentReady = 60 * time.Second // Workload readiness (buffer for CI)
|
||||
ReloadTimeout = 15 * time.Second // Time for reload to trigger
|
||||
)
|
||||
|
||||
// WaitForDeploymentReady waits for a deployment to have all replicas available.
|
||||
func WaitForDeploymentReady(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
deploy, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if deploy.Status.ReadyReplicas == *deploy.Spec.Replicas &&
|
||||
deploy.Status.UpdatedReplicas == *deploy.Spec.Replicas &&
|
||||
deploy.Status.AvailableReplicas == *deploy.Spec.Replicas {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// WaitForDeploymentReloaded waits for a deployment's pod template to have the reloader annotation.
|
||||
// Returns true if the annotation was found, false if timeout occurred.
|
||||
func WaitForDeploymentReloaded(ctx context.Context, client kubernetes.Interface, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
deploy, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return deploy.Spec.Template.Annotations, nil
|
||||
}, annotationKey, timeout)
|
||||
}
|
||||
|
||||
// WaitForDaemonSetReloaded waits for a DaemonSet's pod template to have the reloader annotation.
|
||||
func WaitForDaemonSetReloaded(ctx context.Context, client kubernetes.Interface, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ds.Spec.Template.Annotations, nil
|
||||
}, annotationKey, timeout)
|
||||
}
|
||||
|
||||
// WaitForStatefulSetReloaded waits for a StatefulSet's pod template to have the reloader annotation.
|
||||
func WaitForStatefulSetReloaded(ctx context.Context, client kubernetes.Interface, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
ss, err := client.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ss.Spec.Template.Annotations, nil
|
||||
}, annotationKey, timeout)
|
||||
}
|
||||
|
||||
// WaitForCronJobReloaded waits for a CronJob's pod template to have the reloader annotation.
|
||||
func WaitForCronJobReloaded(ctx context.Context, client kubernetes.Interface, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
cj, err := client.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cj.Spec.JobTemplate.Spec.Template.Annotations, nil
|
||||
}, annotationKey, timeout)
|
||||
}
|
||||
|
||||
// WaitForCronJobTriggeredJob waits for a Job to be created by the specified CronJob.
|
||||
// It checks owner references to find Jobs created by Reloader's manual trigger.
|
||||
func WaitForCronJobTriggeredJob(ctx context.Context, client kubernetes.Interface, namespace, cronJobName string, timeout time.Duration) (
|
||||
bool, error,
|
||||
) {
|
||||
var found bool
|
||||
err := wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
jobs, err := client.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, job := range jobs.Items {
|
||||
for _, ownerRef := range job.OwnerReferences {
|
||||
if ownerRef.Kind == "CronJob" && ownerRef.Name == cronJobName {
|
||||
if job.Annotations != nil {
|
||||
if _, ok := job.Annotations["cronjob.kubernetes.io/instantiate"]; ok {
|
||||
found = true
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return false, err
|
||||
}
|
||||
return found, nil
|
||||
}
|
||||
|
||||
// WaitForDeploymentEnvVar waits for a deployment's containers to have an environment variable
|
||||
// with the given prefix (e.g., "STAKATER_").
|
||||
func WaitForDeploymentEnvVar(ctx context.Context, client kubernetes.Interface, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForEnvVarPrefix(ctx, func(ctx context.Context) ([]corev1.Container, error) {
|
||||
deploy, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return deploy.Spec.Template.Spec.Containers, nil
|
||||
}, prefix, timeout)
|
||||
}
|
||||
|
||||
// WaitForDaemonSetEnvVar waits for a DaemonSet's containers to have an environment variable
|
||||
// with the given prefix.
|
||||
func WaitForDaemonSetEnvVar(ctx context.Context, client kubernetes.Interface, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForEnvVarPrefix(ctx, func(ctx context.Context) ([]corev1.Container, error) {
|
||||
ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ds.Spec.Template.Spec.Containers, nil
|
||||
}, prefix, timeout)
|
||||
}
|
||||
|
||||
// WaitForStatefulSetEnvVar waits for a StatefulSet's containers to have an environment variable
|
||||
// with the given prefix.
|
||||
func WaitForStatefulSetEnvVar(ctx context.Context, client kubernetes.Interface, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForEnvVarPrefix(ctx, func(ctx context.Context) ([]corev1.Container, error) {
|
||||
ss, err := client.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ss.Spec.Template.Spec.Containers, nil
|
||||
}, prefix, timeout)
|
||||
}
|
||||
|
||||
// WaitForDeploymentPaused waits for a deployment to have the paused-at annotation.
|
||||
func WaitForDeploymentPaused(ctx context.Context, client kubernetes.Interface, namespace, name, pausedAtAnnotation string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
deploy, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return deploy.Annotations, nil
|
||||
}, pausedAtAnnotation, timeout)
|
||||
}
|
||||
|
||||
// WaitForDeploymentUnpaused waits for a deployment to NOT have the paused-at annotation.
|
||||
func WaitForDeploymentUnpaused(ctx context.Context, client kubernetes.Interface, namespace, name, pausedAtAnnotation string, timeout time.Duration) (bool, error) {
|
||||
return WaitForNoAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
deploy, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return deploy.Annotations, nil
|
||||
}, pausedAtAnnotation, timeout)
|
||||
}
|
||||
|
||||
// WaitForDaemonSetReady waits for a DaemonSet to have all pods ready.
|
||||
func WaitForDaemonSetReady(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if ds.Status.DesiredNumberScheduled > 0 &&
|
||||
ds.Status.NumberReady == ds.Status.DesiredNumberScheduled {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// WaitForStatefulSetReady waits for a StatefulSet to have all replicas ready.
|
||||
func WaitForStatefulSetReady(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
ss, err := client.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if ss.Status.ReadyReplicas == *ss.Spec.Replicas {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// GetDeployment retrieves a deployment by name.
|
||||
func GetDeployment(ctx context.Context, client kubernetes.Interface, namespace, name string) (*appsv1.Deployment, error) {
|
||||
return client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
// WaitForCronJobExists waits for a CronJob to exist in the cluster.
|
||||
// This is useful for giving Reloader time to detect and index the CronJob before making changes.
|
||||
func WaitForCronJobExists(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
_, err := client.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// GetJob retrieves a Job by name.
|
||||
func GetJob(ctx context.Context, client kubernetes.Interface, namespace, name string) (*batchv1.Job, error) {
|
||||
return client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
// WaitForJobRecreated waits for a Job to be deleted and recreated with a new UID.
|
||||
// Returns the new Job's UID if recreation was detected.
|
||||
func WaitForJobRecreated(ctx context.Context, client kubernetes.Interface, namespace, name, originalUID string, timeout time.Duration) (
|
||||
string, bool, error,
|
||||
) {
|
||||
var newUID string
|
||||
var recreated bool
|
||||
|
||||
err := wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
job, err := client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if string(job.UID) != originalUID {
|
||||
newUID = string(job.UID)
|
||||
recreated = true
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return "", false, err
|
||||
}
|
||||
return newUID, recreated, nil
|
||||
}
|
||||
|
||||
// WaitForJobExists waits for a Job to exist in the cluster.
|
||||
func WaitForJobExists(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
_, err := client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil // Keep polling
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// WaitForJobReady waits for a Job to have at least one active or succeeded pod.
|
||||
// This ensures the Job has actually started running before proceeding.
|
||||
func WaitForJobReady(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(
|
||||
ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
job, err := client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Job is ready if it has at least one active or succeeded pod
|
||||
if job.Status.Active > 0 || job.Status.Succeeded > 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// GetPodLogs retrieves logs from pods matching the given label selector.
|
||||
func GetPodLogs(ctx context.Context, client kubernetes.Interface, namespace, labelSelector string) (string, error) {
|
||||
pods, err := client.CoreV1().Pods(namespace).List(
|
||||
ctx, metav1.ListOptions{
|
||||
LabelSelector: labelSelector,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to list pods: %w", err)
|
||||
}
|
||||
|
||||
var allLogs strings.Builder
|
||||
for _, pod := range pods.Items {
|
||||
for _, container := range pod.Spec.Containers {
|
||||
logs, err := client.CoreV1().Pods(namespace).GetLogs(
|
||||
pod.Name, &corev1.PodLogOptions{
|
||||
Container: container.Name,
|
||||
},
|
||||
).Do(ctx).Raw()
|
||||
if err != nil {
|
||||
allLogs.WriteString(fmt.Sprintf("Error getting logs for %s/%s: %v\n", pod.Name, container.Name, err))
|
||||
continue
|
||||
}
|
||||
allLogs.WriteString(fmt.Sprintf("=== %s/%s ===\n%s\n", pod.Name, container.Name, string(logs)))
|
||||
}
|
||||
}
|
||||
|
||||
return allLogs.String(), nil
|
||||
}
|
||||
@@ -1,87 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
// AnnotationGetter retrieves annotations from a workload's pod template.
|
||||
type AnnotationGetter func(ctx context.Context) (map[string]string, error)
|
||||
|
||||
// ContainerGetter retrieves containers from a workload's pod template.
|
||||
type ContainerGetter func(ctx context.Context) ([]corev1.Container, error)
|
||||
|
||||
// WaitForAnnotation polls until an annotation key exists.
|
||||
func WaitForAnnotation(ctx context.Context, getter AnnotationGetter, key string, timeout time.Duration) (bool, error) {
|
||||
var found bool
|
||||
err := wait.PollUntilContextTimeout(ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
annotations, err := getter(ctx)
|
||||
if err != nil {
|
||||
return false, nil // Keep polling on errors
|
||||
}
|
||||
if annotations != nil {
|
||||
if _, ok := annotations[key]; ok {
|
||||
found = true
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return false, err
|
||||
}
|
||||
return found, nil
|
||||
}
|
||||
|
||||
// WaitForNoAnnotation polls until an annotation key is absent.
|
||||
func WaitForNoAnnotation(ctx context.Context, getter AnnotationGetter, key string, timeout time.Duration) (bool, error) {
|
||||
var absent bool
|
||||
err := wait.PollUntilContextTimeout(ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
annotations, err := getter(ctx)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
if annotations == nil {
|
||||
absent = true
|
||||
return true, nil
|
||||
}
|
||||
if _, ok := annotations[key]; !ok {
|
||||
absent = true
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return false, err
|
||||
}
|
||||
return absent, nil
|
||||
}
|
||||
|
||||
// WaitForEnvVarPrefix polls until a container has an env var with given prefix.
|
||||
func WaitForEnvVarPrefix(ctx context.Context, getter ContainerGetter, prefix string, timeout time.Duration) (bool, error) {
|
||||
var found bool
|
||||
err := wait.PollUntilContextTimeout(ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
containers, err := getter(ctx)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
for _, container := range containers {
|
||||
for _, env := range container.Env {
|
||||
if strings.HasPrefix(env.Name, prefix) {
|
||||
found = true
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return false, err
|
||||
}
|
||||
return found, nil
|
||||
}
|
||||
191
test/e2e/utils/watch.go
Normal file
191
test/e2e/utils/watch.go
Normal file
@@ -0,0 +1,191 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
// Timeout constants shared by the watch and (legacy) polling helpers.
const (
	DefaultInterval  = 1 * time.Second  // polling interval (legacy, will be removed)
	ShortTimeout     = 5 * time.Second  // quick checks
	NegativeTestWait = 3 * time.Second  // wait before checking negative conditions
	DeploymentReady  = 60 * time.Second // workload readiness (buffer for CI)
	ReloadTimeout    = 15 * time.Second // time for reload to trigger
)
|
||||
|
||||
// ErrWatchTimeout is returned when a watch times out waiting for condition.
|
||||
var ErrWatchTimeout = errors.New("watch timeout waiting for condition")
|
||||
|
||||
// WatchFunc is a function that starts a watch for a specific resource.
|
||||
type WatchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
|
||||
|
||||
// Condition is a function that checks if the desired state is reached.
|
||||
type Condition[T any] func(T) bool
|
||||
|
||||
// WatchUntil watches a resource until the condition is met or timeout occurs.
|
||||
// It handles watch reconnection automatically on errors.
|
||||
// If name is empty, it watches all resources and returns the first matching one.
|
||||
func WatchUntil[T runtime.Object](ctx context.Context, watchFunc WatchFunc, name string, condition Condition[T], timeout time.Duration) (T, error) {
|
||||
var zero T
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
opts := metav1.ListOptions{Watch: true}
|
||||
if name != "" {
|
||||
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return zero, ErrWatchTimeout
|
||||
default:
|
||||
}
|
||||
|
||||
result, done, err := watchOnce(ctx, watchFunc, opts, condition)
|
||||
if done {
|
||||
return result, err
|
||||
}
|
||||
// Watch disconnected, retry after brief pause
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return zero, ErrWatchTimeout
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watchOnce starts a single watch and processes events until condition met or watch ends.
|
||||
func watchOnce[T runtime.Object](
|
||||
ctx context.Context,
|
||||
watchFunc WatchFunc,
|
||||
opts metav1.ListOptions,
|
||||
condition Condition[T],
|
||||
) (T, bool, error) {
|
||||
var zero T
|
||||
|
||||
watcher, err := watchFunc(ctx, opts)
|
||||
if err != nil {
|
||||
return zero, false, nil // Retry
|
||||
}
|
||||
defer watcher.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return zero, true, ErrWatchTimeout
|
||||
case event, ok := <-watcher.ResultChan():
|
||||
if !ok {
|
||||
return zero, false, nil // Watch closed, retry
|
||||
}
|
||||
|
||||
switch event.Type {
|
||||
case watch.Added, watch.Modified:
|
||||
obj, ok := event.Object.(T)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if condition(obj) {
|
||||
return obj, true, nil
|
||||
}
|
||||
case watch.Deleted:
|
||||
// Resource deleted, keep watching for recreation
|
||||
continue
|
||||
case watch.Error:
|
||||
return zero, false, nil // Retry on error
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WatchUntilDeleted watches until the resource is deleted or timeout occurs.
|
||||
func WatchUntilDeleted(
|
||||
ctx context.Context,
|
||||
watchFunc WatchFunc,
|
||||
name string,
|
||||
timeout time.Duration,
|
||||
) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
opts := metav1.ListOptions{
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
|
||||
Watch: true,
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ErrWatchTimeout
|
||||
default:
|
||||
}
|
||||
|
||||
deleted, err := watchDeleteOnce(ctx, watchFunc, opts)
|
||||
if deleted {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ErrWatchTimeout
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchDeleteOnce(
|
||||
ctx context.Context,
|
||||
watchFunc WatchFunc,
|
||||
opts metav1.ListOptions,
|
||||
) (bool, error) {
|
||||
watcher, err := watchFunc(ctx, opts)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
defer watcher.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return true, ErrWatchTimeout
|
||||
case event, ok := <-watcher.ResultChan():
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
return true, nil
|
||||
}
|
||||
if event.Type == watch.Error {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WatchUntilDifferentUID watches until the resource has a different UID (recreated).
|
||||
func WatchUntilDifferentUID[T runtime.Object](
|
||||
ctx context.Context,
|
||||
watchFunc WatchFunc,
|
||||
name string,
|
||||
originalUID string,
|
||||
timeout time.Duration,
|
||||
getUID func(T) string,
|
||||
) (T, bool, error) {
|
||||
var zero T
|
||||
result, err := WatchUntil(ctx, watchFunc, name, func(obj T) bool {
|
||||
return getUID(obj) != originalUID
|
||||
}, timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return zero, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return zero, false, err
|
||||
}
|
||||
return result, true, nil
|
||||
}
|
||||
@@ -84,6 +84,32 @@ type WorkloadAdapter interface {
|
||||
RequiresSpecialHandling() bool
|
||||
}
|
||||
|
||||
// Pausable is implemented by workload adapters that can observe pause and
// unpause via an annotation. Currently only Deployment supports this.
type Pausable interface {
	WaitPaused(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error)
	WaitUnpaused(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error)
}

// Recreatable is implemented by workload adapters whose resources are
// recreated rather than updated. Currently only Job supports this (Jobs are
// immutable, so Reloader recreates them).
type Recreatable interface {
	GetOriginalUID(ctx context.Context, namespace, name string) (string, error)
	WaitRecreated(ctx context.Context, namespace, name, originalUID string, timeout time.Duration) (string, bool, error)
}

// JobTriggerer is implemented by workload adapters that spawn Jobs on reload.
// Currently only CronJob supports this.
type JobTriggerer interface {
	WaitForTriggeredJob(ctx context.Context, namespace, name string, timeout time.Duration) (bool, error)
}

// RestartAtSupporter is implemented by workload adapters that expose a
// restartAt field. Currently only ArgoRollout supports this.
type RestartAtSupporter interface {
	WaitRestartAt(ctx context.Context, namespace, name string, timeout time.Duration) (bool, error)
}
|
||||
|
||||
// AdapterRegistry holds adapters for all workload types.
|
||||
type AdapterRegistry struct {
|
||||
kubeClient kubernetes.Interface
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
rolloutsclient "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
@@ -46,19 +46,50 @@ func (a *ArgoRolloutAdapter) Delete(ctx context.Context, namespace, name string)
|
||||
return a.rolloutsClient.ArgoprojV1alpha1().Rollouts(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
// WaitReady waits for the Argo Rollout to be ready.
|
||||
// WaitReady waits for the Argo Rollout to be ready using watches.
|
||||
func (a *ArgoRolloutAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForRolloutReady(ctx, a.rolloutsClient, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.rolloutsClient.ArgoprojV1alpha1().Rollouts(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(RolloutIsReady), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the Argo Rollout to have the reload annotation.
|
||||
// WaitReloaded waits for the Argo Rollout to have the reload annotation using watches.
|
||||
func (a *ArgoRolloutAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForRolloutReloaded(ctx, a.rolloutsClient, namespace, name, annotationKey, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.rolloutsClient.ArgoprojV1alpha1().Rollouts(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasPodTemplateAnnotation(RolloutPodTemplate, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitEnvVar waits for the Argo Rollout to have a STAKATER_ env var.
|
||||
// WaitEnvVar waits for the Argo Rollout to have a STAKATER_ env var using watches.
|
||||
func (a *ArgoRolloutAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForRolloutEnvVar(ctx, a.rolloutsClient, namespace, name, prefix, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.rolloutsClient.ArgoprojV1alpha1().Rollouts(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasEnvVarPrefix(RolloutContainers, prefix), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitRestartAt waits for the Argo Rollout to have the restartAt field set using watches.
|
||||
// This is used when Reloader is configured with rollout strategy=restart.
|
||||
func (a *ArgoRolloutAdapter) WaitRestartAt(ctx context.Context, namespace, name string, timeout time.Duration) (bool, error) {
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.rolloutsClient.ArgoprojV1alpha1().Rollouts(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(RolloutHasRestartAt), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// SupportsEnvVarStrategy returns true as Argo Rollouts support env var reload strategy.
|
||||
@@ -120,70 +151,3 @@ func buildRolloutOptions(cfg WorkloadConfig) []RolloutOption {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForRolloutReady waits for an Argo Rollout to be ready using typed client.
|
||||
func WaitForRolloutReady(ctx context.Context, client rolloutsclient.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
rollout, err := client.ArgoprojV1alpha1().Rollouts(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check status.phase == "Healthy" or replicas == availableReplicas
|
||||
if rollout.Status.Phase == rolloutv1alpha1.RolloutPhaseHealthy {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if rollout.Spec.Replicas != nil && *rollout.Spec.Replicas > 0 &&
|
||||
rollout.Status.AvailableReplicas == *rollout.Spec.Replicas {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
// WaitForRolloutReloaded waits for an Argo Rollout's pod template to have the reloader annotation.
|
||||
func WaitForRolloutReloaded(ctx context.Context, client rolloutsclient.Interface, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
rollout, err := client.ArgoprojV1alpha1().Rollouts(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rollout.Spec.Template.Annotations, nil
|
||||
}, annotationKey, timeout)
|
||||
}
|
||||
|
||||
// WaitForRolloutEnvVar waits for an Argo Rollout's container to have an env var with the given prefix.
|
||||
func WaitForRolloutEnvVar(ctx context.Context, client rolloutsclient.Interface, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForEnvVarPrefix(ctx, func(ctx context.Context) ([]corev1.Container, error) {
|
||||
rollout, err := client.ArgoprojV1alpha1().Rollouts(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rollout.Spec.Template.Spec.Containers, nil
|
||||
}, prefix, timeout)
|
||||
}
|
||||
|
||||
// WaitForRolloutRestartAt waits for an Argo Rollout's spec.restartAt field to be set.
|
||||
func WaitForRolloutRestartAt(ctx context.Context, client rolloutsclient.Interface, namespace, name string, timeout time.Duration) (bool, error) {
|
||||
var found bool
|
||||
err := wait.PollUntilContextTimeout(ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
rollout, err := client.ArgoprojV1alpha1().Rollouts(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if rollout.Spec.RestartAt != nil && !rollout.Spec.RestartAt.IsZero() {
|
||||
found = true
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return false, err
|
||||
}
|
||||
return found, nil
|
||||
}
|
||||
|
||||
@@ -2,9 +2,12 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
@@ -35,19 +38,29 @@ func (a *CronJobAdapter) Delete(ctx context.Context, namespace, name string) err
|
||||
return DeleteCronJob(ctx, a.client, namespace, name)
|
||||
}
|
||||
|
||||
// WaitReady waits for the CronJob to exist (CronJobs are "ready" immediately after creation).
|
||||
// WaitReady waits for the CronJob to exist using watches.
|
||||
func (a *CronJobAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForCronJobExists(ctx, a.client, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.BatchV1().CronJobs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, Always[*batchv1.CronJob](), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the CronJob to have the reload annotation OR for a triggered Job.
|
||||
// WaitReloaded waits for the CronJob pod template to have the reload annotation using watches.
|
||||
func (a *CronJobAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForCronJobReloaded(ctx, a.client, namespace, name, annotationKey, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.BatchV1().CronJobs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasPodTemplateAnnotation(CronJobPodTemplate, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitEnvVar is not supported for CronJobs as they don't use env var reload strategy.
|
||||
func (a *CronJobAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
// CronJobs don't support env var strategy
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -61,9 +74,16 @@ func (a *CronJobAdapter) RequiresSpecialHandling() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// WaitForTriggeredJob waits for Reloader to trigger a new Job from this CronJob.
|
||||
// WaitForTriggeredJob waits for Reloader to trigger a new Job from this CronJob using watches.
|
||||
func (a *CronJobAdapter) WaitForTriggeredJob(ctx context.Context, namespace, cronJobName string, timeout time.Duration) (bool, error) {
|
||||
return WaitForCronJobTriggeredJob(ctx, a.client, namespace, cronJobName, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.BatchV1().Jobs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, "", IsTriggeredJobForCronJob(cronJobName), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// buildCronJobOptions converts WorkloadConfig to CronJobOption slice.
|
||||
|
||||
@@ -2,9 +2,12 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
@@ -35,19 +38,37 @@ func (a *DaemonSetAdapter) Delete(ctx context.Context, namespace, name string) e
|
||||
return DeleteDaemonSet(ctx, a.client, namespace, name)
|
||||
}
|
||||
|
||||
// WaitReady waits for the DaemonSet to be ready.
|
||||
// WaitReady waits for the DaemonSet to be ready using watches.
|
||||
func (a *DaemonSetAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForDaemonSetReady(ctx, a.client, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().DaemonSets(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(DaemonSetIsReady), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the DaemonSet to have the reload annotation.
|
||||
// WaitReloaded waits for the DaemonSet to have the reload annotation using watches.
|
||||
func (a *DaemonSetAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForDaemonSetReloaded(ctx, a.client, namespace, name, annotationKey, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().DaemonSets(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasPodTemplateAnnotation(DaemonSetPodTemplate, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitEnvVar waits for the DaemonSet to have a STAKATER_ env var.
|
||||
// WaitEnvVar waits for the DaemonSet to have a STAKATER_ env var using watches.
|
||||
func (a *DaemonSetAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForDaemonSetEnvVar(ctx, a.client, namespace, name, prefix, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().DaemonSets(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasEnvVarPrefix(DaemonSetContainers, prefix), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// SupportsEnvVarStrategy returns true as DaemonSets support env var reload strategy.
|
||||
|
||||
@@ -2,9 +2,12 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
@@ -35,19 +38,61 @@ func (a *DeploymentAdapter) Delete(ctx context.Context, namespace, name string)
|
||||
return DeleteDeployment(ctx, a.client, namespace, name)
|
||||
}
|
||||
|
||||
// WaitReady waits for the Deployment to be ready.
|
||||
// WaitReady waits for the Deployment to be ready using watches.
|
||||
func (a *DeploymentAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForDeploymentReady(ctx, a.client, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().Deployments(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(DeploymentIsReady), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the Deployment to have the reload annotation.
|
||||
// WaitReloaded waits for the Deployment to have the reload annotation using watches.
|
||||
func (a *DeploymentAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForDeploymentReloaded(ctx, a.client, namespace, name, annotationKey, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().Deployments(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasPodTemplateAnnotation(DeploymentPodTemplate, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitEnvVar waits for the Deployment to have a STAKATER_ env var.
|
||||
// WaitEnvVar waits for the Deployment to have a STAKATER_ env var using watches.
|
||||
func (a *DeploymentAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForDeploymentEnvVar(ctx, a.client, namespace, name, prefix, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().Deployments(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasEnvVarPrefix(DeploymentContainers, prefix), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitPaused waits for the Deployment to have the paused annotation using watches.
|
||||
func (a *DeploymentAdapter) WaitPaused(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().Deployments(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasAnnotation(DeploymentAnnotations, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitUnpaused waits for the Deployment to NOT have the paused annotation using watches.
|
||||
func (a *DeploymentAdapter) WaitUnpaused(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().Deployments(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, NoAnnotation(DeploymentAnnotations, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// SupportsEnvVarStrategy returns true as Deployments support env var reload strategy.
|
||||
|
||||
@@ -2,9 +2,13 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
@@ -36,18 +40,22 @@ func (a *JobAdapter) Delete(ctx context.Context, namespace, name string) error {
|
||||
return DeleteJob(ctx, a.client, namespace, name)
|
||||
}
|
||||
|
||||
// WaitReady waits for the Job to exist.
|
||||
// WaitReady waits for the Job to be ready (has active or succeeded pods) using watches.
|
||||
func (a *JobAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForJobExists(ctx, a.client, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.BatchV1().Jobs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(JobIsReady), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the Job to be recreated (new UID).
|
||||
// WaitReloaded waits for the Job to be recreated (new UID) using watches.
|
||||
// For Jobs, Reloader recreates the Job rather than updating annotations.
|
||||
func (a *JobAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
// For Jobs, we check if it was recreated by looking for a new UID
|
||||
// This requires storing the original UID before the test
|
||||
// For simplicity, we use the same pattern as other workloads
|
||||
// The test should verify recreation using WaitForJobRecreated instead
|
||||
// The test should verify recreation using WaitForRecreation instead
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -56,6 +64,21 @@ func (a *JobAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix str
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// WaitRecreated waits for the Job to be recreated with a different UID using watches.
|
||||
func (a *JobAdapter) WaitRecreated(ctx context.Context, namespace, name, originalUID string, timeout time.Duration) (string, bool, error) {
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.BatchV1().Jobs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
job, err := WatchUntil(ctx, watchFunc, name, HasDifferentUID(JobUID, types.UID(originalUID)), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return "", false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
return string(job.UID), true, nil
|
||||
}
|
||||
|
||||
// SupportsEnvVarStrategy returns false as Jobs don't support env var reload strategy.
|
||||
func (a *JobAdapter) SupportsEnvVarStrategy() bool {
|
||||
return false
|
||||
@@ -68,18 +91,13 @@ func (a *JobAdapter) RequiresSpecialHandling() bool {
|
||||
|
||||
// GetOriginalUID retrieves the current UID of the Job for recreation verification.
|
||||
func (a *JobAdapter) GetOriginalUID(ctx context.Context, namespace, name string) (string, error) {
|
||||
job, err := GetJob(ctx, a.client, namespace, name)
|
||||
job, err := a.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(job.UID), nil
|
||||
}
|
||||
|
||||
// WaitForRecreation waits for the Job to be recreated with a new UID.
|
||||
func (a *JobAdapter) WaitForRecreation(ctx context.Context, namespace, name, originalUID string, timeout time.Duration) (string, bool, error) {
|
||||
return WaitForJobRecreated(ctx, a.client, namespace, name, originalUID, timeout)
|
||||
}
|
||||
|
||||
// buildJobOptions converts WorkloadConfig to JobOption slice.
|
||||
func buildJobOptions(cfg WorkloadConfig) []JobOption {
|
||||
return []JobOption{
|
||||
|
||||
@@ -2,13 +2,14 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
openshiftappsv1 "github.com/openshift/api/apps/v1"
|
||||
openshiftclient "github.com/openshift/client-go/apps/clientset/versioned"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
// DCOption is a function that modifies a DeploymentConfig.
|
||||
@@ -47,19 +48,37 @@ func (a *DeploymentConfigAdapter) Delete(ctx context.Context, namespace, name st
|
||||
return a.openshiftClient.AppsV1().DeploymentConfigs(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
// WaitReady waits for the DeploymentConfig to be ready.
|
||||
// WaitReady waits for the DeploymentConfig to be ready using watches.
|
||||
func (a *DeploymentConfigAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForDeploymentConfigReady(ctx, a.openshiftClient, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.openshiftClient.AppsV1().DeploymentConfigs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(DeploymentConfigIsReady), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the DeploymentConfig to have the reload annotation.
|
||||
// WaitReloaded waits for the DeploymentConfig to have the reload annotation using watches.
|
||||
func (a *DeploymentConfigAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForDeploymentConfigReloaded(ctx, a.openshiftClient, namespace, name, annotationKey, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.openshiftClient.AppsV1().DeploymentConfigs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasPodTemplateAnnotation(DeploymentConfigPodTemplate, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitEnvVar waits for the DeploymentConfig to have a STAKATER_ env var.
|
||||
// WaitEnvVar waits for the DeploymentConfig to have a STAKATER_ env var using watches.
|
||||
func (a *DeploymentConfigAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForDeploymentConfigEnvVar(ctx, a.openshiftClient, namespace, name, prefix, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.openshiftClient.AppsV1().DeploymentConfigs(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasEnvVarPrefix(DeploymentConfigContainers, prefix), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// SupportsEnvVarStrategy returns true as DeploymentConfigs support env var reload strategy.
|
||||
@@ -117,47 +136,3 @@ func buildDeploymentConfigOptions(cfg WorkloadConfig) []DCOption {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForDeploymentConfigReady waits for a DeploymentConfig to be ready using typed client.
|
||||
func WaitForDeploymentConfigReady(ctx context.Context, client openshiftclient.Interface, namespace, name string, timeout time.Duration) error {
|
||||
return wait.PollUntilContextTimeout(ctx, DefaultInterval, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
dc, err := client.AppsV1().DeploymentConfigs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if dc.Spec.Replicas > 0 && dc.Status.ReadyReplicas == dc.Spec.Replicas {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
// WaitForDeploymentConfigReloaded waits for a DeploymentConfig's pod template to have the reloader annotation.
|
||||
func WaitForDeploymentConfigReloaded(ctx context.Context, client openshiftclient.Interface, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForAnnotation(ctx, func(ctx context.Context) (map[string]string, error) {
|
||||
dc, err := client.AppsV1().DeploymentConfigs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if dc.Spec.Template != nil {
|
||||
return dc.Spec.Template.Annotations, nil
|
||||
}
|
||||
return nil, nil
|
||||
}, annotationKey, timeout)
|
||||
}
|
||||
|
||||
// WaitForDeploymentConfigEnvVar waits for a DeploymentConfig's container to have an env var with the given prefix.
|
||||
func WaitForDeploymentConfigEnvVar(ctx context.Context, client openshiftclient.Interface, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForEnvVarPrefix(ctx, func(ctx context.Context) ([]corev1.Container, error) {
|
||||
dc, err := client.AppsV1().DeploymentConfigs(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if dc.Spec.Template != nil {
|
||||
return dc.Spec.Template.Spec.Containers, nil
|
||||
}
|
||||
return nil, nil
|
||||
}, prefix, timeout)
|
||||
}
|
||||
|
||||
@@ -2,9 +2,12 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
@@ -35,19 +38,37 @@ func (a *StatefulSetAdapter) Delete(ctx context.Context, namespace, name string)
|
||||
return DeleteStatefulSet(ctx, a.client, namespace, name)
|
||||
}
|
||||
|
||||
// WaitReady waits for the StatefulSet to be ready.
|
||||
// WaitReady waits for the StatefulSet to be ready using watches.
|
||||
func (a *StatefulSetAdapter) WaitReady(ctx context.Context, namespace, name string, timeout time.Duration) error {
|
||||
return WaitForStatefulSetReady(ctx, a.client, namespace, name, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().StatefulSets(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, IsReady(StatefulSetIsReady), timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitReloaded waits for the StatefulSet to have the reload annotation.
|
||||
// WaitReloaded waits for the StatefulSet to have the reload annotation using watches.
|
||||
func (a *StatefulSetAdapter) WaitReloaded(ctx context.Context, namespace, name, annotationKey string, timeout time.Duration) (bool, error) {
|
||||
return WaitForStatefulSetReloaded(ctx, a.client, namespace, name, annotationKey, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().StatefulSets(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasPodTemplateAnnotation(StatefulSetPodTemplate, annotationKey), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// WaitEnvVar waits for the StatefulSet to have a STAKATER_ env var.
|
||||
// WaitEnvVar waits for the StatefulSet to have a STAKATER_ env var using watches.
|
||||
func (a *StatefulSetAdapter) WaitEnvVar(ctx context.Context, namespace, name, prefix string, timeout time.Duration) (bool, error) {
|
||||
return WaitForStatefulSetEnvVar(ctx, a.client, namespace, name, prefix, timeout)
|
||||
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return a.client.AppsV1().StatefulSets(namespace).Watch(ctx, opts)
|
||||
}
|
||||
_, err := WatchUntil(ctx, watchFunc, name, HasEnvVarPrefix(StatefulSetContainers, prefix), timeout)
|
||||
if errors.Is(err, ErrWatchTimeout) {
|
||||
return false, nil
|
||||
}
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// SupportsEnvVarStrategy returns true as StatefulSets support env var reload strategy.
|
||||
|
||||
Reference in New Issue
Block a user