Feat(delay suspend): delayDuration in suspend step properties (#3644)

* Feat(delay suspend): add delayDuration in suspend step properties to support delay by certain time.Duration

Signed-off-by: suxiang <suxiang@asiainfo.com>
Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(delay suspend): add delayDuration parameter to suspend cue

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): optimize wait suspend

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): change e2e test to local cluster env

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): fix WaitSuspend status modify bug

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): suspend cue parameter type error

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): set waitDuration optional param in suspend workflow step

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): add lost suspend.yaml

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): change solution to add suspendState

Signed-off-by: ArenaSu <704427617@qq.com>

* Feat(wait suspend): change durationWaiting to duration and add isPatch to gcResourceTrackers

Signed-off-by: ArenaSu <704427617@qq.com>
This commit is contained in:
Arena.Su
2022-04-22 11:40:13 +08:00
committed by GitHub
parent 81d479aedf
commit 007f13d2ee
16 changed files with 272 additions and 30 deletions

View File

@@ -345,6 +345,8 @@ type WorkflowStatus struct {
Mode WorkflowMode `json:"mode"`
Message string `json:"message,omitempty"`
SuspendState string `json:"suspendState,omitempty"`
Suspend bool `json:"suspend"`
Terminated bool `json:"terminated"`
Finished bool `json:"finished"`

View File

@@ -934,6 +934,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -2743,6 +2745,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -4682,6 +4686,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:

View File

@@ -643,6 +643,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -1146,6 +1148,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:

View File

@@ -11,6 +11,8 @@ spec:
schematic:
cue:
template: |
// no parameters
parameter: {}
parameter: {
// +usage=Specify the wait duration time to resume workflow such as "30s", "1m" or "2m15s"
duration?: string
}

View File

@@ -934,6 +934,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -2743,6 +2745,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -4682,6 +4686,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:

View File

@@ -643,6 +643,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -1146,6 +1148,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:

View File

@@ -11,6 +11,8 @@ spec:
schematic:
cue:
template: |
// no parameters
parameter: {}
parameter: {
// +usage=Specify the wait duration time to resume workflow such as "30s", "1m" or "2m15s"
duration?: string
}

View File

@@ -934,6 +934,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -2743,6 +2745,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -4682,6 +4686,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:

View File

@@ -854,6 +854,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:
@@ -1519,6 +1521,8 @@ spec:
type: array
suspend:
type: boolean
suspendState:
type: string
terminated:
type: boolean
required:

View File

@@ -213,23 +213,36 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
case common.WorkflowStateInitializing:
logCtx.Info("Workflow return state=Initializing")
handler.UpdateApplicationRevisionStatus(logCtx, handler.currentAppRev, false, app.Status.Workflow)
return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false)
return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false, false)
case common.WorkflowStateSuspended:
logCtx.Info("Workflow return state=Suspend")
doWaiting, durationWaiting, err := wf.HandleSuspendWait(logCtx)
if err != nil {
return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.WorkflowCondition.String(), err), common.ApplicationRunningWorkflow)
}
if doWaiting {
if durationWaiting > 0 {
_, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true)
return r.result(err).requeue(durationWaiting).ret()
}
handler.app.Status.Workflow.Suspend = false
handler.app.Status.Workflow.SuspendState = ""
return r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false, false)
}
if !workflow.IsFailedAfterRetry(app) {
r.stateKeep(logCtx, handler, app)
}
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false)
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true)
case common.WorkflowStateTerminated:
logCtx.Info("Workflow return state=Terminated")
handler.UpdateApplicationRevisionStatus(logCtx, handler.latestAppRev, false, app.Status.Workflow)
if err := r.doWorkflowFinish(app, wf); err != nil {
return r.endWithNegativeCondition(ctx, app, condition.ErrorCondition(common.WorkflowCondition.String(), errors.WithMessage(err, "DoWorkflowFinish")), common.ApplicationRunningWorkflow)
}
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false)
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false, true)
case common.WorkflowStateExecuting:
logCtx.Info("Workflow return state=Executing")
_, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false)
_, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false, true)
return r.result(err).requeue(wf.GetBackoffWaitTime()).ret()
case common.WorkflowStateSucceeded:
logCtx.Info("Workflow return state=Succeeded")
@@ -241,7 +254,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
r.Recorder.Event(app, event.Normal(velatypes.ReasonApplied, velatypes.MessageWorkflowFinished))
logCtx.Info("Application manifests has applied by workflow successfully")
if !EnableReconcileLoopReduction {
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowFinished, false)
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowFinished, false, true)
}
case common.WorkflowStateFinished:
logCtx.Info("Workflow state=Finished")
@@ -275,7 +288,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
Reason: condition.ReasonReconcileSuccess,
})
r.Recorder.Event(app, event.Normal(velatypes.ReasonDeployed, velatypes.MessageDeployed))
return r.gcResourceTrackers(logCtx, handler, phase, true)
return r.gcResourceTrackers(logCtx, handler, phase, true, true)
}
func (r *Reconciler) stateKeep(logCtx monitorContext.Context, handler *AppHandler, app *v1beta1.Application) {
@@ -286,7 +299,7 @@ func (r *Reconciler) stateKeep(logCtx monitorContext.Context, handler *AppHandle
}
}
func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *AppHandler, phase common.ApplicationPhase, gcOutdated bool) (ctrl.Result, error) {
func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *AppHandler, phase common.ApplicationPhase, gcOutdated bool, isPatch bool) (ctrl.Result, error) {
subCtx := logCtx.Fork("gc_resourceTrackers", monitorContext.DurationMetric(func(v float64) {
metrics.GCResourceTrackersDurationHistogram.WithLabelValues("-").Observe(v)
}))
@@ -312,7 +325,7 @@ func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *
return r.result(r.patchStatus(logCtx, handler.app, phase)).requeue(baseGCBackoffWaitTime).ret()
}
logCtx.Info("GarbageCollected resourcetrackers")
if phase == common.ApplicationRendering {
if !isPatch {
return r.result(r.updateStatus(logCtx, handler.app, common.ApplicationRunningWorkflow)).ret()
}
return r.result(r.patchStatus(logCtx, handler.app, phase)).ret()
@@ -371,7 +384,7 @@ func (r *Reconciler) handleFinalizers(ctx monitorContext.Context, app *v1beta1.A
if err != nil {
return r.result(err).end(true)
}
result, err := r.gcResourceTrackers(ctx, handler, common.ApplicationDeleting, true)
result, err := r.gcResourceTrackers(ctx, handler, common.ApplicationDeleting, true, true)
if err != nil {
return true, result, err
}

View File

@@ -35,4 +35,6 @@ type Workflow interface {
// GetBackoffWaitTime returns the wait time for next retry.
GetBackoffWaitTime() time.Duration
HandleSuspendWait(ctx monitorContext.Context) (bool, time.Duration, error)
}

View File

@@ -18,6 +18,8 @@ package tasks
import (
"context"
"encoding/json"
builtintime "time"
"github.com/pkg/errors"
"k8s.io/client-go/rest"
@@ -68,10 +70,20 @@ func (td *taskDiscover) GetTaskGenerator(ctx context.Context, name string) (type
}
func suspend(step v1beta1.WorkflowStep, opt *types.GeneratorOptions) (types.TaskRunner, error) {
return &suspendTaskRunner{
tr := &suspendTaskRunner{
id: opt.ID,
name: step.Name,
}, nil
wait: false,
}
doDelay, _, err := GetSuspendStepDurationWaiting(step)
if err != nil {
return nil, err
}
tr.wait = doDelay
return tr, nil
}
func newTaskDiscover(providerHandlers providers.Providers, pd *packages.PackageDiscover, pCtx process.Context, templateLoader template.Loader) types.TaskDiscover {
@@ -104,6 +116,7 @@ func NewTaskDiscoverFromRevision(providerHandlers providers.Providers, pd *packa
// suspendTaskRunner is the task runner produced by suspend() for a workflow
// step of the suspend type.
type suspendTaskRunner struct {
	// id is the step ID taken from the generator options.
	id string
	// name is the name of the workflow step.
	name string
	// wait is true when the step properties carry a non-empty duration; Run
	// then reports the step phase as running instead of succeeded.
	wait bool
}
// Name return suspend step name.
@@ -113,12 +126,18 @@ func (tr *suspendTaskRunner) Name() string {
// Run make workflow suspend.
func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (common.WorkflowStepStatus, *types.Operation, error) {
return common.WorkflowStepStatus{
stepStatus := common.WorkflowStepStatus{
ID: tr.id,
Name: tr.name,
Type: types.WorkflowStepTypeSuspend,
Phase: common.WorkflowStepPhaseSucceeded,
}, &types.Operation{Suspend: true}, nil
}
if tr.wait {
stepStatus.Phase = common.WorkflowStepPhaseRunning
}
return stepStatus, &types.Operation{Suspend: true}, nil
}
// Pending check task should be executed or not.
@@ -143,3 +162,27 @@ func NewViewTaskDiscover(pd *packages.PackageDiscover, cli client.Client, cfg *r
templateLoader: templateLoader,
}
}
// GetSuspendStepDurationWaiting reads the optional "duration" property of a
// suspend step.
//
// It returns:
//   - (true, d, nil) when the property is present, non-empty and accepted by
//     time.ParseDuration;
//   - (false, 0, nil) when the step has no properties or no duration;
//   - (false, 0, err) when the properties cannot be decoded or the duration
//     string is malformed. The flag is never true together with an error, so
//     callers cannot mistake an invalid value for a configured wait.
func GetSuspendStepDurationWaiting(step v1beta1.WorkflowStep) (bool, builtintime.Duration, error) {
	if step.Properties.Size() <= 0 {
		return false, 0, nil
	}

	raw, err := common.RawExtensionPointer{RawExtension: step.Properties}.MarshalJSON()
	if err != nil {
		return false, 0, err
	}

	// Only the duration field is of interest; other properties are ignored.
	var props struct {
		Duration string `json:"duration"`
	}
	if err := json.Unmarshal(raw, &props); err != nil {
		return false, 0, err
	}
	if props.Duration == "" {
		return false, 0, nil
	}

	waitDuration, err := builtintime.ParseDuration(props.Duration)
	if err != nil {
		return false, 0, err
	}
	return true, waitDuration, nil
}

View File

@@ -40,6 +40,7 @@ import (
wfContext "github.com/oam-dev/kubevela/pkg/workflow/context"
"github.com/oam-dev/kubevela/pkg/workflow/debug"
"github.com/oam-dev/kubevela/pkg/workflow/recorder"
wfTasks "github.com/oam-dev/kubevela/pkg/workflow/tasks"
wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types"
)
@@ -236,6 +237,64 @@ func (w *workflow) GetBackoffWaitTime() time.Duration {
return time.Second
}
// HandleSuspendWait walks the recorded workflow step statuses and evaluates
// every suspend step that is still in the running phase.
//
// doWaiting is true when at least one inspected step declares a duration.
// durationWaiting is the smallest positive remaining time among the inspected
// steps (zero when none is left to wait for). A step whose duration has
// already elapsed since its first execute time is marked as succeeded.
// Outside DAG mode only the first running suspend step is inspected.
//
// An error is returned when a step status has no matching step in the
// workflow spec or when its duration cannot be read.
func (w *workflow) HandleSuspendWait(ctx monitorContext.Context) (doWaiting bool, durationWaiting time.Duration, errRet error) {
	ctx.Info("handle suspend wait")

	wfStatus := w.app.Status.Workflow
	for idx := range wfStatus.Steps {
		current := wfStatus.Steps[idx]
		if !w.isWaitSuspendStep(current) {
			continue
		}

		specStep := w.getWorkflowStepByName(current.Name)
		if specStep.Name == "" {
			return doWaiting, durationWaiting, fmt.Errorf("failed to get workflow step by name: %s", current.Name)
		}

		hasDuration, total, err := wfTasks.GetSuspendStepDurationWaiting(specStep)
		if err != nil {
			ctx.Error(err, "failed to get suspend step wait duration")
			return doWaiting, durationWaiting, err
		}

		if hasDuration {
			doWaiting = true
			remaining := total - time.Since(current.FirstExecuteTime.Time)
			switch {
			case remaining <= 0:
				// The wait is over: let the step count as done.
				wfStatus.Steps[idx].Phase = common.WorkflowStepPhaseSucceeded
			case durationWaiting <= 0 || remaining < durationWaiting:
				// Track the nearest deadline and expose it in the status.
				wfStatus.SuspendState = fmt.Sprintf("durationWaiting(%s)", total.String())
				durationWaiting = remaining
			}
		}

		// In step-by-step mode only the first waiting suspend step matters.
		if !w.dagMode {
			return doWaiting, durationWaiting, errRet
		}
	}
	return doWaiting, durationWaiting, errRet
}
// isWaitSuspendStep reports whether the given step status belongs to a suspend
// step that is currently in the running phase.
func (w *workflow) isWaitSuspendStep(status common.WorkflowStepStatus) bool {
	if status.Type != wfTypes.WorkflowStepTypeSuspend {
		return false
	}
	return status.Phase == common.WorkflowStepPhaseRunning
}
// getWorkflowStepByName returns the first step of the application's workflow
// spec whose name equals name, or a zero-value WorkflowStep when none matches.
func (w *workflow) getWorkflowStepByName(name string) oamcore.WorkflowStep {
	steps := w.app.Spec.Workflow.Steps
	for i := range steps {
		if steps[i].Name != name {
			continue
		}
		return steps[i]
	}
	return oamcore.WorkflowStep{}
}
func (w *workflow) allDone(taskRunners []wfTypes.TaskRunner) bool {
status := w.app.Status.Workflow
for _, t := range taskRunners {
@@ -470,26 +529,28 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error {
e.failedAfterRetries = e.failedAfterRetries || operation.FailedAfterRetries
e.waiting = e.waiting || operation.Waiting
if status.Phase != common.WorkflowStepPhaseSucceeded {
wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
if status.Phase == common.WorkflowStepPhaseSucceeded || (status.Phase == common.WorkflowStepPhaseRunning && status.Type == wfTypes.WorkflowStepTypeSuspend) {
wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
if e.isDag() {
continue
e.finishStep(operation)
if e.needStop() {
return nil
}
e.checkFailedAfterRetries()
return nil
continue
}
wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
e.finishStep(operation)
if e.needStop() {
return nil
if e.isDag() {
continue
}
e.checkFailedAfterRetries()
return nil
}
return nil
}

View File

@@ -135,6 +135,59 @@ var _ = Describe("Application Normal tests", func() {
}, 120*time.Second, time.Second).Should(BeNil())
}
// verifyApplicationDelaySuspendExpected polls the application until its
// workflow is finished and then checks that nextStep first executed at least
// `duration` after suspendStep first executed.
//
// duration must be accepted by time.ParseDuration; it is parsed once up front
// so that an invalid value fails the spec immediately instead of being retried
// until the polling timeout.
verifyApplicationDelaySuspendExpected := func(ns, appName, suspendStep, nextStep, duration string) {
	waitDuration, err := time.ParseDuration(duration)
	Expect(err).Should(BeNil())

	var testApp v1beta1.Application
	Eventually(func() error {
		if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: ns, Name: appName}, &testApp); err != nil {
			return err
		}
		if testApp.Status.Workflow == nil {
			return fmt.Errorf("application wait to start workflow")
		}
		if !testApp.Status.Workflow.Finished {
			return fmt.Errorf("application status workflow finished wants true, actually false")
		}

		// Collect the first execute time of both steps from the step statuses.
		var suspendStartTime, nextStepStartTime metav1.Time
		var sFlag, nFlag bool
		for _, wfStatus := range testApp.Status.Workflow.Steps {
			if wfStatus.Name == suspendStep {
				suspendStartTime = wfStatus.FirstExecuteTime
				sFlag = true
				continue
			}
			if wfStatus.Name == nextStep {
				nextStepStartTime = wfStatus.FirstExecuteTime
				nFlag = true
			}
		}
		if !sFlag {
			return fmt.Errorf("application can not find suspend step: %s", suspendStep)
		}
		if !nFlag {
			return fmt.Errorf("application can not find next step: %s", nextStep)
		}

		// The next step must not have started before the wait elapsed.
		dd := nextStepStartTime.Sub(suspendStartTime.Time)
		if waitDuration > dd {
			return fmt.Errorf("application suspend wait duration wants more than %s, actually %s", duration, dd.String())
		}
		return nil
	}, 120*time.Second, time.Second).Should(BeNil())
}
verifyWorkloadRunningExpected := func(workloadName string, replicas int32, image string) {
var workload v1.Deployment
By("Verify Workload running as expected")
@@ -284,6 +337,17 @@ var _ = Describe("Application Normal tests", func() {
verifyApplicationWorkflowSuspending(newApp.Namespace, newApp.Name)
})
// Creates the application described by the wait-suspend fixture and verifies
// that the step following the suspend step starts no earlier than the given
// duration after the suspend step.
// NOTE(review): the step names and the "30s" passed below are expected to
// match the fixture's suspend step — confirm against the YAML when changing either.
It("Test wait suspend", func() {
	By("Apply wait suspend application")
	var newApp v1beta1.Application
	Expect(common.ReadYamlToObject("testdata/app/app_wait_suspend.yaml", &newApp)).Should(BeNil())
	newApp.Namespace = namespaceName
	Expect(k8sClient.Create(ctx, &newApp)).Should(BeNil())
	By("check application suspend duration")
	verifyApplicationDelaySuspendExpected(newApp.Namespace, newApp.Name, "suspend-test", "apply-wait-suspend-comp", "30s")
})
It("Test app with ServiceAccount", func() {
By("Creating a ServiceAccount")
const saName = "app-service-account"

View File

@@ -0,0 +1,21 @@
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: wait-suspend-test
spec:
components:
- name: wait-suspend-comp
type: webservice
properties:
image: nginx
port: 80
workflow:
steps:
- name: suspend-test
type: suspend
properties:
duration: 30s
- name: apply-wait-suspend-comp
type: apply-component
properties:
component: wait-suspend-comp

View File

@@ -5,6 +5,8 @@
description: "Suspend your workflow"
}
template: {
// no parameters
parameter: {}
parameter: {
// +usage=Specify the wait duration time to resume workflow such as "30s", "1m" or "2m15s"
duration?: string
}
}