Files
Reloader/test/e2e/utils/watch.go
faizanahmad055 b5c8705395 Refactor code
Signed-off-by: faizanahmad055 <faizan.ahmad55@outlook.com>
2026-05-11 16:18:53 +02:00

232 lines
6.2 KiB
Go

package utils
import (
"context"
"errors"
"fmt"
"time"
. "github.com/onsi/ginkgo/v2" //nolint:revive,staticcheck
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)
// Shared durations used by the e2e watch and polling helpers.
const (
	// DefaultInterval is the polling interval.
	DefaultInterval = time.Second

	// ShortTimeout bounds quick checks.
	ShortTimeout = 5 * time.Second

	// NegativeTestWait is how long to wait before checking negative conditions.
	NegativeTestWait = 3 * time.Second

	// WorkloadReadyTimeout bounds workload readiness waits (includes buffer for CI).
	WorkloadReadyTimeout = time.Minute

	// ReloadTimeout is the time allowed for a reload to trigger.
	ReloadTimeout = 15 * time.Second
)
// Sentinel errors for the watch helpers; compare against them with errors.Is.
var (
	// ErrWatchTimeout is returned when a watch times out waiting for condition.
	ErrWatchTimeout = errors.New("watch timeout waiting for condition")

	// ErrWatchError is returned when the watch receives an error event from the API server.
	// NOTE(review): the watch loops in this file log error events and reconnect
	// rather than returning this value — confirm it is still used elsewhere.
	ErrWatchError = errors.New("watch received error event from API server")

	// ErrUnsupportedOperation is returned when an operation is not supported for a workload type.
	ErrUnsupportedOperation = errors.New("operation not supported for this workload type")
)
// HandleWatchResult maps the error returned by a watch helper onto the
// (bool, error) pattern used by callers:
//
//   - nil error                      -> (true, nil)
//   - error matching ErrWatchTimeout -> (false, nil)
//   - any other error                -> (false, err)
//
// Matching uses errors.Is, so wrapped timeout errors are also treated as timeouts.
func HandleWatchResult(err error) (bool, error) {
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, ErrWatchTimeout):
		// A timeout means "condition not observed", not a failure.
		return false, nil
	default:
		return false, err
	}
}
// Function types accepted by the watch helpers.
type (
	// WatchFunc starts a watch for a specific resource using the given list
	// options and returns the resulting watch.Interface.
	WatchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)

	// Condition reports whether an observed object has reached the desired state.
	Condition[T any] func(T) bool
)
// WatchUntil watches a resource until the condition is met or timeout occurs.
// It handles watch reconnection automatically: when a watch cannot be started,
// ends, or reports an error event, a new watch is opened after a backoff delay
// that starts at 100ms, doubles after each wait, and never exceeds
// maxReconnectDelay.
// If name is empty, it watches all resources and returns the first matching one;
// otherwise a metadata.name field selector restricts the watch to that object.
//
// ResourceVersion "0" is used so the API server sends the current state as an
// initial ADDED event before streaming live updates, preventing the TOCTOU window
// where a reload that completes before WatchUntil is called would be missed.
//
// It returns the matching object and a nil error on success. When the derived
// context ends (timeout elapsed or the parent ctx was cancelled) it returns the
// zero value of T and ErrWatchTimeout.
func WatchUntil[T runtime.Object](ctx context.Context, watchFunc WatchFunc, name string, condition Condition[T], timeout time.Duration) (T, error) {
	var zero T

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	opts := metav1.ListOptions{
		Watch:           true,
		ResourceVersion: "0", // receive current state as initial ADDED event
	}
	if name != "" {
		opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
	}

	const maxReconnectDelay = 5 * time.Second
	reconnectDelay := 100 * time.Millisecond

	for {
		// Bail out before opening another watch if the deadline already passed.
		select {
		case <-ctx.Done():
			return zero, ErrWatchTimeout
		default:
		}

		result, done, err := watchOnce(ctx, watchFunc, opts, condition)
		if done {
			return result, err
		}

		// The watch ended without a verdict; back off, then reconnect.
		select {
		case <-ctx.Done():
			return zero, ErrWatchTimeout
		case <-time.After(reconnectDelay):
			// Double, then clamp: doubling only while below the cap would let
			// the delay overshoot it (3.2s -> 6.4s) and stay there.
			reconnectDelay *= 2
			if reconnectDelay > maxReconnectDelay {
				reconnectDelay = maxReconnectDelay
			}
		}
	}
}
// watchOnce opens one watch and consumes its events until the condition is
// satisfied, the context ends, or the watch stops.
//
// The boolean result tells the caller whether it is finished:
//   - (obj, true, nil): an Added or Modified object of type T satisfied condition.
//   - (zero, true, ErrWatchTimeout): ctx ended while waiting for events.
//   - (zero, false, nil): the watch could not be started, its channel closed,
//     or it delivered an Error event; the caller should reconnect.
//
// Deleted events, events of any other type, and objects that are not of type T
// are ignored.
func watchOnce[T runtime.Object](
	ctx context.Context,
	watchFunc WatchFunc,
	opts metav1.ListOptions,
	condition Condition[T],
) (T, bool, error) {
	var none T

	w, startErr := watchFunc(ctx, opts)
	if startErr != nil {
		// Log and signal retry; transient API errors are expected during CI.
		_, _ = fmt.Fprintf(GinkgoWriter, "watch: failed to start watch: %v — retrying\n", startErr)
		return none, false, nil
	}
	defer w.Stop()

	for {
		select {
		case <-ctx.Done():
			return none, true, ErrWatchTimeout

		case ev, open := <-w.ResultChan():
			if !open {
				// Channel closed: ask the caller to reconnect.
				return none, false, nil
			}

			if ev.Type == watch.Error {
				_, _ = fmt.Fprintf(GinkgoWriter, "watch: received error event: %v — retrying\n", ev.Object)
				return none, false, nil
			}

			// Only Added/Modified events carry state worth evaluating.
			if ev.Type != watch.Added && ev.Type != watch.Modified {
				continue
			}

			if candidate, isT := ev.Object.(T); isT && condition(candidate) {
				return candidate, true, nil
			}
		}
	}
}
// WatchUntilDeleted watches the named resource until a Deleted event is
// observed or timeout occurs.
//
// The watch is restricted to the object with the given metadata.name and is
// reopened automatically when it cannot be started, ends, or reports an error
// event. Reconnects use a backoff delay that starts at 100ms, doubles after
// each wait, and never exceeds maxReconnectDelay.
//
// It returns nil once a Deleted event is seen and ErrWatchTimeout when the
// derived context ends (timeout elapsed or the parent ctx was cancelled).
//
// NOTE(review): only a Deleted event ends the wait successfully; if the object
// is already absent when the watch starts, presumably no such event arrives and
// the call runs until timeout — confirm callers only use this on live objects.
func WatchUntilDeleted(
	ctx context.Context,
	watchFunc WatchFunc,
	name string,
	timeout time.Duration,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	opts := metav1.ListOptions{
		FieldSelector:   fields.OneTermEqualSelector("metadata.name", name).String(),
		Watch:           true,
		ResourceVersion: "0",
	}

	const maxReconnectDelay = 5 * time.Second
	reconnectDelay := 100 * time.Millisecond

	for {
		// Bail out before opening another watch if the deadline already passed.
		select {
		case <-ctx.Done():
			return ErrWatchTimeout
		default:
		}

		deleted, err := watchDeleteOnce(ctx, watchFunc, opts)
		if deleted {
			return err
		}

		// The watch ended without a verdict; back off, then reconnect.
		select {
		case <-ctx.Done():
			return ErrWatchTimeout
		case <-time.After(reconnectDelay):
			// Double, then clamp: doubling only while below the cap would let
			// the delay overshoot it (3.2s -> 6.4s) and stay there.
			reconnectDelay *= 2
			if reconnectDelay > maxReconnectDelay {
				reconnectDelay = maxReconnectDelay
			}
		}
	}
}
// watchDeleteOnce opens one watch and consumes events until a Deleted event
// arrives, the context ends, or the watch stops.
//
// The boolean result tells the caller whether it is finished:
//   - (true, nil): a Deleted event was received.
//   - (true, ErrWatchTimeout): ctx ended while waiting for events.
//   - (false, nil): the watch could not be started, its channel closed, or it
//     delivered an Error event; the caller should reconnect.
//
// All other event types are ignored.
func watchDeleteOnce(
	ctx context.Context,
	watchFunc WatchFunc,
	opts metav1.ListOptions,
) (bool, error) {
	w, startErr := watchFunc(ctx, opts)
	if startErr != nil {
		_, _ = fmt.Fprintf(GinkgoWriter, "watch: failed to start delete watch: %v — retrying\n", startErr)
		return false, nil
	}
	defer w.Stop()

	for {
		select {
		case <-ctx.Done():
			return true, ErrWatchTimeout

		case ev, open := <-w.ResultChan():
			switch {
			case !open:
				// Channel closed: ask the caller to reconnect.
				return false, nil
			case ev.Type == watch.Deleted:
				return true, nil
			case ev.Type == watch.Error:
				_, _ = fmt.Fprintf(GinkgoWriter, "watch: received error event during delete watch: %v — retrying\n", ev.Object)
				return false, nil
			}
		}
	}
}
// WatchUntilDifferentUID watches the named resource until an observed object
// reports, via getUID, a UID other than originalUID (i.e. it was recreated).
//
// Results:
//   - (obj, true, nil): an object with a different UID was observed.
//   - (zero, false, nil): the watch timed out first.
//   - (zero, false, err): the watch failed with any other error.
func WatchUntilDifferentUID[T runtime.Object](
	ctx context.Context,
	watchFunc WatchFunc,
	name string,
	originalUID string,
	timeout time.Duration,
	getUID func(T) string,
) (T, bool, error) {
	recreated := Condition[T](func(candidate T) bool {
		return originalUID != getUID(candidate)
	})

	found, watchErr := WatchUntil(ctx, watchFunc, name, recreated, timeout)

	// HandleWatchResult folds a timeout into (false, nil) and passes other
	// errors through unchanged.
	if ok, err := HandleWatchResult(watchErr); !ok {
		var none T
		return none, false, err
	}
	return found, true, nil
}