Feat: add pending phase in workflow step (#4365)

* Feat: add pending phase in workflow step

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix test

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix lint

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix test

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
This commit is contained in:
Tianxin Dong
2022-07-19 14:31:20 +08:00
committed by GitHub
parent f876a0b8f8
commit 16dfc1bf8a
12 changed files with 470 additions and 111 deletions

View File

@@ -420,6 +420,8 @@ const (
WorkflowStepPhaseStopped WorkflowStepPhase = "stopped"
// WorkflowStepPhaseRunning will make the controller continue the workflow.
WorkflowStepPhaseRunning WorkflowStepPhase = "running"
// WorkflowStepPhasePending will make the controller wait for the step to run.
WorkflowStepPhasePending WorkflowStepPhase = "pending"
)
// DefinitionType describes the type of DefinitionRevision.

View File

@@ -2402,6 +2402,271 @@ var _ = Describe("Test Application Controller", func() {
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationWorkflowTerminated))
})
It("application with skip outputs in workflow", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-skip-output",
},
}
Expect(k8sClient.Create(ctx, &ns)).Should(BeNil())
healthComponentDef := &v1beta1.ComponentDefinition{}
hCDefJson, _ := yaml.YAMLToJSON([]byte(cdDefWithHealthStatusYaml))
Expect(json.Unmarshal(hCDefJson, healthComponentDef)).Should(BeNil())
healthComponentDef.Name = "worker-with-health"
healthComponentDef.Namespace = "app-with-skip-output"
Expect(k8sClient.Create(ctx, healthComponentDef)).Should(BeNil())
app := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
Kind: "Application",
APIVersion: "core.oam.dev/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-skip-output",
Namespace: "app-with-skip-output",
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "myweb1",
Type: "worker-with-health",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox","lives": "i am lives","enemies": "empty"}`)},
},
{
Name: "myweb2",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
},
},
Workflow: &v1beta1.Workflow{
Steps: []v1beta1.WorkflowStep{
{
Name: "myweb1",
Type: "apply-component",
If: "false",
Outputs: common.StepOutputs{
{
Name: "output",
ValueFrom: "context.name",
},
},
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)},
},
{
Name: "myweb2",
Inputs: common.StepInputs{
{
From: "output",
ParameterKey: "",
},
},
If: `inputs.output == "app-with-timeout-output"`,
Type: "apply-component",
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb2"}`)},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
expDeployment := &v1.Deployment{}
web1Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb1"}
Expect(k8sClient.Get(ctx, web1Key, expDeployment)).Should(util.NotFoundMatcher{})
web2Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb2"}
Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseSkipped))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseSkipped))
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunning))
})
It("application with invalid inputs in workflow", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-invalid-input",
},
}
Expect(k8sClient.Create(ctx, &ns)).Should(BeNil())
healthComponentDef := &v1beta1.ComponentDefinition{}
hCDefJson, _ := yaml.YAMLToJSON([]byte(cdDefWithHealthStatusYaml))
Expect(json.Unmarshal(hCDefJson, healthComponentDef)).Should(BeNil())
healthComponentDef.Name = "worker-with-health"
healthComponentDef.Namespace = "app-with-invalid-input"
Expect(k8sClient.Create(ctx, healthComponentDef)).Should(BeNil())
app := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
Kind: "Application",
APIVersion: "core.oam.dev/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-invalid-input",
Namespace: "app-with-invalid-input",
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "myweb1",
Type: "worker-with-health",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox","lives": "i am lives","enemies": "empty"}`)},
},
{
Name: "myweb2",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
},
},
Workflow: &v1beta1.Workflow{
Steps: []v1beta1.WorkflowStep{
{
Name: "myweb1",
Type: "apply-component",
Outputs: common.StepOutputs{
{
Name: "output",
ValueFrom: "context.name",
},
},
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)},
},
{
Name: "myweb2",
Inputs: common.StepInputs{
{
From: "invalid",
ParameterKey: "",
},
},
Type: "apply-component",
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb2"}`)},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
expDeployment := &v1.Deployment{}
web1Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb1"}
Expect(k8sClient.Get(ctx, web1Key, expDeployment)).Should(BeNil())
expDeployment.Status.Replicas = 1
expDeployment.Status.ReadyReplicas = 1
Expect(k8sClient.Status().Update(ctx, expDeployment)).Should(BeNil())
web2Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb2"}
Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseSucceeded))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhasePending))
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
})
It("application with invalid inputs in workflow in dag mode", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-invalid-input-dag",
},
}
Expect(k8sClient.Create(ctx, &ns)).Should(BeNil())
healthComponentDef := &v1beta1.ComponentDefinition{}
hCDefJson, _ := yaml.YAMLToJSON([]byte(cdDefWithHealthStatusYaml))
Expect(json.Unmarshal(hCDefJson, healthComponentDef)).Should(BeNil())
healthComponentDef.Name = "worker-with-health"
healthComponentDef.Namespace = "app-with-invalid-input-dag"
Expect(k8sClient.Create(ctx, healthComponentDef)).Should(BeNil())
app := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
Kind: "Application",
APIVersion: "core.oam.dev/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-invalid-input-dag",
Namespace: "app-with-invalid-input-dag",
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "myweb1",
Type: "worker-with-health",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox","lives": "i am lives","enemies": "empty"}`)},
},
{
Name: "myweb2",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
},
},
Workflow: &v1beta1.Workflow{
Mode: &v1beta1.WorkflowExecuteMode{
Steps: common.WorkflowModeDAG,
},
Steps: []v1beta1.WorkflowStep{
{
Name: "myweb1",
Type: "apply-component",
Outputs: common.StepOutputs{
{
Name: "output",
ValueFrom: "context.name",
},
},
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)},
},
{
Name: "myweb2",
Inputs: common.StepInputs{
{
From: "invalid",
ParameterKey: "",
},
},
Type: "apply-component",
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb2"}`)},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
expDeployment := &v1.Deployment{}
web1Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb1"}
Expect(k8sClient.Get(ctx, web1Key, expDeployment)).Should(BeNil())
web2Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb2"}
Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseRunning))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhasePending))
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
})
It("application with if always in workflow", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -77,7 +77,7 @@ func (watcher *applicationMetricsWatcher) report() {
metrics.ApplicationPhaseCounter.WithLabelValues(phase).Set(float64(watcher.phaseCounter[phase]))
}
for stepPhase := range watcher.stepPhaseDirty {
metrics.WorkflowStepPhaseGauge.WithLabelValues(strings.Split(stepPhase, "/")...).Set(float64(watcher.stepPhaseCounter[stepPhase]))
metrics.WorkflowStepPhaseGauge.WithLabelValues(strings.Split(stepPhase, "/")[:2]...).Set(float64(watcher.stepPhaseCounter[stepPhase]))
}
watcher.phaseDirty = map[string]struct{}{}
watcher.stepPhaseDirty = map[string]struct{}{}

View File

@@ -17,9 +17,12 @@ limitations under the License.
package hooks
import (
"encoding/json"
"fmt"
"strings"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
@@ -43,24 +46,59 @@ func Input(ctx wfContext.Context, paramValue *value.Value, step v1beta1.Workflow
}
// Output get data from task value.
func Output(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus) error {
func Output(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus, stepStatus map[string]common.StepStatus) error {
errMsg := ""
if wfTypes.IsStepFinish(status.Phase, status.Reason) {
SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, status)
for _, output := range step.Outputs {
v, err := taskValue.LookupByScript(output.ValueFrom)
if err != nil && !strings.Contains(err.Error(), "not found") {
return err
// if the error is not nil and the step is not skipped, return the error
if err != nil && status.Phase != common.WorkflowStepPhaseSkipped {
errMsg += fmt.Sprintf("failed to get output from %s: %s\n", output.ValueFrom, err.Error())
}
// if the error is not nil, set the value to null
if err != nil || v.Error() != nil {
v, err = taskValue.MakeValue("null")
if err != nil {
return err
}
v, _ = taskValue.MakeValue("null")
}
if err := ctx.SetVar(v, output.Name); err != nil {
return err
errMsg += fmt.Sprintf("failed to set output %s: %s\n", output.Name, err.Error())
}
}
}
if errMsg != "" {
return errors.New(errMsg)
}
return nil
}
// SetAdditionalNameInStatus sets additional name from properties to status map
func SetAdditionalNameInStatus(stepStatus map[string]common.StepStatus, name string, properties *runtime.RawExtension, status common.StepStatus) {
if stepStatus == nil || properties == nil {
return
}
o := struct {
Name string `json:"name"`
Component string `json:"component"`
}{}
js, err := properties.MarshalJSON()
if err != nil {
return
}
if err := json.Unmarshal(js, &o); err != nil {
return
}
additionalName := ""
switch {
case o.Name != "":
additionalName = o.Name
case o.Component != "":
additionalName = o.Component
default:
return
}
if _, ok := stepStatus[additionalName]; !ok {
stepStatus[additionalName] = status
return
}
}

View File

@@ -66,6 +66,7 @@ func TestOutput(t *testing.T) {
output: score: 99
`, nil, "")
r.NoError(err)
stepStatus := make(map[string]common.StepStatus)
err = Output(wfCtx, taskValue, v1beta1.WorkflowStep{
Properties: &runtime.RawExtension{
Raw: []byte("{\"name\":\"mystep\"}"),
@@ -76,7 +77,7 @@ output: score: 99
}},
}, common.StepStatus{
Phase: common.WorkflowStepPhaseSucceeded,
})
}, stepStatus)
r.NoError(err)
result, err := wfCtx.GetVar("myscore")
r.NoError(err)
@@ -84,6 +85,7 @@ output: score: 99
r.NoError(err)
r.Equal(s, `99
`)
r.Equal(stepStatus["mystep"].Phase, common.WorkflowStepPhaseSucceeded)
}
func mockContext(t *testing.T) wfContext.Context {

View File

@@ -63,7 +63,7 @@ func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (wfTypes
type taskRunner struct {
name string
run func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error)
checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool
checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus)
}
// Name return step name.
@@ -77,7 +77,7 @@ func (tr *taskRunner) Run(ctx wfContext.Context, options *wfTypes.TaskRunOptions
}
// Pending check task should be executed or not.
func (tr *taskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool {
func (tr *taskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
return tr.checkPending(ctx, stepStatus)
}
@@ -120,10 +120,10 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
tRunner := new(taskRunner)
tRunner.name = wfStep.Name
tRunner.checkPending = func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool {
return CheckPending(ctx, wfStep, stepStatus)
tRunner.checkPending = func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
return CheckPending(ctx, wfStep, exec.wfStatus.ID, stepStatus)
}
tRunner.run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error) {
tRunner.run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (stepStatus common.StepStatus, operations *wfTypes.Operation, rErr error) {
if options.GetTracer == nil {
options.GetTracer = func(id string, step v1beta1.WorkflowStep) monitorContext.Context {
return monitorContext.NewTraceContext(context.Background(), "")
@@ -139,10 +139,28 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
t.runOptionsProcess(options)
}
exec.wfStatus.Message = ""
var taskv *value.Value
var err error
var paramFile string
defer func() {
if taskv == nil {
taskv, err = convertTemplate(ctx, t.pd, strings.Join([]string{templ, paramFile}, "\n"), exec.wfStatus.ID, options.PCtx)
if err != nil {
return
}
}
for _, hook := range options.PostStopHooks {
if err := hook(ctx, taskv, wfStep, exec.status(), options.StepStatus); err != nil {
exec.wfStatus.Message = err.Error()
stepStatus = exec.status()
operations = exec.operation()
return
}
}
}()
for _, hook := range options.PreCheckHooks {
result, err := hook(wfStep, &wfTypes.PreCheckOptions{
PackageDiscover: t.pd,
@@ -212,12 +230,6 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
exec.err(ctx, true, err, wfTypes.StatusReasonExecute)
return exec.status(), exec.operation(), nil
}
for _, hook := range options.PostStopHooks {
if err := hook(ctx, taskv, wfStep, exec.status()); err != nil {
exec.err(ctx, false, err, wfTypes.StatusReasonOutput)
return exec.status(), exec.operation(), nil
}
}
return exec.status(), exec.operation(), nil
}
@@ -552,20 +564,28 @@ func NewTaskLoader(lt LoadTaskTemplate, pkgDiscover *packages.PackageDiscover, h
}
// CheckPending checks whether to pending task run
func CheckPending(ctx wfContext.Context, step v1beta1.WorkflowStep, stepStatus map[string]common.StepStatus) bool {
func CheckPending(ctx wfContext.Context, step v1beta1.WorkflowStep, id string, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
pStatus := common.StepStatus{
Phase: common.WorkflowStepPhasePending,
Type: step.Type,
ID: id,
Name: step.Name,
}
for _, depend := range step.DependsOn {
pStatus.Message = fmt.Sprintf("Pending on DependsOn: %s", depend)
if status, ok := stepStatus[depend]; ok {
if !wfTypes.IsStepFinish(status.Phase, status.Reason) {
return true
return true, pStatus
}
} else {
return true
return true, pStatus
}
}
for _, input := range step.Inputs {
pStatus.Message = fmt.Sprintf("Pending on Input: %s", input.From)
if _, err := ctx.GetVar(strings.Split(input.From, ".")...); err != nil {
return true
return true, pStatus
}
}
return false
return false, common.StepStatus{}
}

View File

@@ -246,9 +246,9 @@ close({
case "input":
r.Equal(err.Error(), "do preStartHook: get input from [podIP]: var(path=podIP) not exist")
case "output-var-conflict":
r.Equal(status.Reason, types.StatusReasonOutput)
r.Contains(status.Message, "conflict")
r.Equal(operation.Waiting, false)
r.Equal(status.Phase, common.WorkflowStepPhaseFailed)
r.Equal(status.Phase, common.WorkflowStepPhaseSucceeded)
case "failed-after-retries":
wfContext.CleanupMemoryStore("app-v1", "default")
newCtx := newWorkflowContextForTest(t)
@@ -433,14 +433,16 @@ func TestPendingInputCheck(t *testing.T) {
r.NoError(err)
run, err := gen(step, &types.GeneratorOptions{})
r.NoError(err)
r.Equal(run.Pending(wfCtx, nil), true)
p, _ := run.Pending(wfCtx, nil)
r.Equal(p, true)
score, err := value.NewValue(`
100
`, nil, "")
r.NoError(err)
err = wfCtx.SetVar(score, "score")
r.NoError(err)
r.Equal(run.Pending(wfCtx, nil), false)
p, _ = run.Pending(wfCtx, nil)
r.Equal(p, false)
}
func TestPendingDependsOnCheck(t *testing.T) {
@@ -468,13 +470,15 @@ func TestPendingDependsOnCheck(t *testing.T) {
r.NoError(err)
run, err := gen(step, &types.GeneratorOptions{})
r.NoError(err)
r.Equal(run.Pending(wfCtx, nil), true)
p, _ := run.Pending(wfCtx, nil)
r.Equal(p, true)
ss := map[string]common.StepStatus{
"depend": {
Phase: common.WorkflowStepPhaseSucceeded,
},
}
r.Equal(run.Pending(wfCtx, ss), false)
p, _ = run.Pending(wfCtx, ss)
r.Equal(p, false)
}
func TestSkip(t *testing.T) {
@@ -500,7 +504,8 @@ func TestSkip(t *testing.T) {
r.NoError(err)
runner, err := gen(step, &types.GeneratorOptions{})
r.NoError(err)
status, operations, err := runner.Run(nil, &types.TaskRunOptions{
wfCtx := newWorkflowContextForTest(t)
status, operations, err := runner.Run(wfCtx, &types.TaskRunOptions{
PreCheckHooks: []types.TaskPreCheckHook{
func(step v1beta1.WorkflowStep, options *types.PreCheckOptions) (*types.PreCheckResult, error) {
return &types.PreCheckResult{Skip: true}, nil

View File

@@ -134,10 +134,11 @@ func (tr *suspendTaskRunner) Name() string {
// Run make workflow suspend.
func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus common.StepStatus, operations *types.Operation, rErr error) {
stepStatus = common.StepStatus{
ID: tr.id,
Name: tr.step.Name,
Type: types.WorkflowStepTypeSuspend,
Phase: common.WorkflowStepPhaseRunning,
ID: tr.id,
Name: tr.step.Name,
Type: types.WorkflowStepTypeSuspend,
Phase: common.WorkflowStepPhaseRunning,
Message: "",
}
operations = &types.Operation{Suspend: true}
@@ -173,7 +174,6 @@ func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOp
}
return stepStatus, operations, nil
}
for _, input := range tr.step.Inputs {
if input.ParameterKey == "duration" {
inputValue, err := ctx.GetVar(strings.Split(input.From, ".")...)
@@ -207,8 +207,8 @@ func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOp
}
// Pending check task should be executed or not.
func (tr *suspendTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool {
return custom.CheckPending(ctx, tr.step, stepStatus)
func (tr *suspendTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
return custom.CheckPending(ctx, tr.step, tr.id, stepStatus)
}
type stepGroupTaskRunner struct {
@@ -227,16 +227,17 @@ func (tr *stepGroupTaskRunner) Name() string {
}
// Pending check task should be executed or not.
func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool {
return custom.CheckPending(ctx, tr.step, stepStatus)
func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
return custom.CheckPending(ctx, tr.step, tr.id, stepStatus)
}
// Run make workflow step group.
func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (status common.StepStatus, operations *types.Operation, rErr error) {
status = common.StepStatus{
ID: tr.id,
Name: tr.name,
Type: types.WorkflowStepTypeStepGroup,
ID: tr.id,
Name: tr.name,
Type: types.WorkflowStepTypeStepGroup,
Message: "",
}
pStatus := &status
@@ -307,6 +308,8 @@ func getStepGroupStatus(status common.StepStatus, stepStatus common.WorkflowStep
status.Phase = common.WorkflowStepPhaseRunning
case subStepCounts[string(common.WorkflowStepPhaseStopped)] > 0:
status.Phase = common.WorkflowStepPhaseStopped
case subStepCounts[string(common.WorkflowStepPhasePending)] > 0:
status.Phase = common.WorkflowStepPhasePending
case subStepCounts[string(common.WorkflowStepPhaseFailed)] > 0:
status.Phase = common.WorkflowStepPhaseFailed
switch {
@@ -371,7 +374,7 @@ func GetSuspendStepDurationWaiting(step v1beta1.WorkflowStep) (time.Duration, er
func handleOutput(ctx wfContext.Context, stepStatus *common.StepStatus, operations *types.Operation, step v1beta1.WorkflowStep, postStopHooks []types.TaskPostStopHook, pd *packages.PackageDiscover, id string, pCtx process.Context) {
status := *stepStatus
if status.Phase != common.WorkflowStepPhaseSkipped && len(step.Outputs) > 0 {
if len(step.Outputs) > 0 {
contextValue, err := custom.MakeValueForContext(ctx, pd, id, pCtx)
if err != nil {
status.Phase = common.WorkflowStepPhaseFailed
@@ -384,7 +387,7 @@ func handleOutput(ctx wfContext.Context, stepStatus *common.StepStatus, operatio
}
for _, hook := range postStopHooks {
if err := hook(ctx, contextValue, step, status); err != nil {
if err := hook(ctx, contextValue, step, status, nil); err != nil {
status.Phase = common.WorkflowStepPhaseFailed
if status.Reason == "" {
status.Reason = types.StatusReasonOutput

View File

@@ -91,13 +91,15 @@ func TestSuspendStep(t *testing.T) {
r.Equal(runner.Name(), "test")
// test pending
r.Equal(runner.Pending(nil, nil), true)
p, _ := runner.Pending(nil, nil)
r.Equal(p, true)
ss := map[string]common.StepStatus{
"depend": {
Phase: common.WorkflowStepPhaseSucceeded,
},
}
r.Equal(runner.Pending(nil, ss), false)
p, _ = runner.Pending(nil, ss)
r.Equal(p, false)
// test skip
status, operations, err := runner.Run(nil, &types.TaskRunOptions{
@@ -181,13 +183,15 @@ func TestStepGroupStep(t *testing.T) {
r.Equal(runner.Name(), "test")
// test pending
r.Equal(runner.Pending(nil, nil), true)
p, _ := runner.Pending(nil, nil)
r.Equal(p, true)
ss := map[string]common.StepStatus{
"depend": {
Phase: common.WorkflowStepPhaseSucceeded,
},
}
r.Equal(runner.Pending(nil, ss), false)
p, _ = runner.Pending(nil, ss)
r.Equal(p, false)
// test skip
status, operations, err := runner.Run(nil, &types.TaskRunOptions{

View File

@@ -34,7 +34,7 @@ import (
// TaskRunner is a task runner.
type TaskRunner interface {
Name() string
Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool
Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus)
Run(ctx wfContext.Context, options *TaskRunOptions) (common.StepStatus, *Operation, error)
}
@@ -85,7 +85,7 @@ type TaskPreCheckHook func(step v1beta1.WorkflowStep, options *PreCheckOptions)
type TaskPreStartHook func(ctx wfContext.Context, paramValue *value.Value, step v1beta1.WorkflowStep) error
// TaskPostStopHook run after task execution.
type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus) error
type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus, stepStatus map[string]common.StepStatus) error
// Operation is workflow operation object.
type Operation struct {

View File

@@ -237,17 +237,14 @@ func (w *workflow) restartWorkflow(ctx monitorContext.Context, revAndSpecHash st
func newEngine(ctx monitorContext.Context, wfCtx wfContext.Context, w *workflow, wfStatus *common.WorkflowStatus) *engine {
stepStatus := make(map[string]common.StepStatus)
for _, ss := range wfStatus.Steps {
setStepStatus(stepStatus, ss.StepStatus)
for _, sss := range ss.SubStepsStatus {
setStepStatus(stepStatus, sss.StepStatus)
}
}
setStepStatus(stepStatus, wfStatus.Steps)
stepDependsOn := make(map[string][]string)
if w.app.Spec.Workflow != nil {
for _, step := range w.app.Spec.Workflow.Steps {
hooks.SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, stepStatus[step.Name])
stepDependsOn[step.Name] = append(stepDependsOn[step.Name], step.DependsOn...)
for _, sub := range step.SubSteps {
hooks.SetAdditionalNameInStatus(stepStatus, sub.Name, sub.Properties, stepStatus[step.Name])
stepDependsOn[sub.Name] = append(stepDependsOn[sub.Name], sub.DependsOn...)
}
}
@@ -270,12 +267,12 @@ func newEngine(ctx monitorContext.Context, wfCtx wfContext.Context, w *workflow,
}
}
func setStepStatus(statusMap map[string]common.StepStatus, status common.StepStatus) {
statusMap[status.Name] = common.StepStatus{
Phase: status.Phase,
ID: status.ID,
Reason: status.Reason,
FirstExecuteTime: status.FirstExecuteTime,
func setStepStatus(statusMap map[string]common.StepStatus, status []common.WorkflowStepStatus) {
for _, ss := range status {
statusMap[ss.Name] = ss.StepStatus
for _, sss := range ss.SubStepsStatus {
statusMap[sss.Name] = sss.StepStatus
}
}
}
@@ -296,12 +293,7 @@ func (w *workflow) GetSuspendBackoffWaitTime() time.Duration {
return 0
}
stepStatus := make(map[string]common.StepStatus)
for _, ss := range w.app.Status.Workflow.Steps {
setStepStatus(stepStatus, ss.StepStatus)
for _, sss := range ss.SubStepsStatus {
setStepStatus(stepStatus, sss.StepStatus)
}
}
setStepStatus(stepStatus, w.app.Status.Workflow.Steps)
max := time.Duration(1<<63 - 1)
min := max
for _, step := range w.app.Spec.Workflow.Steps {
@@ -529,7 +521,7 @@ func (e *engine) setNextExecuteTime() {
e.wfCtx.SetValueInMemory(next, wfTypes.ContextKeyNextExecuteTime)
}
func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error {
func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner, pendingRunners bool) error {
var (
todoTasks []wfTypes.TaskRunner
pendingTasks []wfTypes.TaskRunner
@@ -545,9 +537,15 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error {
}
if !finish {
done = false
if tRunner.Pending(wfCtx, e.stepStatus) {
if pending, status := tRunner.Pending(wfCtx, e.stepStatus); pending {
if pendingRunners {
wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
e.updateStepStatus(status)
}
pendingTasks = append(pendingTasks, tRunner)
continue
} else if status.Phase == common.WorkflowStepPhasePending {
wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, stepID)
}
todoTasks = append(todoTasks, tRunner)
} else {
@@ -569,7 +567,7 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error {
}
if len(pendingTasks) > 0 {
return e.runAsDAG(pendingTasks)
return e.runAsDAG(pendingTasks, true)
}
}
return nil
@@ -579,7 +577,7 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error {
func (e *engine) Run(taskRunners []wfTypes.TaskRunner, dag bool) error {
var err error
if dag {
err = e.runAsDAG(taskRunners)
err = e.runAsDAG(taskRunners, false)
} else {
err = e.steps(taskRunners, dag)
}
@@ -611,6 +609,14 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner, dag bool) error {
continue
}
}
if pending, status := runner.Pending(wfCtx, e.stepStatus); pending {
wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
e.updateStepStatus(status)
if dag {
continue
}
return nil
}
options := e.generateRunOptions(e.findDependPhase(taskRunners, index, dag))
status, operation, err := runner.Run(wfCtx, options)

View File

@@ -2030,19 +2030,27 @@ var _ = Describe("Test Workflow", func() {
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeDAG,
Message: string(common.WorkflowStateExecuting),
Steps: []common.WorkflowStepStatus{{
StepStatus: common.StepStatus{
Name: "s1",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
Steps: []common.WorkflowStepStatus{
{
StepStatus: common.StepStatus{
Name: "s1",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
},
}, {
StepStatus: common.StepStatus{
Name: "s3",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
},
}, {
StepStatus: common.StepStatus{
Name: "s2",
Type: "pending",
Phase: common.WorkflowStepPhasePending,
},
},
}, {
StepStatus: common.StepStatus{
Name: "s3",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
},
}},
},
})).Should(BeEquivalentTo(""))
state, err = wf.ExecuteSteps(ctx, revision, runners)
@@ -2059,25 +2067,27 @@ var _ = Describe("Test Workflow", func() {
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeDAG,
Message: string(common.WorkflowStateSucceeded),
Steps: []common.WorkflowStepStatus{{
StepStatus: common.StepStatus{
Name: "s1",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
Steps: []common.WorkflowStepStatus{
{
StepStatus: common.StepStatus{
Name: "s1",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
},
}, {
StepStatus: common.StepStatus{
Name: "s3",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
},
}, {
StepStatus: common.StepStatus{
Name: "s2",
Type: "pending",
Phase: common.WorkflowStepPhaseSucceeded,
},
},
}, {
StepStatus: common.StepStatus{
Name: "s3",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
},
}, {
StepStatus: common.StepStatus{
Name: "s2",
Type: "pending",
Phase: common.WorkflowStepPhaseSucceeded,
},
}},
},
})).Should(BeEquivalentTo(""))
})
@@ -2246,14 +2256,18 @@ func makeRunner(step oamcore.WorkflowStep, subTaskRunners []wfTypes.TaskRunner)
return &testTaskRunner{
step: step,
run: run,
checkPending: func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool {
checkPending: func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
if step.Type != "pending" {
return false
return false, common.StepStatus{}
}
if pending == true {
return true
return true, common.StepStatus{
Phase: common.WorkflowStepPhasePending,
Name: step.Name,
Type: step.Type,
}
}
return false
return false, common.StepStatus{}
},
}
}
@@ -2277,7 +2291,7 @@ metadata:
type testTaskRunner struct {
step oamcore.WorkflowStep
run func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error)
checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool
checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus)
}
// Name return step name.
@@ -2317,7 +2331,7 @@ func (tr *testTaskRunner) Run(ctx wfContext.Context, options *wfTypes.TaskRunOpt
}
// Pending check task should be executed or not.
func (tr *testTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool {
func (tr *testTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) {
return tr.checkPending(ctx, stepStatus)
}