From 5e6be649c140fc7bd59fd4e44ea859fe8dd0477e Mon Sep 17 00:00:00 2001 From: Tianxin Dong Date: Thu, 28 Oct 2021 10:48:39 +0800 Subject: [PATCH] [Backport release-1.1] Feat: Commit step-generate data without success (#2565) * Feat: commit without success * Feat: add test case Co-authored-by: Jian.Li --- pkg/workflow/context/context.go | 17 +++++++++++++-- pkg/workflow/workflow.go | 8 +++---- pkg/workflow/workflow_test.go | 37 +++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/pkg/workflow/context/context.go b/pkg/workflow/context/context.go index f59d310e9..107377f02 100644 --- a/pkg/workflow/context/context.go +++ b/pkg/workflow/context/context.go @@ -48,6 +48,7 @@ type WorkflowContext struct { store corev1.ConfigMap components map[string]*ComponentManifest vars *value.Value + modified bool } // GetComponent Get ComponentManifest from workflow context. @@ -70,7 +71,11 @@ func (wf *WorkflowContext) PatchComponent(name string, patchValue *value.Value) if err != nil { return err } - return component.Patch(patchValue) + if err := component.Patch(patchValue); err != nil { + return err + } + wf.modified = true + return nil } // GetVar get variable from workflow context. @@ -87,7 +92,11 @@ func (wf *WorkflowContext) SetVar(v *value.Value, paths ...string) error { if err := wf.vars.FillRaw(str, paths...); err != nil { return err } - return wf.vars.Error() + if err := wf.vars.Error(); err != nil { + return err + } + wf.modified = true + return nil } // MakeParameter make 'value' with interface{} @@ -106,6 +115,9 @@ func (wf *WorkflowContext) MakeParameter(parameter interface{}) (*value.Value, e // Commit the workflow context and persist it's content. func (wf *WorkflowContext) Commit() error { + if !wf.modified { + return nil + } if err := wf.writeToStore(); err != nil { return err } @@ -303,6 +315,7 @@ func newContext(cli client.Client, ns, app string) (*WorkflowContext, error) { cli: cli, store: store, components: map[string]*ComponentManifest{}, + modified: true, } var err error wfCtx.vars, err = value.NewValue("", nil, "") diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go index 16a69beb5..bc9c308f0 100644 --- a/pkg/workflow/workflow.go +++ b/pkg/workflow/workflow.go @@ -249,6 +249,10 @@ func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner e.updateStepStatus(status) + if err := wfCtx.Commit(); err != nil { + return errors.WithMessage(err, "commit workflow context") + } + if status.Phase != common.WorkflowStepPhaseSucceeded { if e.isDag() { continue @@ -256,10 +260,6 @@ func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner return nil } - if err := wfCtx.Commit(); err != nil { - return errors.WithMessage(err, "commit workflow context") - } - e.finishStep(operation) if e.needStop() { return nil diff --git a/pkg/workflow/workflow_test.go b/pkg/workflow/workflow_test.go index f88e9959f..87c9b0aeb 100644 --- a/pkg/workflow/workflow_test.go +++ b/pkg/workflow/workflow_test.go @@ -20,6 +20,8 @@ import ( "context" "encoding/json" + "github.com/oam-dev/kubevela/pkg/cue/model/value" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -332,6 +334,31 @@ var _ = Describe("Test Workflow", func() { }}, })).Should(BeEquivalentTo("")) }) + + It("step commit data without success", func() { + app, runners := makeTestCase([]oamcore.WorkflowStep{ + { + Name: "s1", + Type: "wait-with-set-var", + }, + { + Name: "s2", + Type: "success", + }, + }) + wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep) + state, err := wf.ExecuteSteps(context.Background(), revision, runners) + Expect(err).ToNot(HaveOccurred()) + Expect(state).Should(BeEquivalentTo(common.WorkflowStateExecuting)) + Expect(app.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseRunning)) + wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name) + Expect(err).ToNot(HaveOccurred()) + v, err := wfCtx.GetVar("saved") + Expect(err).ToNot(HaveOccurred()) + saved, err := v.CueValue().Bool() + Expect(err).ToNot(HaveOccurred()) + Expect(saved).Should(BeEquivalentTo(true)) + }) }) func makeTestCase(steps []oamcore.WorkflowStep) (*oamcore.Application, []wfTypes.TaskRunner) { @@ -401,6 +428,16 @@ func makeRunner(name string, tpy string) wfTypes.TaskRunner { Phase: common.WorkflowStepPhaseRunning, }, &wfTypes.Operation{}, errors.New("error for test") } + case "wait-with-set-var": + run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) { + v, _ := value.NewValue(`saved: true`, nil, "") + err := ctx.SetVar(v) + return common.WorkflowStepStatus{ + Name: name, + Type: "wait-with-set-var", + Phase: common.WorkflowStepPhaseRunning, + }, &wfTypes.Operation{}, err + } default: run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {