From ee8773e1cf16145c7fc2a714b2a745aed74dd331 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 24 Jan 2022 10:37:48 +0800 Subject: [PATCH] [Backport release-1.2] Fix: handle workflow cache reconcile (#3148) * Fix: handle workflow cache reconcile Signed-off-by: FogDong (cherry picked from commit 12df87ac117cd17c7507a6feed530a577723d18a) * fix return and move backoff to memory Signed-off-by: FogDong (cherry picked from commit ee876f53c396a8addeeb48ae24e1c484ec6c685a) * handle failed to patch case Signed-off-by: FogDong (cherry picked from commit eac4a1b3708ae6fcba6dca72b4bb58cdb49f9cfe) * add store in err case Signed-off-by: FogDong (cherry picked from commit 32825c5c4187c71156e2556eebbb421db9c978c2) * make reviewable Signed-off-by: FogDong (cherry picked from commit 02b9c609227b44614887bc54a3c8ae13096ea43c) * fix ut Signed-off-by: FogDong (cherry picked from commit bff156cbe615a7be613e715fd1486e60250f6574) * do cleanup in ut Signed-off-by: FogDong (cherry picked from commit 463bd96e78efd9a5586efb1275c8a8ede11701d5) Co-authored-by: FogDong --- apis/core.oam.dev/common/types.go | 2 + .../application/application_controller.go | 17 ++- pkg/velaql/context.go | 17 ++- pkg/workflow/context/context.go | 104 +++++++++++++----- pkg/workflow/context/context_test.go | 29 ++++- pkg/workflow/context/interface.go | 5 +- pkg/workflow/interface.go | 3 - pkg/workflow/tasks/custom/task.go | 2 +- pkg/workflow/tasks/custom/task_test.go | 1 + pkg/workflow/workflow.go | 99 +++++++---------- pkg/workflow/workflow_test.go | 34 +++--- 11 files changed, 195 insertions(+), 118 deletions(-) diff --git a/apis/core.oam.dev/common/types.go b/apis/core.oam.dev/common/types.go index cb1fd074c..3f74c1cd0 100644 --- a/apis/core.oam.dev/common/types.go +++ b/apis/core.oam.dev/common/types.go @@ -213,6 +213,8 @@ const ( WorkflowStateFinished WorkflowState = "finished" // WorkflowStateExecuting means workflow is still running or waiting some steps. WorkflowStateExecuting WorkflowState = "executing" + // WorkflowStateSkipping means it will skip this reconcile and let next reconcile to handle it. + WorkflowStateSkipping WorkflowState = "skipping" ) // ApplicationComponentStatus record the health status of App component diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go index af1080a3d..2759e5191 100644 --- a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go +++ b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go @@ -245,6 +245,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if status := app.Status.Workflow; status != nil && status.Terminated { return r.result(nil).ret() } + case common.WorkflowStateSkipping: + logCtx.Info("Skip this reconcile") + return ctrl.Result{}, nil } var phase = common.ApplicationRunning @@ -389,7 +392,12 @@ func (r *Reconciler) endWithNegativeCondition(ctx context.Context, app *v1beta1. func (r *Reconciler) patchStatus(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error { app.Status.Phase = phase updateObservedGeneration(app) - return r.Status().Patch(ctx, app, client.Merge) + if err := r.Status().Patch(ctx, app, client.Merge); err != nil { + // set to -1 to re-run workflow if status is failed to patch + workflow.StepStatusCache.Store(fmt.Sprintf("%s-%s", app.Name, app.Namespace), -1) + return err + } + return nil } func (r *Reconciler) updateStatus(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error { @@ -403,7 +411,12 @@ func (r *Reconciler) updateStatus(ctx context.Context, app *v1beta1.Application, if err != nil { return err } - return r.Status().Update(ctx, obj) + if err := r.Status().Update(ctx, obj); err != nil { + // set to -1 to re-run workflow if status is failed to update + workflow.StepStatusCache.Store(fmt.Sprintf("%s-%s", app.Name, app.Namespace), -1) + return err + } + return nil } func (r *Reconciler) doWorkflowFinish(app *v1beta1.Application, wf workflow.Workflow) error { diff --git a/pkg/velaql/context.go b/pkg/velaql/context.go index 3e3ce6f71..5dce307ed 100644 --- a/pkg/velaql/context.go +++ b/pkg/velaql/context.go @@ -85,11 +85,24 @@ func (c ViewContext) GetMutableValue(paths ...string) string { func (c ViewContext) SetMutableValue(data string, paths ...string) { } -// IncreaseMutableCountValue increase mutable count in workflow context. -func (c ViewContext) IncreaseMutableCountValue(paths ...string) int { +// IncreaseCountValueInMemory increase count in workflow context memory store. +func (c ViewContext) IncreaseCountValueInMemory(paths ...string) int { return 0 } +// SetValueInMemory set data in workflow context memory store. +func (c ViewContext) SetValueInMemory(data interface{}, paths ...string) { +} + +// GetValueInMemory get data in workflow context memory store. +func (c ViewContext) GetValueInMemory(paths ...string) (interface{}, bool) { + return "", true +} + +// DeleteValueInMemory delete data in workflow context memory store. +func (c ViewContext) DeleteValueInMemory(paths ...string) { +} + // DeleteMutableValue delete mutable data in workflow context. func (c ViewContext) DeleteMutableValue(paths ...string) { } diff --git a/pkg/workflow/context/context.go b/pkg/workflow/context/context.go index def2331ba..d52317443 100644 --- a/pkg/workflow/context/context.go +++ b/pkg/workflow/context/context.go @@ -20,8 +20,8 @@ import ( "context" "encoding/json" "fmt" - "strconv" "strings" + "sync" "time" "cuelang.org/go/cue" @@ -48,13 +48,18 @@ const ( AnnotationStartTimestamp = "vela.io/startTime" ) +var ( + workflowMemoryCache sync.Map +) + // WorkflowContext is workflow context. type WorkflowContext struct { - cli client.Client - store *corev1.ConfigMap - components map[string]*ComponentManifest - vars *value.Value - modified bool + cli client.Client + store *corev1.ConfigMap + memoryStore *sync.Map + components map[string]*ComponentManifest + vars *value.Value + modified bool } // GetComponent Get ComponentManifest from workflow context. @@ -105,7 +110,7 @@ func (wf *WorkflowContext) SetVar(v *value.Value, paths ...string) error { return nil } -// GetStore get configmap of workflow context. +// GetStore get store of workflow context. func (wf *WorkflowContext) GetStore() *corev1.ConfigMap { return wf.store } @@ -121,23 +126,6 @@ func (wf *WorkflowContext) SetMutableValue(data string, paths ...string) { wf.modified = true } -// IncreaseMutableCountValue increase mutable count in workflow context. -func (wf *WorkflowContext) IncreaseMutableCountValue(paths ...string) int { - c := wf.GetMutableValue(paths...) - if c == "" { - wf.SetMutableValue("0", paths...) - return 0 - } - count, err := strconv.Atoi(c) - if err != nil { - wf.SetMutableValue("0", paths...) - return 0 - } - count++ - wf.SetMutableValue(strconv.Itoa(count), paths...) - return count -} - // DeleteMutableValue delete mutable data in workflow context. func (wf *WorkflowContext) DeleteMutableValue(paths ...string) { key := strings.Join(paths, ".") @@ -147,6 +135,39 @@ func (wf *WorkflowContext) DeleteMutableValue(paths ...string) { } } +// IncreaseCountValueInMemory increase count in workflow context memory store. +func (wf *WorkflowContext) IncreaseCountValueInMemory(paths ...string) int { + key := strings.Join(paths, ".") + c, ok := wf.memoryStore.Load(key) + if !ok { + wf.memoryStore.Store(key, 0) + return 0 + } + count, ok := c.(int) + if !ok { + wf.memoryStore.Store(key, 0) + return 0 + } + count++ + wf.memoryStore.Store(key, count) + return count +} + +// SetValueInMemory set data in workflow context memory store. +func (wf *WorkflowContext) SetValueInMemory(data interface{}, paths ...string) { + wf.memoryStore.Store(strings.Join(paths, "."), data) +} + +// GetValueInMemory get data in workflow context memory store. +func (wf *WorkflowContext) GetValueInMemory(paths ...string) (interface{}, bool) { + return wf.memoryStore.Load(strings.Join(paths, ".")) +} + +// DeleteValueInMemory delete data in workflow context memory store. +func (wf *WorkflowContext) DeleteValueInMemory(paths ...string) { + wf.memoryStore.Delete(strings.Join(paths, ".")) +} + // MakeParameter make 'value' with interface{} func (wf *WorkflowContext) MakeParameter(parameter interface{}) (*value.Value, error) { var s = "{}" @@ -317,6 +338,11 @@ func NewContext(cli client.Client, ns, app string, appUID types.UID) (Context, e return wfCtx, wfCtx.Commit() } +// CleanupMemoryStore cleans up memory store. +func CleanupMemoryStore(app, ns string) { + workflowMemoryCache.Delete(fmt.Sprintf("%s-%s", app, ns)) +} + func newContext(cli client.Client, ns, app string, appUID types.UID) (*WorkflowContext, error) { var ( ctx = context.Background() @@ -347,11 +373,13 @@ func newContext(cli client.Client, ns, app string, appUID types.UID) (*WorkflowC store.Annotations = map[string]string{ AnnotationStartTimestamp: time.Now().String(), } + memCache := getMemoryStore(fmt.Sprintf("%s-%s", app, ns)) wfCtx := &WorkflowContext{ - cli: cli, - store: &store, - components: map[string]*ComponentManifest{}, - modified: true, + cli: cli, + store: &store, + memoryStore: memCache, + components: map[string]*ComponentManifest{}, + modified: true, } var err error wfCtx.vars, err = value.NewValue("", nil, "") @@ -359,6 +387,20 @@ func newContext(cli client.Client, ns, app string, appUID types.UID) (*WorkflowC return wfCtx, err } +func getMemoryStore(key string) *sync.Map { + memCache := &sync.Map{} + mc, ok := workflowMemoryCache.Load(key) + if !ok { + workflowMemoryCache.Store(key, memCache) + } else { + memCache, ok = mc.(*sync.Map) + if !ok { + workflowMemoryCache.Store(key, memCache) + } + } + return memCache +} + // LoadContext load workflow context from store. func LoadContext(cli client.Client, ns, app string) (Context, error) { var store corev1.ConfigMap @@ -372,9 +414,11 @@ func LoadContext(cli client.Client, ns, app string) (Context, error) { }, &store); err != nil { return nil, err } + memCache := getMemoryStore(fmt.Sprintf("%s-%s", app, ns)) ctx := &WorkflowContext{ - cli: cli, - store: &store, + cli: cli, + store: &store, + memoryStore: memCache, } if err := ctx.LoadFromConfigMap(store); err != nil { return nil, err diff --git a/pkg/workflow/context/context_test.go b/pkg/workflow/context/context_test.go index 0c9a07122..a8ab77b32 100644 --- a/pkg/workflow/context/context_test.go +++ b/pkg/workflow/context/context_test.go @@ -275,14 +275,33 @@ func TestMutableValue(t *testing.T) { wfCtx.DeleteMutableValue("test", "key") v = wfCtx.GetMutableValue("test", "key") r.Equal(v, "") +} - wfCtx.SetMutableValue("value", "test", "key") - count := wfCtx.IncreaseMutableCountValue("test", "key") +func TestMemoryValue(t *testing.T) { + cli := newCliForTest(t, nil) + r := require.New(t) + + wfCtx, err := NewContext(cli, "default", "app-v1", "testuid") + r.NoError(err) + err = wfCtx.Commit() + r.NoError(err) + + wfCtx.SetValueInMemory("value", "test", "key") + v, ok := wfCtx.GetValueInMemory("test", "key") + r.Equal(ok, true) + r.Equal(v.(string), "value") + + wfCtx.DeleteValueInMemory("test", "key") + _, ok = wfCtx.GetValueInMemory("test", "key") + r.Equal(ok, false) + + wfCtx.SetValueInMemory("value", "test", "key") + count := wfCtx.IncreaseCountValueInMemory("test", "key") r.Equal(count, 0) - count = wfCtx.IncreaseMutableCountValue("notfound", "key") + count = wfCtx.IncreaseCountValueInMemory("notfound", "key") r.Equal(count, 0) - wfCtx.SetMutableValue("10", "number", "key") - count = wfCtx.IncreaseMutableCountValue("number", "key") + wfCtx.SetValueInMemory(10, "number", "key") + count = wfCtx.IncreaseCountValueInMemory("number", "key") r.Equal(count, 11) } diff --git a/pkg/workflow/context/interface.go b/pkg/workflow/context/interface.go index 1ce8e32bd..53e6a298a 100644 --- a/pkg/workflow/context/interface.go +++ b/pkg/workflow/context/interface.go @@ -32,8 +32,11 @@ type Context interface { GetStore() *corev1.ConfigMap GetMutableValue(path ...string) string SetMutableValue(data string, path ...string) - IncreaseMutableCountValue(paths ...string) int DeleteMutableValue(paths ...string) + IncreaseCountValueInMemory(paths ...string) int + SetValueInMemory(data interface{}, paths ...string) + GetValueInMemory(paths ...string) (interface{}, bool) + DeleteValueInMemory(paths ...string) Commit() error MakeParameter(parameter interface{}) (*value.Value, error) StoreRef() *corev1.ObjectReference diff --git a/pkg/workflow/interface.go b/pkg/workflow/interface.go index 7feada095..c6e5e32e6 100644 --- a/pkg/workflow/interface.go +++ b/pkg/workflow/interface.go @@ -33,9 +33,6 @@ type Workflow interface { // Trace record workflow state in controllerRevision. Trace() error - // CleanupCountersInContext cleans up the temporary counters in workflow context. - CleanupCountersInContext(ctx monitorContext.Context) - // GetBackoffWaitTime returns the wait time for next retry. GetBackoffWaitTime() time.Duration } diff --git a/pkg/workflow/tasks/custom/task.go b/pkg/workflow/tasks/custom/task.go index 3856b6088..595b03c93 100644 --- a/pkg/workflow/tasks/custom/task.go +++ b/pkg/workflow/tasks/custom/task.go @@ -286,7 +286,7 @@ func (exec *executor) err(ctx wfContext.Context, err error, reason string) { } func (exec *executor) checkErrorTimes(ctx wfContext.Context) { - times := ctx.IncreaseMutableCountValue(wfTypes.ContextPrefixFailedTimes, exec.wfStatus.ID) + times := ctx.IncreaseCountValueInMemory(wfTypes.ContextPrefixFailedTimes, exec.wfStatus.ID) if times >= MaxErrorTimes { exec.wait = false exec.failedAfterRetries = true diff --git a/pkg/workflow/tasks/custom/task_test.go b/pkg/workflow/tasks/custom/task_test.go index bdbbe2c99..7d65a5a13 100644 --- a/pkg/workflow/tasks/custom/task_test.go +++ b/pkg/workflow/tasks/custom/task_test.go @@ -243,6 +243,7 @@ close({ r.Equal(operation.Waiting, true) r.Equal(status.Phase, common.WorkflowStepPhaseFailed) case "failed-after-retries": + wfContext.CleanupMemoryStore("app-v1", "default") newCtx := newWorkflowContextForTest(t) for i := 0; i < MaxErrorTimes; i++ { status, operation, err = run.Run(newCtx, &types.TaskRunOptions{}) diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go index 67f138f0b..9f337dd27 100644 --- a/pkg/workflow/workflow.go +++ b/pkg/workflow/workflow.go @@ -20,8 +20,7 @@ import ( "encoding/json" "fmt" "math" - "strconv" - "strings" + "sync" "time" "github.com/pkg/errors" @@ -45,6 +44,8 @@ import ( var ( // DisableRecorder optimize workflow by disable recorder DisableRecorder = false + // StepStatusCache cache the step status + StepStatusCache sync.Map ) const ( @@ -123,11 +124,16 @@ func (w *workflow) ExecuteSteps(ctx monitorContext.Context, appRev *oamcore.Appl } } w.app.Status.Conditions = reservedConditions + StepStatusCache.Delete(fmt.Sprintf("%s-%s", w.app.Name, w.app.Namespace)) + wfContext.CleanupMemoryStore(w.app.Name, w.app.Namespace) return common.WorkflowStateInitializing, nil } wfStatus := w.app.Status.Workflow + cacheKey := fmt.Sprintf("%s-%s", w.app.Name, w.app.Namespace) + if wfStatus.Finished { + StepStatusCache.Delete(cacheKey) return common.WorkflowStateFinished, nil } if wfStatus.Terminated { @@ -148,7 +154,13 @@ func (w *workflow) ExecuteSteps(ctx monitorContext.Context, appRev *oamcore.Appl return common.WorkflowStateExecuting, err } w.wfCtx = wfCtx - w.checkDuplicateID(ctx) + + if cacheValue, ok := StepStatusCache.Load(cacheKey); ok { + // handle cache resource + if len(wfStatus.Steps) < cacheValue.(int) { + return common.WorkflowStateSkipping, nil + } + } e := &engine{ status: wfStatus, @@ -161,17 +173,19 @@ func (w *workflow) ExecuteSteps(ctx monitorContext.Context, appRev *oamcore.Appl err = e.run(taskRunners) if err != nil { ctx.Error(err, "run steps") + StepStatusCache.Store(cacheKey, len(wfStatus.Steps)) wfStatus.Message = string(common.WorkflowStateExecuting) return common.WorkflowStateExecuting, err } e.checkWorkflowStatusMessage(wfStatus) + StepStatusCache.Store(cacheKey, len(wfStatus.Steps)) if wfStatus.Terminated { - w.CleanupCountersInContext(ctx) + wfContext.CleanupMemoryStore(e.app.Name, e.app.Namespace) return common.WorkflowStateTerminated, nil } if wfStatus.Suspend { - w.CleanupCountersInContext(ctx) + wfContext.CleanupMemoryStore(e.app.Name, e.app.Namespace) return common.WorkflowStateSuspended, nil } if w.allDone(taskRunners) { @@ -194,31 +208,13 @@ func (w *workflow) Trace() error { return recorder.With(w.cli, w.app).Save("", data).Limit(10).Error() } -func (w *workflow) CleanupCountersInContext(ctx monitorContext.Context) { - ctxCM := w.wfCtx.GetStore() - - for k := range ctxCM.Data { - if strings.HasPrefix(k, wfTypes.ContextPrefixFailedTimes) || - strings.HasPrefix(k, wfTypes.ContextPrefixBackoffTimes) || - strings.HasPrefix(k, wfTypes.ContextKeyLastExecuteTime) || - strings.HasPrefix(k, wfTypes.ContextKeyNextExecuteTime) { - delete(ctxCM.Data, k) - } - } - - if err := w.wfCtx.Commit(); err != nil { - ctx.Error(err, "failed to commit workflow context", "application", w.app.Name, "config map", ctxCM.Name) - } - -} - func (w *workflow) GetBackoffWaitTime() time.Duration { - nextTime := w.wfCtx.GetMutableValue(wfTypes.ContextKeyNextExecuteTime) - if nextTime == "" { + nextTime, ok := w.wfCtx.GetValueInMemory(wfTypes.ContextKeyNextExecuteTime) + if !ok { return time.Second } - unix, err := strconv.ParseInt(nextTime, 10, 64) - if err != nil { + unix, ok := nextTime.(int64) + if !ok { return time.Second } next := time.Unix(unix, 0) @@ -285,32 +281,15 @@ func (w *workflow) setMetadataToContext(wfCtx wfContext.Context) error { return wfCtx.SetVar(metadata, wfTypes.ContextKeyMetadata) } -func (w *workflow) checkDuplicateID(ctx monitorContext.Context) { - if len(w.app.Status.Workflow.Steps) > 0 { - return - } - ctxCM := w.wfCtx.GetStore() - found := false - for k := range ctxCM.Data { - if strings.HasPrefix(k, wfTypes.ContextPrefixBackoffTimes) { - found = true - } - } - if found { - w.CleanupCountersInContext(ctx) - } -} - -func getBackoffWaitTime(wfCtx wfContext.Context) int { - ctxCM := wfCtx.GetStore() +func (e *engine) getBackoffWaitTime() int { // the default value of min times reaches the max workflow backoff wait time minTimes := 15 found := false - for k, v := range ctxCM.Data { - if strings.HasPrefix(k, wfTypes.ContextPrefixBackoffTimes) { + for _, step := range e.status.Steps { + if v, ok := e.wfCtx.GetValueInMemory(wfTypes.ContextPrefixBackoffTimes, step.ID); ok { found = true - times, err := strconv.Atoi(v) - if err != nil { + times, ok := v.(int) + if !ok { times = 0 } if times < minTimes { @@ -333,19 +312,19 @@ func getBackoffWaitTime(wfCtx wfContext.Context) int { } func (e *engine) setNextExecuteTime() { - interval := getBackoffWaitTime(e.wfCtx) - lastExecuteTime := e.wfCtx.GetMutableValue(wfTypes.ContextKeyLastExecuteTime) - if lastExecuteTime == "" { + interval := e.getBackoffWaitTime() + lastExecuteTime, ok := e.wfCtx.GetValueInMemory(wfTypes.ContextKeyLastExecuteTime) + if !ok { e.monitorCtx.Error(fmt.Errorf("failed to get last execute time"), "application", e.app.Name) } - last, err := strconv.ParseInt(lastExecuteTime, 10, 64) - if err != nil { - e.monitorCtx.Error(err, "failed to parse last execute time", "lastExecuteTime", lastExecuteTime) + last, ok := lastExecuteTime.(int64) + if !ok { + e.monitorCtx.Error(fmt.Errorf("failed to parse last execute time to int64"), "lastExecuteTime", lastExecuteTime) } next := last + int64(interval) - e.wfCtx.SetMutableValue(strconv.FormatInt(next, 10), wfTypes.ContextKeyNextExecuteTime) + e.wfCtx.SetValueInMemory(next, wfTypes.ContextKeyNextExecuteTime) if err := e.wfCtx.Commit(); err != nil { e.monitorCtx.Error(err, "failed to commit next execute time", "nextExecuteTime", next) } @@ -376,7 +355,7 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error { } todoTasks = append(todoTasks, tRunner) } else { - wfCtx.DeleteMutableValue(wfTypes.ContextPrefixBackoffTimes, stepID) + wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, stepID) } } if done { @@ -461,7 +440,7 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error { e.failedAfterRetries = e.failedAfterRetries || operation.FailedAfterRetries e.waiting = e.waiting || operation.Waiting if status.Phase != common.WorkflowStepPhaseSucceeded { - wfCtx.IncreaseMutableCountValue(wfTypes.ContextPrefixBackoffTimes, status.ID) + wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) if err := wfCtx.Commit(); err != nil { return errors.WithMessage(err, "commit workflow context") } @@ -471,7 +450,7 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error { e.checkFailedAfterRetries() return nil } - wfCtx.DeleteMutableValue(wfTypes.ContextPrefixBackoffTimes, status.ID) + wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) if err := wfCtx.Commit(); err != nil { return errors.WithMessage(err, "commit workflow context") } @@ -511,7 +490,7 @@ func (e *engine) updateStepStatus(status common.WorkflowStepStatus) { now = metav1.NewTime(time.Now()) ) - e.wfCtx.SetMutableValue(strconv.FormatInt(now.Unix(), 10), wfTypes.ContextKeyLastExecuteTime) + e.wfCtx.SetValueInMemory(now.Unix(), wfTypes.ContextKeyLastExecuteTime) status.LastExecuteTime = now for i := range e.status.Steps { if e.status.Steps[i].Name == status.Name { diff --git a/pkg/workflow/workflow_test.go b/pkg/workflow/workflow_test.go index 810fe8106..810a7f41c 100644 --- a/pkg/workflow/workflow_test.go +++ b/pkg/workflow/workflow_test.go @@ -142,7 +142,6 @@ var _ = Describe("Test Workflow", func() { Phase: common.WorkflowStepPhaseSucceeded, }}, })).Should(BeEquivalentTo("")) - }) It("Workflow test for failed after retries", func() { @@ -237,7 +236,6 @@ var _ = Describe("Test Workflow", func() { Phase: common.WorkflowStepPhaseSucceeded, }}, })).Should(BeEquivalentTo("")) - }) It("Test get backoff time and clean", func() { @@ -251,40 +249,48 @@ var _ = Describe("Test Workflow", func() { wf := NewWorkflow(app, k8sClient, common.WorkflowModeDAG) _, err := wf.ExecuteSteps(ctx, revision, runners) Expect(err).ToNot(HaveOccurred()) + _, err = wf.ExecuteSteps(ctx, revision, runners) + Expect(err).ToNot(HaveOccurred()) + wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name) + Expect(err).ToNot(HaveOccurred()) + e := &engine{ + status: app.Status.Workflow, + wfCtx: wfCtx, + } + interval := e.getBackoffWaitTime() + Expect(interval).Should(BeEquivalentTo(minWorkflowBackoffWaitTime)) By("Test get backoff time") - for i := 0; i < 5; i++ { + for i := 0; i < 4; i++ { _, err = wf.ExecuteSteps(ctx, revision, runners) Expect(err).ToNot(HaveOccurred()) - wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name) - Expect(err).ToNot(HaveOccurred()) - interval := getBackoffWaitTime(wfCtx) + interval := e.getBackoffWaitTime() Expect(interval).Should(BeEquivalentTo(minWorkflowBackoffWaitTime)) } for i := 0; i < 9; i++ { _, err = wf.ExecuteSteps(ctx, revision, runners) Expect(err).ToNot(HaveOccurred()) - wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name) - Expect(err).ToNot(HaveOccurred()) - interval := getBackoffWaitTime(wfCtx) + interval := e.getBackoffWaitTime() Expect(interval).Should(BeEquivalentTo(int(0.05 * math.Pow(2, float64(i+5))))) } _, err = wf.ExecuteSteps(ctx, revision, runners) Expect(err).ToNot(HaveOccurred()) - wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name) - Expect(err).ToNot(HaveOccurred()) - interval := getBackoffWaitTime(wfCtx) + interval = e.getBackoffWaitTime() Expect(interval).Should(BeEquivalentTo(maxWorkflowBackoffWaitTime)) By("Test get backoff time after clean") - wf.CleanupCountersInContext(ctx) + wfContext.CleanupMemoryStore(app.Name, app.Namespace) _, err = wf.ExecuteSteps(ctx, revision, runners) Expect(err).ToNot(HaveOccurred()) wfCtx, err = wfContext.LoadContext(k8sClient, app.Namespace, app.Name) Expect(err).ToNot(HaveOccurred()) - interval = getBackoffWaitTime(wfCtx) + e = &engine{ + status: app.Status.Workflow, + wfCtx: wfCtx, + } + interval = e.getBackoffWaitTime() Expect(interval).Should(BeEquivalentTo(minWorkflowBackoffWaitTime)) })