diff --git a/apis/core.oam.dev/common/types.go b/apis/core.oam.dev/common/types.go index 055bf603d..4fdbb8d06 100644 --- a/apis/core.oam.dev/common/types.go +++ b/apis/core.oam.dev/common/types.go @@ -345,6 +345,8 @@ type WorkflowStatus struct { Mode WorkflowMode `json:"mode"` Message string `json:"message,omitempty"` + SuspendState string `json:"suspendState,omitempty"` + Suspend bool `json:"suspend"` Terminated bool `json:"terminated"` Finished bool `json:"finished"` diff --git a/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml b/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml index 22881d408..59586009f 100644 --- a/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml +++ b/charts/vela-core/crds/core.oam.dev_applicationrevisions.yaml @@ -934,6 +934,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -2743,6 +2745,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -4682,6 +4686,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: diff --git a/charts/vela-core/crds/core.oam.dev_applications.yaml b/charts/vela-core/crds/core.oam.dev_applications.yaml index a50fd36eb..82aa3535c 100644 --- a/charts/vela-core/crds/core.oam.dev_applications.yaml +++ b/charts/vela-core/crds/core.oam.dev_applications.yaml @@ -643,6 +643,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -1146,6 +1148,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: diff --git a/charts/vela-core/templates/defwithtemplate/suspend.yaml b/charts/vela-core/templates/defwithtemplate/suspend.yaml index 935364c22..9d81a2d74 100644 --- a/charts/vela-core/templates/defwithtemplate/suspend.yaml +++ b/charts/vela-core/templates/defwithtemplate/suspend.yaml @@ -11,6 +11,8 @@ spec: schematic: cue: template: | - // no parameters - parameter: {} + parameter: { + // +usage=Specify the wait duration time to resume workflow such as "30s", "1min" or "2m15s" + duration?: string + } diff --git a/charts/vela-minimal/crds/core.oam.dev_applicationrevisions.yaml b/charts/vela-minimal/crds/core.oam.dev_applicationrevisions.yaml index 22881d408..59586009f 100644 --- a/charts/vela-minimal/crds/core.oam.dev_applicationrevisions.yaml +++ b/charts/vela-minimal/crds/core.oam.dev_applicationrevisions.yaml @@ -934,6 +934,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -2743,6 +2745,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -4682,6 +4686,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: diff --git a/charts/vela-minimal/crds/core.oam.dev_applications.yaml b/charts/vela-minimal/crds/core.oam.dev_applications.yaml index a50fd36eb..82aa3535c 100644 --- a/charts/vela-minimal/crds/core.oam.dev_applications.yaml +++ b/charts/vela-minimal/crds/core.oam.dev_applications.yaml @@ -643,6 +643,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -1146,6 +1148,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: diff --git a/charts/vela-minimal/templates/defwithtemplate/suspend.yaml b/charts/vela-minimal/templates/defwithtemplate/suspend.yaml index 935364c22..9d81a2d74 100644 --- a/charts/vela-minimal/templates/defwithtemplate/suspend.yaml +++ b/charts/vela-minimal/templates/defwithtemplate/suspend.yaml @@ -11,6 +11,8 @@ spec: schematic: cue: template: | - // no parameters - parameter: {} + parameter: { + // +usage=Specify the wait duration time to resume workflow such as "30s", "1min" or "2m15s" + duration?: string + } diff --git a/legacy/charts/vela-core-legacy/crds/core.oam.dev_applicationrevisions.yaml b/legacy/charts/vela-core-legacy/crds/core.oam.dev_applicationrevisions.yaml index 370ebbe98..cb17750af 100644 --- a/legacy/charts/vela-core-legacy/crds/core.oam.dev_applicationrevisions.yaml +++ b/legacy/charts/vela-core-legacy/crds/core.oam.dev_applicationrevisions.yaml @@ -934,6 +934,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -2743,6 +2745,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -4682,6 +4686,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: diff --git a/legacy/charts/vela-core-legacy/crds/core.oam.dev_applications.yaml b/legacy/charts/vela-core-legacy/crds/core.oam.dev_applications.yaml index 3a953a6dd..a9de95e21 100644 --- a/legacy/charts/vela-core-legacy/crds/core.oam.dev_applications.yaml +++ b/legacy/charts/vela-core-legacy/crds/core.oam.dev_applications.yaml @@ -854,6 +854,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: @@ -1519,6 +1521,8 @@ spec: type: array suspend: type: boolean + suspendState: + type: string terminated: type: boolean required: diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go index 468a19c82..d9adae454 100644 --- a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go +++ b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go @@ -213,23 +213,36 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu case common.WorkflowStateInitializing: logCtx.Info("Workflow return state=Initializing") handler.UpdateApplicationRevisionStatus(logCtx, handler.currentAppRev, false, app.Status.Workflow) - return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false) + return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false, false) case common.WorkflowStateSuspended: logCtx.Info("Workflow return state=Suspend") + doWaiting, durationWaiting, err := wf.HandleSuspendWait(logCtx) + if err != nil { + return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.WorkflowCondition.String(), err), common.ApplicationRunningWorkflow) + } + if doWaiting { + if durationWaiting > 0 { + _, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true) + return r.result(err).requeue(durationWaiting).ret() + } + handler.app.Status.Workflow.Suspend = false + handler.app.Status.Workflow.SuspendState = "" + return r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false, false) + } if !workflow.IsFailedAfterRetry(app) { r.stateKeep(logCtx, handler, app) } - return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false) + return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true) case common.WorkflowStateTerminated: logCtx.Info("Workflow return state=Terminated") handler.UpdateApplicationRevisionStatus(logCtx, handler.latestAppRev, false, app.Status.Workflow) if err := r.doWorkflowFinish(app, wf); err != nil { return r.endWithNegativeCondition(ctx, app, condition.ErrorCondition(common.WorkflowCondition.String(), errors.WithMessage(err, "DoWorkflowFinish")), common.ApplicationRunningWorkflow) } - return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false) + return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false, true) case common.WorkflowStateExecuting: logCtx.Info("Workflow return state=Executing") - _, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false) + _, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false, true) return r.result(err).requeue(wf.GetBackoffWaitTime()).ret() case common.WorkflowStateSucceeded: logCtx.Info("Workflow return state=Succeeded") @@ -241,7 +254,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu r.Recorder.Event(app, event.Normal(velatypes.ReasonApplied, velatypes.MessageWorkflowFinished)) logCtx.Info("Application manifests has applied by workflow successfully") if !EnableReconcileLoopReduction { - return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowFinished, false) + return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowFinished, false, true) } case common.WorkflowStateFinished: logCtx.Info("Workflow state=Finished") @@ -275,7 +288,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu Reason: condition.ReasonReconcileSuccess, }) r.Recorder.Event(app, event.Normal(velatypes.ReasonDeployed, velatypes.MessageDeployed)) - return r.gcResourceTrackers(logCtx, handler, phase, true) + return r.gcResourceTrackers(logCtx, handler, phase, true, true) } func (r *Reconciler) stateKeep(logCtx monitorContext.Context, handler *AppHandler, app *v1beta1.Application) { @@ -286,7 +299,7 @@ func (r *Reconciler) stateKeep(logCtx monitorContext.Context, handler *AppHandle } } -func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *AppHandler, phase common.ApplicationPhase, gcOutdated bool) (ctrl.Result, error) { +func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *AppHandler, phase common.ApplicationPhase, gcOutdated bool, isPatch bool) (ctrl.Result, error) { subCtx := logCtx.Fork("gc_resourceTrackers", monitorContext.DurationMetric(func(v float64) { metrics.GCResourceTrackersDurationHistogram.WithLabelValues("-").Observe(v) })) @@ -312,7 +325,7 @@ func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler * return r.result(r.patchStatus(logCtx, handler.app, phase)).requeue(baseGCBackoffWaitTime).ret() } logCtx.Info("GarbageCollected resourcetrackers") - if phase == common.ApplicationRendering { + if !isPatch { return r.result(r.updateStatus(logCtx, handler.app, common.ApplicationRunningWorkflow)).ret() } return r.result(r.patchStatus(logCtx, handler.app, phase)).ret() @@ -371,7 +384,7 @@ func (r *Reconciler) handleFinalizers(ctx monitorContext.Context, app *v1beta1.A if err != nil { return r.result(err).end(true) } - result, err := r.gcResourceTrackers(ctx, handler, common.ApplicationDeleting, true) + result, err := r.gcResourceTrackers(ctx, handler, common.ApplicationDeleting, true, true) if err != nil { return true, result, err } diff --git a/pkg/workflow/interface.go b/pkg/workflow/interface.go index c6e5e32e6..a7cb5fda7 100644 --- a/pkg/workflow/interface.go +++ b/pkg/workflow/interface.go @@ -35,4 +35,6 @@ type Workflow interface { // GetBackoffWaitTime returns the wait time for next retry. GetBackoffWaitTime() time.Duration + + HandleSuspendWait(ctx monitorContext.Context) (bool, time.Duration, error) } diff --git a/pkg/workflow/tasks/discover.go b/pkg/workflow/tasks/discover.go index 5bf8d889a..bbf6fd83d 100644 --- a/pkg/workflow/tasks/discover.go +++ b/pkg/workflow/tasks/discover.go @@ -18,6 +18,8 @@ package tasks import ( "context" + "encoding/json" + builtintime "time" "github.com/pkg/errors" "k8s.io/client-go/rest" @@ -68,10 +70,20 @@ func (td *taskDiscover) GetTaskGenerator(ctx context.Context, name string) (type } func suspend(step v1beta1.WorkflowStep, opt *types.GeneratorOptions) (types.TaskRunner, error) { - return &suspendTaskRunner{ + tr := &suspendTaskRunner{ id: opt.ID, name: step.Name, - }, nil + wait: false, + } + + doDelay, _, err := GetSuspendStepDurationWaiting(step) + if err != nil { + return nil, err + } + + tr.wait = doDelay + + return tr, nil } func newTaskDiscover(providerHandlers providers.Providers, pd *packages.PackageDiscover, pCtx process.Context, templateLoader template.Loader) types.TaskDiscover { @@ -104,6 +116,7 @@ func NewTaskDiscoverFromRevision(providerHandlers providers.Providers, pd *packa type suspendTaskRunner struct { id string name string + wait bool } // Name return suspend step name. @@ -113,12 +126,18 @@ func (tr *suspendTaskRunner) Name() string { // Run make workflow suspend. func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (common.WorkflowStepStatus, *types.Operation, error) { - return common.WorkflowStepStatus{ + stepStatus := common.WorkflowStepStatus{ ID: tr.id, Name: tr.name, Type: types.WorkflowStepTypeSuspend, Phase: common.WorkflowStepPhaseSucceeded, - }, &types.Operation{Suspend: true}, nil + } + + if tr.wait { + stepStatus.Phase = common.WorkflowStepPhaseRunning + } + + return stepStatus, &types.Operation{Suspend: true}, nil } // Pending check task should be executed or not. @@ -143,3 +162,27 @@ func NewViewTaskDiscover(pd *packages.PackageDiscover, cli client.Client, cfg *r templateLoader: templateLoader, } } + +// GetSuspendStepDurationWaiting get suspend step wait duration +func GetSuspendStepDurationWaiting(step v1beta1.WorkflowStep) (bool, builtintime.Duration, error) { + if step.Properties.Size() > 0 { + o := struct { + Duration string `json:"duration"` + }{} + js, err := common.RawExtensionPointer{RawExtension: step.Properties}.MarshalJSON() + if err != nil { + return false, 0, err + } + + if err := json.Unmarshal(js, &o); err != nil { + return false, 0, err + } + + if o.Duration != "" { + waitDuration, err := builtintime.ParseDuration(o.Duration) + return true, waitDuration, err + } + } + + return false, 0, nil +} diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go index e73c81bcb..e50b423c8 100644 --- a/pkg/workflow/workflow.go +++ b/pkg/workflow/workflow.go @@ -40,6 +40,7 @@ import ( wfContext "github.com/oam-dev/kubevela/pkg/workflow/context" "github.com/oam-dev/kubevela/pkg/workflow/debug" "github.com/oam-dev/kubevela/pkg/workflow/recorder" + wfTasks "github.com/oam-dev/kubevela/pkg/workflow/tasks" wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types" ) @@ -236,6 +237,64 @@ func (w *workflow) GetBackoffWaitTime() time.Duration { return time.Second } +func (w *workflow) HandleSuspendWait(ctx monitorContext.Context) (doWaiting bool, durationWaiting time.Duration, errRet error) { + ctx.Info("handle suspend wait") + for i, stepStatus := range w.app.Status.Workflow.Steps { + if !w.isWaitSuspendStep(stepStatus) { + continue + } + + step := w.getWorkflowStepByName(stepStatus.Name) + if step.Name == "" { + errRet = fmt.Errorf("failed to get workflow step by name: %s", stepStatus.Name) + return + } + + d, wd, err := wfTasks.GetSuspendStepDurationWaiting(step) + if err != nil { + ctx.Error(err, "failed to get suspend step wait duration") + errRet = err + return + } + + if d { + doWaiting = d + remainingDuration := wd - time.Since(stepStatus.FirstExecuteTime.Time) + if remainingDuration <= 0 { + w.app.Status.Workflow.Steps[i].Phase = common.WorkflowStepPhaseSucceeded + } + + if remainingDuration > 0 && (durationWaiting > remainingDuration || durationWaiting <= 0) { + suspendState := fmt.Sprintf("durationWaiting(%s)", wd.String()) + if w.app.Status.Workflow.SuspendState != suspendState { + w.app.Status.Workflow.SuspendState = suspendState + } + durationWaiting = remainingDuration + } + } + + if !w.dagMode { + return + } + } + + return doWaiting, durationWaiting, errRet +} + +func (w *workflow) isWaitSuspendStep(status common.WorkflowStepStatus) bool { + return status.Type == wfTypes.WorkflowStepTypeSuspend && status.Phase == common.WorkflowStepPhaseRunning +} + +func (w *workflow) getWorkflowStepByName(name string) oamcore.WorkflowStep { + for _, s := range w.app.Spec.Workflow.Steps { + if s.Name == name { + return s + } + } + + return oamcore.WorkflowStep{} +} + func (w *workflow) allDone(taskRunners []wfTypes.TaskRunner) bool { status := w.app.Status.Workflow for _, t := range taskRunners { @@ -470,26 +529,28 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error { e.failedAfterRetries = e.failedAfterRetries || operation.FailedAfterRetries e.waiting = e.waiting || operation.Waiting - if status.Phase != common.WorkflowStepPhaseSucceeded { - wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) + if status.Phase == common.WorkflowStepPhaseSucceeded || (status.Phase == common.WorkflowStepPhaseRunning && status.Type == wfTypes.WorkflowStepTypeSuspend) { + wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) if err := wfCtx.Commit(); err != nil { return errors.WithMessage(err, "commit workflow context") } - if e.isDag() { - continue + + e.finishStep(operation) + if e.needStop() { + return nil } - e.checkFailedAfterRetries() - return nil + continue } - wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) + + wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) if err := wfCtx.Commit(); err != nil { return errors.WithMessage(err, "commit workflow context") } - - e.finishStep(operation) - if e.needStop() { - return nil + if e.isDag() { + continue } + e.checkFailedAfterRetries() + return nil } return nil } diff --git a/test/e2e-test/application_test.go b/test/e2e-test/application_test.go index 16fbad449..05aad7d75 100644 --- a/test/e2e-test/application_test.go +++ b/test/e2e-test/application_test.go @@ -135,6 +135,59 @@ var _ = Describe("Application Normal tests", func() { }, 120*time.Second, time.Second).Should(BeNil()) } + verifyApplicationDelaySuspendExpected := func(ns, appName, suspendStep, nextStep, duration string) { + var testApp v1beta1.Application + Eventually(func() error { + waitDuration, err := time.ParseDuration(duration) + if err != nil { + return err + } + + err = k8sClient.Get(ctx, client.ObjectKey{Namespace: ns, Name: appName}, &testApp) + if err != nil { + return err + } + + if testApp.Status.Workflow == nil { + return fmt.Errorf("application wait to start workflow") + } + + if testApp.Status.Workflow.Finished { + var suspendStartTime, nextStepStartTime metav1.Time + var sFlag, nFlag bool + + for _, wfStatus := range testApp.Status.Workflow.Steps { + if wfStatus.Name == suspendStep { + suspendStartTime = wfStatus.FirstExecuteTime + sFlag = true + continue + } + + if wfStatus.Name == nextStep { + nextStepStartTime = wfStatus.FirstExecuteTime + nFlag = true + } + } + + if !sFlag { + return fmt.Errorf("application can not find suspend step: %s", suspendStep) + } + + if !nFlag { + return fmt.Errorf("application can not find next step: %s", nextStep) + } + + dd := nextStepStartTime.Sub(suspendStartTime.Time) + if waitDuration > dd { + return fmt.Errorf("application suspend wait duration wants more than %s, actually %s", duration, dd.String()) + } + + return nil + } + return fmt.Errorf("application status workflow finished wants true, actually false") + }, 120*time.Second, time.Second).Should(BeNil()) + } + verifyWorkloadRunningExpected := func(workloadName string, replicas int32, image string) { var workload v1.Deployment By("Verify Workload running as expected") @@ -284,6 +337,17 @@ var _ = Describe("Application Normal tests", func() { verifyApplicationWorkflowSuspending(newApp.Namespace, newApp.Name) }) + It("Test wait suspend", func() { + By("Apply wait suspend application") + var newApp v1beta1.Application + Expect(common.ReadYamlToObject("testdata/app/app_wait_suspend.yaml", &newApp)).Should(BeNil()) + newApp.Namespace = namespaceName + Expect(k8sClient.Create(ctx, &newApp)).Should(BeNil()) + + By("check application suspend duration") + verifyApplicationDelaySuspendExpected(newApp.Namespace, newApp.Name, "suspend-test", "apply-wait-suspend-comp", "30s") + }) + It("Test app with ServiceAccount", func() { By("Creating a ServiceAccount") const saName = "app-service-account" diff --git a/test/e2e-test/testdata/app/app_wait_suspend.yaml b/test/e2e-test/testdata/app/app_wait_suspend.yaml new file mode 100644 index 000000000..33f9d1539 --- /dev/null +++ b/test/e2e-test/testdata/app/app_wait_suspend.yaml @@ -0,0 +1,21 @@ +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: wait-suspend-test +spec: + components: + - name: wait-suspend-comp + type: webservice + properties: + image: nginx + port: 80 + workflow: + steps: + - name: suspend-test + type: suspend + properties: + duration: 30s + - name: apply-wait-suspend-comp + type: apply-component + properties: + component: wait-suspend-comp \ No newline at end of file diff --git a/vela-templates/definitions/internal/workflowstep/suspend.cue b/vela-templates/definitions/internal/workflowstep/suspend.cue index a25f0e4d1..46b845b69 100644 --- a/vela-templates/definitions/internal/workflowstep/suspend.cue +++ b/vela-templates/definitions/internal/workflowstep/suspend.cue @@ -5,6 +5,8 @@ description: "Suspend your workflow" } template: { - // no parameters - parameter: {} + parameter: { + // +usage=Specify the wait duration time to resume workflow such as "30s", "1min" or "2m15s" + duration?: string + } }