From 568b1c578baa1aa00fba663a3624935e14d3de20 Mon Sep 17 00:00:00 2001 From: Brian Kane Date: Mon, 19 Jan 2026 11:18:10 +0000 Subject: [PATCH] Feat: 7019 Support re-running workflows and ensure passed data is updated during dispatch (#7025) Signed-off-by: Brian Kane --- apis/core.oam.dev/common/types.go | 6 + .../common/zz_generated.deepcopy.go | 4 + .../core.oam.dev_applicationrevisions.yaml | 7 + .../crds/core.oam.dev_applications.yaml | 7 + .../defwithtemplate/restart-workflow.yaml | 115 +++++ makefiles/dependency.mk | 2 +- .../application/application_controller.go | 8 +- .../v1beta1/application/dispatch_test.go | 218 ++++++++ .../v1beta1/application/dispatcher.go | 58 ++- .../v1beta1/application/generator.go | 86 +--- .../v1beta1/application/generator_test.go | 411 ++++++++++++++- .../v1beta1/application/workflow.go | 206 ++++++++ .../v1beta1/application/workflow_test.go | 467 ++++++++++++++++++ pkg/oam/labels.go | 10 + .../workflowstep/restart-workflow.eg.md | 81 +++ .../workflowstep/restart-workflow.cue | 118 +++++ 16 files changed, 1730 insertions(+), 74 deletions(-) create mode 100644 charts/vela-core/templates/defwithtemplate/restart-workflow.yaml create mode 100644 pkg/controller/core.oam.dev/v1beta1/application/workflow.go create mode 100644 references/docgen/def-doc/workflowstep/restart-workflow.eg.md create mode 100644 vela-templates/definitions/internal/workflowstep/restart-workflow.cue diff --git a/apis/core.oam.dev/common/types.go b/apis/core.oam.dev/common/types.go index 43444f3c9..08e7af890 100644 --- a/apis/core.oam.dev/common/types.go +++ b/apis/core.oam.dev/common/types.go @@ -225,6 +225,12 @@ type AppStatus struct { // Workflow record the status of workflow Workflow *WorkflowStatus `json:"workflow,omitempty"` + // WorkflowRestartScheduledAt schedules a workflow restart at the specified time. + // This field is automatically set when the app.oam.dev/restart-workflow annotation is present, + // and is cleared after the restart is triggered. Use RFC3339 format or set to current time for immediate restart. + // +optional + WorkflowRestartScheduledAt *metav1.Time `json:"workflowRestartScheduledAt,omitempty"` + // LatestRevision of the application configuration it generates // +optional LatestRevision *Revision `json:"latestRevision,omitempty"` diff --git a/apis/core.oam.dev/common/zz_generated.deepcopy.go b/apis/core.oam.dev/common/zz_generated.deepcopy.go index cd70665ae..e2ee1e118 100644 --- a/apis/core.oam.dev/common/zz_generated.deepcopy.go +++ b/apis/core.oam.dev/common/zz_generated.deepcopy.go @@ -48,6 +48,10 @@ func (in *AppStatus) DeepCopyInto(out *AppStatus) { *out = new(WorkflowStatus) (*in).DeepCopyInto(*out) } + if in.WorkflowRestartScheduledAt != nil { + in, out := &in.WorkflowRestartScheduledAt, &out.WorkflowRestartScheduledAt + *out = (*in).DeepCopy() + } if in.LatestRevision != nil { in, out := &in.LatestRevision, &out.LatestRevision *out = new(Revision) diff --git a/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml b/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml index cc1541a9d..4ab406158 100644 --- a/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml +++ b/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml @@ -817,6 +817,13 @@ spec: - suspend - terminated type: object + workflowRestartScheduledAt: + description: |- + WorkflowRestartScheduledAt schedules a workflow restart at the specified time. + This field is automatically set when the app.oam.dev/restart-workflow annotation is present, + and is cleared after the restart is triggered. Use RFC3339 format or set to current time for immediate restart. + format: date-time + type: string type: object type: object componentDefinitions: diff --git a/charts/vela-core/crds/core.oam.dev_applications.yaml b/charts/vela-core/crds/core.oam.dev_applications.yaml index ad7d95c3b..5e0e06990 100644 --- a/charts/vela-core/crds/core.oam.dev_applications.yaml +++ b/charts/vela-core/crds/core.oam.dev_applications.yaml @@ -760,6 +760,13 @@ spec: - suspend - terminated type: object + workflowRestartScheduledAt: + description: |- + WorkflowRestartScheduledAt schedules a workflow restart at the specified time. + This field is automatically set when the app.oam.dev/restart-workflow annotation is present, + and is cleared after the restart is triggered. Use RFC3339 format or set to current time for immediate restart. + format: date-time + type: string type: object type: object served: true diff --git a/charts/vela-core/templates/defwithtemplate/restart-workflow.yaml b/charts/vela-core/templates/defwithtemplate/restart-workflow.yaml new file mode 100644 index 000000000..193ca1bd3 --- /dev/null +++ b/charts/vela-core/templates/defwithtemplate/restart-workflow.yaml @@ -0,0 +1,115 @@ +# Code generated by KubeVela templates. DO NOT EDIT. Please edit the original cue file. +# Definition source cue file: vela-templates/definitions/internal/restart-workflow.cue +apiVersion: core.oam.dev/v1beta1 +kind: WorkflowStepDefinition +metadata: + annotations: + custom.definition.oam.dev/category: Workflow Control + definition.oam.dev/description: Schedule the current Application's workflow to restart at a specific time, after a duration, or at recurring intervals + labels: + custom.definition.oam.dev/scope: Application + name: restart-workflow + namespace: {{ include "systemDefinitionNamespace" . }} +spec: + schematic: + cue: + template: | + import "vela/kube" + import "vela/builtin" + + // Count how many parameters are provided + _paramCount: len([ + if parameter.at != _|_ {1}, + if parameter.after != _|_ {1}, + if parameter.every != _|_ {1}, + ]) + + // Fail if not exactly one parameter is provided + if _paramCount != 1 { + validateParams: builtin.#Fail & { + $params: message: "Exactly one of 'at', 'after', or 'every' parameters must be specified (found \(_paramCount))" + } + } + + // Build the bash script to calculate annotation value + _script: string + if parameter.at != _|_ { + // Fixed timestamp mode - use as-is + _script: """ + VALUE="\(parameter.at)" + kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite + """ + } + if parameter.after != _|_ { + // Relative time mode - calculate timestamp using date + // Convert duration format (5m, 1h, 2d) to seconds, then calculate + _script: """ + DURATION="\(parameter.after)" + + # Convert duration to seconds + SECONDS=0 + if [[ "$DURATION" =~ ^([0-9]+)m$ ]]; then + SECONDS=$((${BASH_REMATCH[1]} * 60)) + elif [[ "$DURATION" =~ ^([0-9]+)h$ ]]; then + SECONDS=$((${BASH_REMATCH[1]} * 3600)) + elif [[ "$DURATION" =~ ^([0-9]+)d$ ]]; then + SECONDS=$((${BASH_REMATCH[1]} * 86400)) + else + echo "ERROR: Invalid duration format: $DURATION (expected format: 5m, 1h, or 2d)" + exit 1 + fi + + # Calculate future timestamp using seconds offset + VALUE=$(date -u -d "@$(($(date +%s) + SECONDS))" +%Y-%m-%dT%H:%M:%SZ) + echo "Calculated timestamp for after '$DURATION' ($SECONDS seconds): $VALUE" + kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite + """ + } + if parameter.every != _|_ { + // Recurring interval mode - pass duration directly + _script: """ + VALUE="\(parameter.every)" + kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite + """ + } + + // Run kubectl to annotate the Application + job: kube.#Apply & { + $params: value: { + apiVersion: "batch/v1" + kind: "Job" + metadata: { + name: "\(context.name)-restart-workflow-\(context.stepSessionID)" + namespace: "vela-system" + } + spec: { + backoffLimit: 3 + template: spec: { + containers: [{ + name: "kubectl-annotate" + image: "bitnami/kubectl:latest" + command: ["/bin/sh", "-c"] + args: [_script] + }] + restartPolicy: "Never" + serviceAccountName: "kubevela-vela-core" + } + } + } + } + + wait: builtin.#ConditionalWait & { + if job.$returns.value.status != _|_ if job.$returns.value.status.succeeded != _|_ { + $params: continue: job.$returns.value.status.succeeded > 0 + } + } + + parameter: { + // +usage=Schedule restart at a specific RFC3339 timestamp (e.g., "2025-01-15T14:30:00Z") + at?: string & =~"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})$" + // +usage=Schedule restart after a relative duration from now (e.g., "5m", "1h", "2d") + after?: string & =~"^[0-9]+(m|h|d)$" + // +usage=Schedule recurring restarts every specified duration (e.g., "5m", "1h", "24h") + every?: string & =~"^[0-9]+(m|h|d)$" + } + diff --git a/makefiles/dependency.mk b/makefiles/dependency.mk index 6e530f60c..e27b1c6f5 100644 --- a/makefiles/dependency.mk +++ b/makefiles/dependency.mk @@ -106,7 +106,7 @@ tidy: .PHONY: sync-crds PKG_MODULE = github.com/kubevela/pkg # fetch common crds from the pkg repo instead of generating locally sync-crds: ## Copy CRD from pinned module version in go.mod - @moddir=$$(go list -m -f '{{.Dir}}' $(PKG_MODULE) 2>/dev/null); \ + @moddir=$$(go mod download -json $(PKG_MODULE) 2>/dev/null | grep '"Dir"' | cut -d'"' -f4); \ mkdir -p config/crd/base; \ for file in $(COMMON_CRD_FILES); do \ src="$$moddir/crds/$$file"; \ diff --git a/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go b/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go index 94c583fad..50f032f5f 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go @@ -1,4 +1,5 @@ /* + /* Copyright 2021 The KubeVela Authors. Licensed under the Apache License, Version 2.0 (the "License"); @@ -142,6 +143,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if err != nil { return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting) } + + // Handle workflow restart requests - converts annotation to status field + r.handleWorkflowRestartAnnotation(ctx, app) + endReconcile, result, err := r.handleFinalizers(logCtx, app, handler) if err != nil { if app.GetDeletionTimestamp() == nil { @@ -190,7 +195,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String())) r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated)) - handler.CheckWorkflowRestart(logCtx, app) + // Check if workflow needs restart (combines scheduled restart + revision-based restart) + r.checkWorkflowRestart(logCtx, app, handler) workflowInstance, runners, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile) if err != nil { diff --git a/pkg/controller/core.oam.dev/v1beta1/application/dispatch_test.go b/pkg/controller/core.oam.dev/v1beta1/application/dispatch_test.go index ba5a287bc..88a4d331a 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/dispatch_test.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/dispatch_test.go @@ -18,13 +18,17 @@ package application import ( "context" + "encoding/json" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "github.com/oam-dev/kubevela/apis/core.oam.dev/common" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" velatypes "github.com/oam-dev/kubevela/apis/types" + "github.com/oam-dev/kubevela/pkg/appfile" ) var _ = Describe("Test dispatch stage", func() { @@ -73,3 +77,217 @@ var _ = Describe("Test dispatch stage", func() { Expect(stage).Should(BeEquivalentTo(DefaultDispatch)) }) }) + +var _ = Describe("Test componentPropertiesChanged", func() { + It("should return true when component not in revision (first deployment)", func() { + comp := &appfile.Component{ + Name: "test-component", + Type: "webservice", + Params: map[string]interface{}{ + "image": "nginx:latest", + }, + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{}, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue()) + }) + + It("should return false when component properties unchanged", func() { + properties := map[string]interface{}{ + "image": "nginx:latest", + "port": 80, + } + propertiesJSON, _ := json.Marshal(properties) + + comp := &appfile.Component{ + Name: "test-component", + Type: "webservice", + Params: properties, + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "test-component", + Type: "webservice", + Properties: &runtime.RawExtension{ + Raw: propertiesJSON, + }, + }, + }, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeFalse()) + }) + + It("should return true when component properties changed", func() { + oldProperties := map[string]interface{}{ + "image": "nginx:1.0", + "port": 80, + } + oldPropertiesJSON, _ := json.Marshal(oldProperties) + + newProperties := map[string]interface{}{ + "image": "nginx:2.0", + "port": 80, + } + + comp := &appfile.Component{ + Name: "test-component", + Type: "webservice", + Params: newProperties, + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "test-component", + Type: "webservice", + Properties: &runtime.RawExtension{ + Raw: oldPropertiesJSON, + }, + }, + }, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue()) + }) + + It("should return true when component type changed", func() { + properties := map[string]interface{}{ + "image": "nginx:latest", + } + propertiesJSON, _ := json.Marshal(properties) + + comp := &appfile.Component{ + Name: "test-component", + Type: "worker", + Params: properties, + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "test-component", + Type: "webservice", + Properties: &runtime.RawExtension{ + Raw: propertiesJSON, + }, + }, + }, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue()) + }) + + It("should return false when both properties are nil", func() { + comp := &appfile.Component{ + Name: "test-component", + Type: "webservice", + Params: nil, + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "test-component", + Type: "webservice", + Properties: nil, + }, + }, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeFalse()) + }) + + It("should return true when properties removed (nil current, non-empty previous)", func() { + comp := &appfile.Component{ + Name: "test-component", + Type: "webservice", + Params: nil, // Properties removed + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "test-component", + Type: "webservice", + Properties: &runtime.RawExtension{ + Raw: []byte(`{"image":"nginx:1.0","port":80}`), + }, + }, + }, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue()) + }) + + It("should return true on JSON unmarshal error (conservative)", func() { + comp := &appfile.Component{ + Name: "test-component", + Type: "webservice", + Params: map[string]interface{}{ + "image": "nginx:latest", + }, + } + appRev := &v1beta1.ApplicationRevision{ + Spec: v1beta1.ApplicationRevisionSpec{ + ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{ + Application: v1beta1.Application{ + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "test-component", + Type: "webservice", + Properties: &runtime.RawExtension{ + Raw: []byte("invalid json"), + }, + }, + }, + }, + }, + }, + }, + } + Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue()) + }) +}) diff --git a/pkg/controller/core.oam.dev/v1beta1/application/dispatcher.go b/pkg/controller/core.oam.dev/v1beta1/application/dispatcher.go index ed0914d3f..4223ad5d1 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/dispatcher.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/dispatcher.go @@ -18,10 +18,12 @@ package application import ( "context" + "encoding/json" "sort" "strings" "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/controller-runtime/pkg/client" @@ -144,7 +146,19 @@ func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, rea isAutoUpdateEnabled = true } - if isHealth, err := dispatcher.healthCheck(ctx, comp, appRev); !isHealth || err != nil || (!comp.SkipApplyWorkload && isAutoUpdateEnabled) { + isHealth, err := dispatcher.healthCheck(ctx, comp, appRev) + + // Check if component properties have changed (only for healthy components) + // Note: componentPropertiesChanged handles nil comp.Params correctly, so we don't check it here + propertiesChanged := false + if isHealth && err == nil { + propertiesChanged = componentPropertiesChanged(comp, appRev) + } + + // Dispatch if: unhealthy, health error, properties changed, or auto-update enabled + requiresDispatch := !isHealth || err != nil || propertiesChanged || (!comp.SkipApplyWorkload && isAutoUpdateEnabled) + + if requiresDispatch { if err := h.Dispatch(ctx, h.Client, clusterName, common.WorkflowResourceCreator, dispatchManifests...); err != nil { return false, errors.WithMessage(err, "Dispatch") } @@ -235,3 +249,45 @@ func getTraitDispatchStage(client client.Client, traitType string, appRev *v1bet } return stageType, nil } + +// componentPropertiesChanged compares current component properties with the last +// applied version in ApplicationRevision. Returns true if properties have changed +func componentPropertiesChanged(comp *appfile.Component, appRev *v1beta1.ApplicationRevision) bool { + var revComponent *common.ApplicationComponent + for i := range appRev.Spec.Application.Spec.Components { + if appRev.Spec.Application.Spec.Components[i].Name == comp.Name { + revComponent = &appRev.Spec.Application.Spec.Components[i] + break + } + } + + // First deployment or new component + if revComponent == nil { + return true + } + + // Type changed + if revComponent.Type != comp.Type { + return true + } + + // Compare properties as JSON to handle type normalization (e.g. int vs float64) + currentProperties := comp.Params + if currentProperties == nil { + currentProperties = make(map[string]interface{}) + } + + currentJSON, err := json.Marshal(currentProperties) + if err != nil { + return true + } + + var revJSON []byte + if revComponent.Properties != nil && len(revComponent.Properties.Raw) > 0 { + revJSON = revComponent.Properties.Raw + } else { + revJSON, _ = json.Marshal(map[string]interface{}{}) + } + + return !equality.Semantic.DeepEqual(currentJSON, revJSON) +} diff --git a/pkg/controller/core.oam.dev/v1beta1/application/generator.go b/pkg/controller/core.oam.dev/v1beta1/application/generator.go index 057182a7a..1e1102080 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/generator.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/generator.go @@ -40,7 +40,6 @@ import ( wfTypes "github.com/kubevela/workflow/pkg/types" "github.com/oam-dev/kubevela/apis/core.oam.dev/common" - "github.com/oam-dev/kubevela/apis/core.oam.dev/condition" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" "github.com/oam-dev/kubevela/apis/types" "github.com/oam-dev/kubevela/pkg/appfile" @@ -129,60 +128,28 @@ func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context, return instance, runners, nil } -// CheckWorkflowRestart check if application workflow need restart and return the desired -// rev to be set in status -// 1. If workflow status is empty, it means no previous running record, the -// workflow will restart (cold start) -// 2. If workflow status is not empty, and publishVersion is set, the desired -// rev will be the publishVersion -// 3. If workflow status is not empty, the desired rev will be the -// ApplicationRevision name. For backward compatibility, the legacy style -// : will be recognized and reduced into -func (h *AppHandler) CheckWorkflowRestart(ctx monitorContext.Context, app *v1beta1.Application) { - desiredRev, currentRev := h.currentAppRev.Name, "" - if app.Status.Workflow != nil { - currentRev = app.Status.Workflow.AppRevision - } - if metav1.HasAnnotation(app.ObjectMeta, oam.AnnotationPublishVersion) { - desiredRev = app.GetAnnotations()[oam.AnnotationPublishVersion] - } else { // nolint - // backward compatibility - // legacy versions use : as currentRev, extract - if idx := strings.LastIndexAny(currentRev, ":"); idx >= 0 { - currentRev = currentRev[:idx] - } - } - if currentRev != "" && desiredRev == currentRev { - return - } - // record in revision - if h.latestAppRev != nil && h.latestAppRev.Status.Workflow == nil && app.Status.Workflow != nil { - app.Status.Workflow.Terminated = true - app.Status.Workflow.Finished = true - if app.Status.Workflow.EndTime.IsZero() { - app.Status.Workflow.EndTime = metav1.Now() - } - h.UpdateApplicationRevisionStatus(ctx, h.latestAppRev, app.Status.Workflow) +// copyWorkflowStatusToInstance copies Application workflow status to WorkflowInstance status. +// Returns a WorkflowRunStatus with Mode set and other fields copied from app.Status.Workflow if it exists. +func copyWorkflowStatusToInstance(app *v1beta1.Application, mode *workflowv1alpha1.WorkflowExecuteMode) workflowv1alpha1.WorkflowRunStatus { + status := workflowv1alpha1.WorkflowRunStatus{ + Mode: *mode, } - // clean recorded resources info. - app.Status.Services = nil - app.Status.AppliedResources = nil + // Copy status fields if workflow status exists (may be nil on first run) + if wfStatus := app.Status.Workflow; wfStatus != nil { + status.Phase = wfStatus.Phase + status.Message = wfStatus.Message + status.Suspend = wfStatus.Suspend + status.SuspendState = wfStatus.SuspendState + status.Terminated = wfStatus.Terminated + status.Finished = wfStatus.Finished + status.ContextBackend = wfStatus.ContextBackend + status.Steps = wfStatus.Steps + status.StartTime = wfStatus.StartTime + status.EndTime = wfStatus.EndTime + } - // clean conditions after render - var reservedConditions []condition.Condition - for i, cond := range app.Status.Conditions { - condTpy, err := common.ParseApplicationConditionType(string(cond.Type)) - if err == nil { - if condTpy <= common.RenderCondition { - reservedConditions = append(reservedConditions, app.Status.Conditions[i]) - } - } - } - app.Status.Conditions = reservedConditions - app.Status.Workflow = &common.WorkflowStatus{ - AppRevision: desiredRev, - } + return status } func generateWorkflowInstance(af *appfile.Appfile, app *v1beta1.Application) *wfTypes.WorkflowInstance { @@ -207,20 +174,7 @@ func generateWorkflowInstance(af *appfile.Appfile, app *v1beta1.Application) *wf Steps: af.WorkflowSteps, Mode: af.WorkflowMode, } - status := app.Status.Workflow - instance.Status = workflowv1alpha1.WorkflowRunStatus{ - Mode: *af.WorkflowMode, - Phase: status.Phase, - Message: status.Message, - Suspend: status.Suspend, - SuspendState: status.SuspendState, - Terminated: status.Terminated, - Finished: status.Finished, - ContextBackend: status.ContextBackend, - Steps: status.Steps, - StartTime: status.StartTime, - EndTime: status.EndTime, - } + instance.Status = copyWorkflowStatusToInstance(app, af.WorkflowMode) switch app.Status.Phase { case common.ApplicationRunning: instance.Status.Phase = workflowv1alpha1.WorkflowStateSucceeded diff --git a/pkg/controller/core.oam.dev/v1beta1/application/generator_test.go b/pkg/controller/core.oam.dev/v1beta1/application/generator_test.go index 7314c6dc0..1f5002a11 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/generator_test.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/generator_test.go @@ -118,7 +118,6 @@ var _ = Describe("Test Application workflow generator", func() { logCtx := monitorContext.NewTraceContext(ctx, "") handler.currentAppRev = &oamcore.ApplicationRevision{} - handler.CheckWorkflowRestart(logCtx, app) _, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af) Expect(err).To(BeNil()) @@ -162,7 +161,6 @@ var _ = Describe("Test Application workflow generator", func() { logCtx := monitorContext.NewTraceContext(ctx, "") handler.currentAppRev = &oamcore.ApplicationRevision{} - handler.CheckWorkflowRestart(logCtx, app) _, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af) Expect(err).To(BeNil()) Expect(len(taskRunner)).Should(BeEquivalentTo(2)) @@ -204,7 +202,6 @@ var _ = Describe("Test Application workflow generator", func() { logCtx := monitorContext.NewTraceContext(ctx, "") handler.currentAppRev = &oamcore.ApplicationRevision{} - handler.CheckWorkflowRestart(logCtx, app) _, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af) Expect(err).To(BeNil()) Expect(len(taskRunner)).Should(BeEquivalentTo(2)) @@ -246,7 +243,6 @@ var _ = Describe("Test Application workflow generator", func() { logCtx := monitorContext.NewTraceContext(ctx, "") handler.currentAppRev = &oamcore.ApplicationRevision{} - handler.CheckWorkflowRestart(logCtx, app) _, _, err = handler.GenerateApplicationSteps(logCtx, app, appParser, af) Expect(err).NotTo(BeNil()) }) @@ -285,7 +281,6 @@ var _ = Describe("Test Application workflow generator", func() { logCtx := monitorContext.NewTraceContext(ctx, "") handler.currentAppRev = &oamcore.ApplicationRevision{} - handler.CheckWorkflowRestart(logCtx, app) _, _, err = handler.GenerateApplicationSteps(logCtx, app, appParser, af) Expect(err).NotTo(BeNil()) }) @@ -316,4 +311,410 @@ var _ = Describe("Test Application workflow generator", func() { Expect(ctxData.AppLabels).To(BeNil()) Expect(ctxData.AppAnnotations).To(BeNil()) }) + + // NOTE: Workflow restart tests have been migrated to workflow_restart_test.go + // They test the new annotation-based restart functionality: + // - handleWorkflowRestartAnnotation() - parses annotation and sets status field + // - checkWorkflowRestart() - triggers restart based on status field + + /* + // Original tests commented out below for reference - DO NOT UNCOMMENT + // See workflow_restart_test.go for the new tests + /* + It("Test workflow restart via annotation with immediate restart", func() { + // Use a past timestamp for immediate restart + pastTime := time.Now().Add(-1 * time.Hour) + pastTimeStr := pastTime.Format(time.RFC3339) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-restart-annotation", + Namespace: namespaceName, + Annotations: map[string]string{ + oam.AnnotationWorkflowRestart: pastTimeStr, // Past timestamp = immediate + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-with-restart-annotation-v1", + Finished: true, + }, + Services: []common.ApplicationComponentStatus{ + {Name: "myweb", Healthy: true}, + }, + }, + } + + handler, err := NewAppHandler(ctx, reconciler, app) + Expect(err).Should(Succeed()) + + appRev := &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-restart-annotation-v2", + Namespace: namespaceName, + }, + } + handler.currentAppRev = appRev + handler.latestAppRev = appRev + + logCtx := monitorContext.NewTraceContext(ctx, "") + + // Before annotation handling + Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + Expect(app.Status.Workflow).NotTo(BeNil()) + Expect(app.Status.Workflow.Finished).To(BeTrue()) + Expect(app.Status.Services).To(HaveLen(1)) + + // Simulate controller processing annotation - sets status field (annotation removed by controller) + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: pastTime} + delete(app.Annotations, oam.AnnotationWorkflowRestart) + + // Status field set, annotation removed (done by controller) + Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", pastTime, 1*time.Second)) + + // Check workflow restart - should trigger restart because time has passed + handler.CheckWorkflowRestart(logCtx, app) + + // After restart - status field cleared, workflow restarted + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Status field cleared + Expect(app.Status.Workflow).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-with-restart-annotation-v2")) + Expect(app.Status.Workflow.Finished).To(BeFalse()) // Workflow reset + Expect(app.Status.Services).To(BeNil()) // Services cleared + }) + + It("Test workflow restart via annotation with past timestamp", func() { + pastTime := time.Now().Add(-1 * time.Hour) + pastTimeStr := pastTime.Format(time.RFC3339) + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-past-timestamp", + Namespace: namespaceName, + Annotations: map[string]string{ + oam.AnnotationWorkflowRestart: pastTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + }, + }, + } + + handler, err := NewAppHandler(ctx, reconciler, app) + Expect(err).Should(Succeed()) + + appRev := &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName}, + } + handler.currentAppRev = appRev + handler.latestAppRev = appRev + + logCtx := monitorContext.NewTraceContext(ctx, "") + + // Simulate controller processing annotation + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: pastTime} + delete(app.Annotations, oam.AnnotationWorkflowRestart) + + Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation removed + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field set + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", pastTime, 1*time.Second)) + + // Trigger restart - should restart because time has passed + handler.CheckWorkflowRestart(logCtx, app) + + // Status field cleared, workflow restarted + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Cleared after restart + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v2")) + Expect(app.Status.Workflow.Finished).To(BeFalse()) + }) + + It("Test workflow restart via annotation with future timestamp", func() { + futureTime := time.Now().Add(1 * time.Hour) + futureTimeStr := futureTime.Format(time.RFC3339) + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-future-timestamp", + Namespace: namespaceName, + Annotations: map[string]string{ + oam.AnnotationWorkflowRestart: futureTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + }, + Services: []common.ApplicationComponentStatus{ + {Name: "myweb", Healthy: true}, + }, + }, + } + + handler, err := NewAppHandler(ctx, reconciler, app) + Expect(err).Should(Succeed()) + + appRev := &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName}, + } + handler.currentAppRev = appRev + handler.latestAppRev = appRev + + logCtx := monitorContext.NewTraceContext(ctx, "") + + // Simulate controller processing annotation + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: futureTime} + delete(app.Annotations, oam.AnnotationWorkflowRestart) + + Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation removed + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field set + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", futureTime, 1*time.Second)) + + // Trigger check - should NOT restart because time hasn't arrived + handler.CheckWorkflowRestart(logCtx, app) + + // Workflow NOT restarted - status field still present + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field remains (time not arrived) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) // Still old revision + Expect(app.Status.Workflow.Finished).To(BeTrue()) // Still finished + Expect(app.Status.Services).To(HaveLen(1)) // Services not cleared + }) + + It("Test workflow restart via annotation with duration", func() { + // Workflow finished 2 minutes ago + workflowEndTime := time.Now().Add(-2 * time.Minute) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-duration", + Namespace: namespaceName, + Annotations: map[string]string{ + oam.AnnotationWorkflowRestart: "5m", // Restart 5 minutes after last completion + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: workflowEndTime}, + }, + Services: []common.ApplicationComponentStatus{ + {Name: "myweb", Healthy: true}, + }, + }, + } + + handler, err := NewAppHandler(ctx, reconciler, app) + Expect(err).Should(Succeed()) + + appRev := &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName}, + } + handler.currentAppRev = appRev + handler.latestAppRev = appRev + + logCtx := monitorContext.NewTraceContext(ctx, "") + + // Before annotation handling + Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + + // Simulate controller processing duration annotation + expectedTime := workflowEndTime.Add(5 * time.Minute) // Last end + 5m = 3m from now + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: expectedTime} + // For durations, annotation persists (not removed by controller) + + // For durations, annotation PERSISTS (recurring behavior), status field set + Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation KEPT for recurring + Expect(app.Annotations[oam.AnnotationWorkflowRestart]).To(Equal("5m")) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", expectedTime, 1*time.Second)) + + // Check workflow restart - should NOT restart yet (time not arrived) + handler.CheckWorkflowRestart(logCtx, app) + + // Status field still present, workflow NOT restarted + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) // Still old revision + Expect(app.Status.Workflow.Finished).To(BeTrue()) // Still finished + Expect(app.Status.Services).To(HaveLen(1)) // Services not cleared + }) + + It("Test workflow restart with duration recurs after completion", func() { + // Initial workflow finished 10 minutes ago + firstEndTime := time.Now().Add(-10 * time.Minute) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-recurring-duration", + Namespace: namespaceName, + Annotations: map[string]string{ + oam.AnnotationWorkflowRestart: "5m", // Recurring every 5m + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: firstEndTime}, + }, + }, + } + + handler, err := NewAppHandler(ctx, reconciler, app) + Expect(err).Should(Succeed()) + + appRev := &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName}, + } + handler.currentAppRev = appRev + handler.latestAppRev = appRev + + logCtx := monitorContext.NewTraceContext(ctx, "") + + // Simulate controller processing first scheduling: firstEndTime + 5m (5 minutes ago, ready to trigger) + firstScheduledTime := firstEndTime.Add(5 * time.Minute) + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: firstScheduledTime} + // Duration annotation persists + Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation persists + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", firstScheduledTime, 1*time.Second)) + + // Trigger restart (time has passed) + handler.CheckWorkflowRestart(logCtx, app) + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Cleared after restart + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v2")) + + // Simulate workflow completing again (new EndTime) + secondEndTime := time.Now().Add(-2 * time.Minute) + app.Status.Workflow.Finished = true + app.Status.Workflow.EndTime = metav1.Time{Time: secondEndTime} + + // Simulate controller processing second scheduling: should recalculate based on NEW EndTime + secondScheduledTime := secondEndTime.Add(5 * time.Minute) // 2 min ago + 5m = 3m from now + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: secondScheduledTime} + Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) // Still persists + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Rescheduled + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", secondScheduledTime, 1*time.Second)) + + // This time it shouldn't trigger yet (time not arrived) + handler.CheckWorkflowRestart(logCtx, app) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Still scheduled + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v2")) // No change + }) + + It("Test workflow restart ignored when workflow is not finished", func() { + pastTime := time.Now().Add(-1 * time.Hour) + pastTimeStr := pastTime.Format(time.RFC3339) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-running-workflow", + Namespace: namespaceName, + Annotations: map[string]string{ + oam.AnnotationWorkflowRestart: pastTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: false, // Workflow is still running + }, + Services: []common.ApplicationComponentStatus{ + {Name: "myweb", Healthy: true}, + }, + }, + } + + handler, err := NewAppHandler(ctx, reconciler, app) + Expect(err).Should(Succeed()) + + appRev := &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName}, + } + handler.currentAppRev = appRev + handler.latestAppRev = appRev + + logCtx := monitorContext.NewTraceContext(ctx, "") + + // Simulate controller processing annotation + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: pastTime} + delete(app.Annotations, oam.AnnotationWorkflowRestart) + + Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation removed + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field set + + // Check workflow restart - should be IGNORED because workflow not finished + handler.CheckWorkflowRestart(logCtx, app) + + // Restart ignored - status field cleared but workflow NOT restarted + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Status field cleared (consumed) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) // Still old revision + Expect(app.Status.Workflow.Finished).To(BeFalse()) // Still not finished + Expect(app.Status.Services).To(HaveLen(1)) // Services NOT cleared + }) + */ }) diff --git a/pkg/controller/core.oam.dev/v1beta1/application/workflow.go b/pkg/controller/core.oam.dev/v1beta1/application/workflow.go new file mode 100644 index 000000000..9489edd02 --- /dev/null +++ b/pkg/controller/core.oam.dev/v1beta1/application/workflow.go @@ -0,0 +1,206 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package application + +import ( + "context" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + + monitorContext "github.com/kubevela/pkg/monitor/context" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/common" + "github.com/oam-dev/kubevela/apis/core.oam.dev/condition" + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/oam" +) + +// handleWorkflowRestartAnnotation processes the app.oam.dev/restart-workflow annotation +// and converts it to status.workflowRestartScheduledAt for GitOps safety. +// For timestamps, it deletes the annotation after copying to status (persisted via Client.Update). +// For durations, it keeps the annotation and reschedules after each execution based on time comparison. +func (r *Reconciler) handleWorkflowRestartAnnotation(ctx context.Context, app *v1beta1.Application) { + if !metav1.HasAnnotation(app.ObjectMeta, oam.AnnotationWorkflowRestart) { + return + } + + restartValue := app.Annotations[oam.AnnotationWorkflowRestart] + + var scheduledTime time.Time + var isDuration bool + var statusFieldNeedsUpdate bool + + if restartValue == "true" { + // "true" is a convenience value supplied for an immediate restart + scheduledTime = time.Now() + isDuration = false + statusFieldNeedsUpdate = true + } else if parsedTime, err := time.Parse(time.RFC3339, restartValue); err == nil { + // explicit timestamp - restart on first reconcile > time + scheduledTime = parsedTime + isDuration = false + statusFieldNeedsUpdate = true + } else if duration, err := time.ParseDuration(restartValue); err == nil { + // recurring duration - calculate relative to last successful workflow completion + baseTime := time.Now() + if app.Status.Workflow != nil && app.Status.Workflow.Finished && !app.Status.Workflow.EndTime.IsZero() { + baseTime = app.Status.Workflow.EndTime.Time + } + scheduledTime = baseTime.Add(duration) + isDuration = true + + // Only update if status is nil OR the calculated value differs from current status + statusFieldNeedsUpdate = app.Status.WorkflowRestartScheduledAt == nil || + !app.Status.WorkflowRestartScheduledAt.Time.Equal(scheduledTime) + } else { + klog.Warningf("Invalid workflow restart annotation value for Application %s/%s: %q. Expected 'true', RFC3339 timestamp, or duration (e.g., '5m', '1h')", + app.Namespace, app.Name, restartValue) + return + } + + if statusFieldNeedsUpdate { + app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: scheduledTime} + if err := r.Status().Update(ctx, app); err != nil { + klog.Errorf("Failed to update workflow restart status for Application %s/%s: %v. Will retry on next reconcile.", + app.Namespace, app.Name, err) + // Don't fail reconciliation - will retry naturally on next reconcile + } + } + + // For timestamps, delete the annotation (one-time behavior) + // For durations, keep the annotation (recurring behavior) + if !isDuration { + delete(app.Annotations, oam.AnnotationWorkflowRestart) + if err := r.Client.Update(ctx, app); err != nil { + klog.Errorf("Failed to remove workflow restart annotation for Application %s/%s: %v. Will retry on next reconcile.", + app.Namespace, app.Name, err) + // Don't fail reconciliation - will retry naturally on next reconcile + } + } +} + +// checkWorkflowRestart checks if application workflow needs restart. +// Handles three restart scenarios: +// 1. Scheduled restart (via workflowRestartScheduledAt status field) +// 2. PublishVersion annotation change +// 3. Application revision change +func (r *Reconciler) checkWorkflowRestart(ctx monitorContext.Context, app *v1beta1.Application, handler *AppHandler) { + // Check for scheduled restart in status field + if app.Status.WorkflowRestartScheduledAt != nil { + restartTime := app.Status.WorkflowRestartScheduledAt.Time + + if time.Now().Before(restartTime) { + // Not yet time to restart, skip for now + return + } + if app.Status.Workflow == nil || !app.Status.Workflow.Finished { + // Workflow is still running or hasn't started - don't restart yet + return + } + if app.Status.Workflow != nil && !app.Status.Workflow.EndTime.IsZero() { + lastEndTime := app.Status.Workflow.EndTime.Time + if !restartTime.After(lastEndTime) { + // Restart time is not after last execution, skip + return + } + } + + // All conditions met: time arrived, workflow finished, and restart time > last execution + // Clear the status field and proceed with restart + app.Status.WorkflowRestartScheduledAt = nil + if err := r.Status().Update(ctx, app); err != nil { + ctx.Error(err, "failed to clear workflow restart scheduled time") + return + } + if app.Status.Workflow != nil { + if handler.latestAppRev != nil && handler.latestAppRev.Status.Workflow == nil { + app.Status.Workflow.Terminated = true + app.Status.Workflow.Finished = true + if app.Status.Workflow.EndTime.IsZero() { + app.Status.Workflow.EndTime = metav1.Now() + } + handler.UpdateApplicationRevisionStatus(ctx, handler.latestAppRev, app.Status.Workflow) + } + } + + app.Status.Services = nil + app.Status.AppliedResources = nil + var reservedConditions []condition.Condition + for i, cond := range app.Status.Conditions { + condTpy, err := common.ParseApplicationConditionType(string(cond.Type)) + if err == nil { + if condTpy <= common.RenderCondition { + reservedConditions = append(reservedConditions, app.Status.Conditions[i]) + } + } + } + app.Status.Conditions = reservedConditions + app.Status.Workflow = &common.WorkflowStatus{ + AppRevision: handler.currentAppRev.Name, + } + return + } + + // Check for revision-based restart (publishVersion or normal revision change) + desiredRev, currentRev := handler.currentAppRev.Name, "" + if app.Status.Workflow != nil { + currentRev = app.Status.Workflow.AppRevision + } + if metav1.HasAnnotation(app.ObjectMeta, oam.AnnotationPublishVersion) { + desiredRev = app.GetAnnotations()[oam.AnnotationPublishVersion] + } else { // nolint + // backward compatibility + // legacy versions use : as currentRev, extract + if idx := strings.LastIndexAny(currentRev, ":"); idx >= 0 { + currentRev = currentRev[:idx] + } + } + if currentRev != "" && desiredRev == currentRev { + return + } + + // Restart needed - record in revision and clean up + if app.Status.Workflow != nil { + if handler.latestAppRev != nil && handler.latestAppRev.Status.Workflow == nil { + app.Status.Workflow.Terminated = true + app.Status.Workflow.Finished = true + if app.Status.Workflow.EndTime.IsZero() { + app.Status.Workflow.EndTime = metav1.Now() + } + handler.UpdateApplicationRevisionStatus(ctx, handler.latestAppRev, app.Status.Workflow) + } + } + + app.Status.Services = nil + app.Status.AppliedResources = nil + var reservedConditions []condition.Condition + for i, cond := range app.Status.Conditions { + condTpy, err := common.ParseApplicationConditionType(string(cond.Type)) + if err == nil { + if condTpy <= common.RenderCondition { + reservedConditions = append(reservedConditions, app.Status.Conditions[i]) + } + } + } + app.Status.Conditions = reservedConditions + app.Status.Workflow = &common.WorkflowStatus{ + AppRevision: desiredRev, + } +} diff --git a/pkg/controller/core.oam.dev/v1beta1/application/workflow_test.go b/pkg/controller/core.oam.dev/v1beta1/application/workflow_test.go index ebe8f40a2..f8f7ac119 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/workflow_test.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/workflow_test.go @@ -33,9 +33,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/yaml" + monitorContext "github.com/kubevela/pkg/monitor/context" workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1" wfTypes "github.com/kubevela/workflow/pkg/types" @@ -836,3 +838,468 @@ spec: } ` ) + +var _ = Describe("Test workflow restart annotation functionality", func() { + var ctx context.Context + var namespace string + var reconciler *Reconciler + var scheme *runtime.Scheme + + BeforeEach(func() { + ctx = context.Background() + namespace = "test-workflow-restart" + + scheme = runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).Should(Succeed()) + Expect(oamcore.AddToScheme(scheme)).Should(Succeed()) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&oamcore.Application{}). + Build() + + reconciler = &Reconciler{ + Client: fakeClient, + Scheme: scheme, + } + }) + + It("Test workflow restart when scheduled time is past but newer than last execution", func() { + pastTime := time.Now().Add(-1 * time.Hour) + pastTimeStr := pastTime.Format(time.RFC3339) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-restart-annotation", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": pastTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + // EndTime is 2 hours ago - BEFORE the restart time (1 hour ago) + EndTime: metav1.Time{Time: time.Now().Add(-2 * time.Hour)}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + + Expect(app.Annotations).NotTo(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", pastTime, 1*time.Second)) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + latestAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + Status: oamcore.ApplicationRevisionStatus{ + // Set non-nil to avoid UpdateApplicationRevisionStatus call in tests + Workflow: &common.WorkflowStatus{}, + }, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeFalse()) + }) + + It("Test workflow restart with 'true' annotation triggers immediate restart", func() { + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-true-restart", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": "true", + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: time.Now().Add(-10 * time.Minute)}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + + Expect(app.Annotations).NotTo(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", time.Now(), 2*time.Second)) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + latestAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + Status: oamcore.ApplicationRevisionStatus{ + // Set non-nil to avoid UpdateApplicationRevisionStatus call in tests + Workflow: &common.WorkflowStatus{}, + }, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + }) + + It("Test workflow restart with future timestamp (should NOT restart)", func() { + futureTime := time.Now().Add(1 * time.Hour) + futureTimeStr := futureTime.Format(time.RFC3339) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-future-timestamp", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": futureTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: time.Now().Add(-10 * time.Minute)}, + }, + Services: []common.ApplicationComponentStatus{ + {Name: "myweb", Healthy: true}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + + Expect(app.Annotations).NotTo(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeTrue()) + Expect(app.Status.Services).To(HaveLen(1)) + }) + + It("Test workflow restart with duration (not yet time)", func() { + // Workflow finished 2 minutes ago + workflowEndTime := time.Now().Add(-2 * time.Minute) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-duration", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": "5m", // Restart 5 minutes after completion + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: workflowEndTime}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + + Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Annotations["app.oam.dev/restart-workflow"]).To(Equal("5m")) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + expectedTime := workflowEndTime.Add(5 * time.Minute) + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", expectedTime, 1*time.Second)) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeTrue()) + }) + + It("Test workflow restart with duration (recurring)", func() { + // Initial workflow finished 10 minutes ago + firstEndTime := time.Now().Add(-10 * time.Minute) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-recurring-duration", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": "5m", + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: firstEndTime}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + // First scheduling: 10 min ago + 5m = 5 min ago (ready to trigger) + reconciler.handleWorkflowRestartAnnotation(ctx, app) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + latestAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + Status: oamcore.ApplicationRevisionStatus{ + // Set non-nil to avoid UpdateApplicationRevisionStatus call in tests + Workflow: &common.WorkflowStatus{}, + }, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + + // Simulate workflow completing again (2 minutes ago) + secondEndTime := time.Now().Add(-2 * time.Minute) + app.Status.Workflow.Finished = true + app.Status.Workflow.EndTime = metav1.Time{Time: secondEndTime} + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + secondScheduledTime := secondEndTime.Add(5 * time.Minute) + Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", secondScheduledTime, 1*time.Second)) + + reconciler.checkWorkflowRestart(logCtx, app, handler) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + }) + + It("Test workflow restart ignored when workflow not finished", func() { + pastTime := time.Now().Add(-1 * time.Hour) + pastTimeStr := pastTime.Format(time.RFC3339) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-running-workflow", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": pastTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: false, // Workflow still running + }, + Services: []common.ApplicationComponentStatus{ + {Name: "myweb", Healthy: true}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeFalse()) + Expect(app.Status.Services).To(HaveLen(1)) + }) + + It("Test workflow restart prevents duplicate restarts (restartTime <= lastEndTime)", func() { + // Workflow finished 5 minutes ago + lastEndTime := time.Now().Add(-5 * time.Minute) + // Restart scheduled for 10 minutes ago (already passed, but < lastEndTime) + restartTime := time.Now().Add(-10 * time.Minute) + restartTimeStr := restartTime.Format(time.RFC3339) + + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-old-restart-time", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": restartTimeStr, + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + EndTime: metav1.Time{Time: lastEndTime}, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + + handler := &AppHandler{ + currentAppRev: &oamcore.ApplicationRevision{ + ObjectMeta: metav1.ObjectMeta{Name: "app-v1"}, + }, + } + logCtx := monitorContext.NewTraceContext(ctx, "") + reconciler.checkWorkflowRestart(logCtx, app, handler) + + Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeTrue()) + }) + + It("Test workflow restart with invalid timestamp format (should be ignored with warning)", func() { + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-invalid-timestamp", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": "2025-13-45T99:99:99Z", // Invalid timestamp + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + + Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Annotations["app.oam.dev/restart-workflow"]).To(Equal("2025-13-45T99:99:99Z")) + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeTrue()) + }) + + It("Test workflow restart with invalid duration format (should be ignored with warning)", func() { + app := &oamcore.Application{ + TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-invalid-duration", + Namespace: namespace, + Annotations: map[string]string{ + "app.oam.dev/restart-workflow": "5xyz", // Invalid duration + }, + }, + Spec: oamcore.ApplicationSpec{ + Components: []common.ApplicationComponent{ + {Name: "myweb", Type: "worker"}, + }, + }, + Status: common.AppStatus{ + Workflow: &common.WorkflowStatus{ + AppRevision: "app-v1", + Finished: true, + }, + }, + } + + Expect(reconciler.Client.Create(ctx, app)).Should(Succeed()) + + reconciler.handleWorkflowRestartAnnotation(ctx, app) + + Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow")) + Expect(app.Annotations["app.oam.dev/restart-workflow"]).To(Equal("5xyz")) + Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) + Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) + Expect(app.Status.Workflow.Finished).To(BeTrue()) + }) +}) diff --git a/pkg/oam/labels.go b/pkg/oam/labels.go index 318902d58..87111e1a7 100644 --- a/pkg/oam/labels.go +++ b/pkg/oam/labels.go @@ -162,6 +162,16 @@ const ( // AnnotationWorkflowName specifies the workflow name for execution. AnnotationWorkflowName = "app.oam.dev/workflowName" + // AnnotationWorkflowRestart triggers a workflow restart when set. Supported values: + // - "true": Immediate restart (sets restart time to current time + 1 second). + // Annotation is automatically removed after being processed. + // - RFC3339 timestamp (e.g., "2025-01-15T14:30:00Z"): One-time restart at specified time. + // Annotation is automatically removed after being processed. + // - Duration (e.g., "5m", "1h", "30s"): Recurring restart with minimum interval after each completion. + // Annotation persists; automatically reschedules after each workflow completion. + // All modes are GitOps-safe: the schedule is stored in status.workflowRestartScheduledAt. + AnnotationWorkflowRestart = "app.oam.dev/restart-workflow" + // AnnotationAppName specifies the name for application in db. // Note: the annotation is only created by velaUX, please don't use it in other Source of Truth. AnnotationAppName = "app.oam.dev/appName" diff --git a/references/docgen/def-doc/workflowstep/restart-workflow.eg.md b/references/docgen/def-doc/workflowstep/restart-workflow.eg.md new file mode 100644 index 000000000..8002d54e9 --- /dev/null +++ b/references/docgen/def-doc/workflowstep/restart-workflow.eg.md @@ -0,0 +1,81 @@ +Schedule workflow restarts to enable periodic tasks, delayed execution, or time-based orchestration. The step uses exactly one of three timing modes: `at` for a specific timestamp, `after` for a relative delay, or `every` for recurring intervals. + +```yaml +# Example 1: Fixed timestamp - restart at specific time +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: scheduled-app + namespace: default +spec: + components: + - name: my-component + type: webservice + properties: + image: nginx:latest + port: 80 + workflow: + steps: + - name: deploy + type: apply-component + properties: + component: my-component + - name: schedule-restart + type: restart-workflow + properties: + at: "2025-01-20T15:00:00Z" +--- +# Example 2: Relative delay - restart after duration +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: delayed-restart-app + namespace: default +spec: + components: + - name: batch-processor + type: webservice + properties: + image: myapp/batch-processor:v1 + port: 8080 + workflow: + steps: + - name: deploy + type: apply-component + properties: + component: batch-processor + - name: schedule-restart-after + type: restart-workflow + properties: + after: "1h" +--- +# Example 3: Recurring - restart every interval +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: periodic-sync-app + namespace: default +spec: + components: + - name: data-sync + type: webservice + properties: + image: myapp/data-sync:v1 + port: 8080 + workflow: + steps: + - name: deploy + type: apply-component + properties: + component: data-sync + - name: schedule-recurring-restart + type: restart-workflow + properties: + every: "24h" +``` + +**Use cases:** + +- **Periodic tasks**: Schedule recurring workflow execution for data synchronization, batch processing, or scheduled maintenance +- **Delayed deployment**: Add a delay after initial deployment before triggering workflow restart +- **Time-based orchestration**: Coordinate workflows to run at specific times across multiple applications diff --git a/vela-templates/definitions/internal/workflowstep/restart-workflow.cue b/vela-templates/definitions/internal/workflowstep/restart-workflow.cue new file mode 100644 index 000000000..db31ac1ce --- /dev/null +++ b/vela-templates/definitions/internal/workflowstep/restart-workflow.cue @@ -0,0 +1,118 @@ +import ( + "vela/kube" + "vela/builtin" +) + +"restart-workflow": { + type: "workflow-step" + annotations: { + "category": "Workflow Control" + } + labels: { + "scope": "Application" + } + description: "Schedule the current Application's workflow to restart at a specific time, after a duration, or at recurring intervals" +} +template: { + // Count how many parameters are provided + _paramCount: len([ + if parameter.at != _|_ {1}, + if parameter.after != _|_ {1}, + if parameter.every != _|_ {1}, + ]) + + // Fail if not exactly one parameter is provided + if _paramCount != 1 { + validateParams: builtin.#Fail & { + $params: { + message: "Exactly one of 'at', 'after', or 'every' parameters must be specified (found \(_paramCount))" + } + } + } + + // Build the bash script to calculate annotation value + _script: string + if parameter.at != _|_ { + // Fixed timestamp mode - use as-is + _script: """ + VALUE="\(parameter.at)" + kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite + """ + } + if parameter.after != _|_ { + // Relative time mode - calculate timestamp using date + // Convert duration format (5m, 1h, 2d) to seconds, then calculate + _script: """ + DURATION="\(parameter.after)" + + # Convert duration to seconds + SECONDS=0 + if [[ "$DURATION" =~ ^([0-9]+)m$ ]]; then + SECONDS=$((${BASH_REMATCH[1]} * 60)) + elif [[ "$DURATION" =~ ^([0-9]+)h$ ]]; then + SECONDS=$((${BASH_REMATCH[1]} * 3600)) + elif [[ "$DURATION" =~ ^([0-9]+)d$ ]]; then + SECONDS=$((${BASH_REMATCH[1]} * 86400)) + else + echo "ERROR: Invalid duration format: $DURATION (expected format: 5m, 1h, or 2d)" + exit 1 + fi + + # Calculate future timestamp using seconds offset + VALUE=$(date -u -d "@$(($(date +%s) + SECONDS))" +%Y-%m-%dT%H:%M:%SZ) + echo "Calculated timestamp for after '$DURATION' ($SECONDS seconds): $VALUE" + kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite + """ + } + if parameter.every != _|_ { + // Recurring interval mode - pass duration directly + _script: """ + VALUE="\(parameter.every)" + kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite + """ + } + + // Run kubectl to annotate the Application + job: kube.#Apply & { + $params: { + value: { + apiVersion: "batch/v1" + kind: "Job" + metadata: { + name: "\(context.name)-restart-workflow-\(context.stepSessionID)" + namespace: "vela-system" + } + spec: { + backoffLimit: 3 + template: { + spec: { + containers: [{ + name: "kubectl-annotate" + image: "bitnami/kubectl:latest" + command: ["/bin/sh", "-c"] + args: [_script] + }] + restartPolicy: "Never" + serviceAccountName: "kubevela-vela-core" + } + } + } + } + } + } + + wait: builtin.#ConditionalWait & { + if job.$returns.value.status != _|_ if job.$returns.value.status.succeeded != _|_ { + $params: continue: job.$returns.value.status.succeeded > 0 + } + } + + parameter: { + // +usage=Schedule restart at a specific RFC3339 timestamp (e.g., "2025-01-15T14:30:00Z") + at?: string & =~"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})$" + // +usage=Schedule restart after a relative duration from now (e.g., "5m", "1h", "2d") + after?: string & =~"^[0-9]+(m|h|d)$" + // +usage=Schedule recurring restarts every specified duration (e.g., "5m", "1h", "24h") + every?: string & =~"^[0-9]+(m|h|d)$" + } +}