[Backport release-1.1] Feat: Commit step-generate data without success (#2565)

* Feat: commit without success

* Feat: add test case

Co-authored-by: Jian.Li <lj176172@alibaba-inc.com>
This commit is contained in:
Tianxin Dong
2021-10-28 10:48:39 +08:00
committed by GitHub
parent 706a65beae
commit 5e6be649c1
3 changed files with 56 additions and 6 deletions

View File

@@ -48,6 +48,7 @@ type WorkflowContext struct {
store corev1.ConfigMap
components map[string]*ComponentManifest
vars *value.Value
modified bool
}
// GetComponent Get ComponentManifest from workflow context.
@@ -70,7 +71,11 @@ func (wf *WorkflowContext) PatchComponent(name string, patchValue *value.Value)
if err != nil {
return err
}
return component.Patch(patchValue)
if err := component.Patch(patchValue); err != nil {
return err
}
wf.modified = true
return nil
}
// GetVar get variable from workflow context.
@@ -87,7 +92,11 @@ func (wf *WorkflowContext) SetVar(v *value.Value, paths ...string) error {
if err := wf.vars.FillRaw(str, paths...); err != nil {
return err
}
return wf.vars.Error()
if err := wf.vars.Error(); err != nil {
return err
}
wf.modified = true
return nil
}
// MakeParameter make 'value' with interface{}
@@ -106,6 +115,9 @@ func (wf *WorkflowContext) MakeParameter(parameter interface{}) (*value.Value, e
// Commit the workflow context and persist it's content.
func (wf *WorkflowContext) Commit() error {
if !wf.modified {
return nil
}
if err := wf.writeToStore(); err != nil {
return err
}
@@ -303,6 +315,7 @@ func newContext(cli client.Client, ns, app string) (*WorkflowContext, error) {
cli: cli,
store: store,
components: map[string]*ComponentManifest{},
modified: true,
}
var err error
wfCtx.vars, err = value.NewValue("", nil, "")

View File

@@ -249,6 +249,10 @@ func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner
e.updateStepStatus(status)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
if status.Phase != common.WorkflowStepPhaseSucceeded {
if e.isDag() {
continue
@@ -256,10 +260,6 @@ func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner
return nil
}
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
e.finishStep(operation)
if e.needStop() {
return nil

View File

@@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -332,6 +334,31 @@ var _ = Describe("Test Workflow", func() {
}},
})).Should(BeEquivalentTo(""))
})
It("step commit data without success", func() {
app, runners := makeTestCase([]oamcore.WorkflowStep{
{
Name: "s1",
Type: "wait-with-set-var",
},
{
Name: "s2",
Type: "success",
},
})
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
state, err := wf.ExecuteSteps(context.Background(), revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateExecuting))
Expect(app.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseRunning))
wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name)
Expect(err).ToNot(HaveOccurred())
v, err := wfCtx.GetVar("saved")
Expect(err).ToNot(HaveOccurred())
saved, err := v.CueValue().Bool()
Expect(err).ToNot(HaveOccurred())
Expect(saved).Should(BeEquivalentTo(true))
})
})
func makeTestCase(steps []oamcore.WorkflowStep) (*oamcore.Application, []wfTypes.TaskRunner) {
@@ -401,6 +428,16 @@ func makeRunner(name string, tpy string) wfTypes.TaskRunner {
Phase: common.WorkflowStepPhaseRunning,
}, &wfTypes.Operation{}, errors.New("error for test")
}
case "wait-with-set-var":
run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {
v, _ := value.NewValue(`saved: true`, nil, "")
err := ctx.SetVar(v)
return common.WorkflowStepStatus{
Name: name,
Type: "wait-with-set-var",
Phase: common.WorkflowStepPhaseRunning,
}, &wfTypes.Operation{}, err
}
default:
run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {