diff --git a/apis/core.oam.dev/common/types.go b/apis/core.oam.dev/common/types.go index 3717ae8a2..40275b150 100644 --- a/apis/core.oam.dev/common/types.go +++ b/apis/core.oam.dev/common/types.go @@ -420,6 +420,8 @@ const ( WorkflowStepPhaseStopped WorkflowStepPhase = "stopped" // WorkflowStepPhaseRunning will make the controller continue the workflow. WorkflowStepPhaseRunning WorkflowStepPhase = "running" + // WorkflowStepPhasePending will make the controller wait for the step to run. + WorkflowStepPhasePending WorkflowStepPhase = "pending" ) // DefinitionType describes the type of DefinitionRevision. diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller_test.go b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller_test.go index d2cf07e14..c10ba0679 100644 --- a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller_test.go +++ b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller_test.go @@ -2402,6 +2402,271 @@ var _ = Describe("Test Application Controller", func() { Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationWorkflowTerminated)) }) + It("application with skip outputs in workflow", func() { + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-skip-output", + }, + } + Expect(k8sClient.Create(ctx, &ns)).Should(BeNil()) + healthComponentDef := &v1beta1.ComponentDefinition{} + hCDefJson, _ := yaml.YAMLToJSON([]byte(cdDefWithHealthStatusYaml)) + Expect(json.Unmarshal(hCDefJson, healthComponentDef)).Should(BeNil()) + healthComponentDef.Name = "worker-with-health" + healthComponentDef.Namespace = "app-with-skip-output" + Expect(k8sClient.Create(ctx, healthComponentDef)).Should(BeNil()) + app := &v1beta1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: "Application", + APIVersion: "core.oam.dev/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-skip-output", + Namespace: "app-with-skip-output", + }, + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb1", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox","lives": "i am lives","enemies": "empty"}`)}, + }, + { + Name: "myweb2", + Type: "worker", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + Workflow: &v1beta1.Workflow{ + Steps: []v1beta1.WorkflowStep{ + { + Name: "myweb1", + Type: "apply-component", + If: "false", + Outputs: common.StepOutputs{ + { + Name: "output", + ValueFrom: "context.name", + }, + }, + Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)}, + }, + { + Name: "myweb2", + Inputs: common.StepInputs{ + { + From: "output", + ParameterKey: "", + }, + }, + If: `inputs.output == "app-with-timeout-output"`, + Type: "apply-component", + Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb2"}`)}, + }, + }, + }, + }, + } + + Expect(k8sClient.Create(context.Background(), app)).Should(BeNil()) + appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name} + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + + expDeployment := &v1.Deployment{} + web1Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb1"} + Expect(k8sClient.Get(ctx, web1Key, expDeployment)).Should(util.NotFoundMatcher{}) + web2Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb2"} + Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{}) + + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + + Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{}) + + checkApp := &v1beta1.Application{} + Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil()) + Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseSkipped)) + Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseSkipped)) + Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunning)) + }) + + It("application with invalid inputs in workflow", func() { + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-invalid-input", + }, + } + Expect(k8sClient.Create(ctx, &ns)).Should(BeNil()) + healthComponentDef := &v1beta1.ComponentDefinition{} + hCDefJson, _ := yaml.YAMLToJSON([]byte(cdDefWithHealthStatusYaml)) + Expect(json.Unmarshal(hCDefJson, healthComponentDef)).Should(BeNil()) + healthComponentDef.Name = "worker-with-health" + healthComponentDef.Namespace = "app-with-invalid-input" + Expect(k8sClient.Create(ctx, healthComponentDef)).Should(BeNil()) + app := &v1beta1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: "Application", + APIVersion: "core.oam.dev/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-invalid-input", + Namespace: "app-with-invalid-input", + }, + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb1", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox","lives": "i am lives","enemies": "empty"}`)}, + }, + { + Name: "myweb2", + Type: "worker", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + Workflow: &v1beta1.Workflow{ + Steps: []v1beta1.WorkflowStep{ + { + Name: "myweb1", + Type: "apply-component", + Outputs: common.StepOutputs{ + { + Name: "output", + ValueFrom: "context.name", + }, + }, + Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)}, + }, + { + Name: "myweb2", + Inputs: common.StepInputs{ + { + From: "invalid", + ParameterKey: "", + }, + }, + Type: "apply-component", + Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb2"}`)}, + }, + }, + }, + }, + } + + Expect(k8sClient.Create(context.Background(), app)).Should(BeNil()) + appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name} + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + + expDeployment := &v1.Deployment{} + web1Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb1"} + Expect(k8sClient.Get(ctx, web1Key, expDeployment)).Should(BeNil()) + expDeployment.Status.Replicas = 1 + expDeployment.Status.ReadyReplicas = 1 + Expect(k8sClient.Status().Update(ctx, expDeployment)).Should(BeNil()) + web2Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb2"} + Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{}) + + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + + Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{}) + + checkApp := &v1beta1.Application{} + Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil()) + Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseSucceeded)) + Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhasePending)) + Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow)) + }) + + It("application with invalid inputs in workflow in dag mode", func() { + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-invalid-input-dag", + }, + } + Expect(k8sClient.Create(ctx, &ns)).Should(BeNil()) + healthComponentDef := &v1beta1.ComponentDefinition{} + hCDefJson, _ := yaml.YAMLToJSON([]byte(cdDefWithHealthStatusYaml)) + Expect(json.Unmarshal(hCDefJson, healthComponentDef)).Should(BeNil()) + healthComponentDef.Name = "worker-with-health" + healthComponentDef.Namespace = "app-with-invalid-input-dag" + Expect(k8sClient.Create(ctx, healthComponentDef)).Should(BeNil()) + app := &v1beta1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: "Application", + APIVersion: "core.oam.dev/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "app-with-invalid-input-dag", + Namespace: "app-with-invalid-input-dag", + }, + Spec: v1beta1.ApplicationSpec{ + Components: []common.ApplicationComponent{ + { + Name: "myweb1", + Type: "worker-with-health", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox","lives": "i am lives","enemies": "empty"}`)}, + }, + { + Name: "myweb2", + Type: "worker", + Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)}, + }, + }, + Workflow: &v1beta1.Workflow{ + Mode: &v1beta1.WorkflowExecuteMode{ + Steps: common.WorkflowModeDAG, + }, + Steps: []v1beta1.WorkflowStep{ + { + Name: "myweb1", + Type: "apply-component", + Outputs: common.StepOutputs{ + { + Name: "output", + ValueFrom: "context.name", + }, + }, + Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)}, + }, + { + Name: "myweb2", + Inputs: common.StepInputs{ + { + From: "invalid", + ParameterKey: "", + }, + }, + Type: "apply-component", + Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb2"}`)}, + }, + }, + }, + }, + } + + Expect(k8sClient.Create(context.Background(), app)).Should(BeNil()) + appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name} + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey}) + + expDeployment := &v1.Deployment{} + web1Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb1"} + Expect(k8sClient.Get(ctx, web1Key, expDeployment)).Should(BeNil()) + web2Key := types.NamespacedName{Namespace: ns.Name, Name: "myweb2"} + Expect(k8sClient.Get(ctx, web2Key, expDeployment)).Should(util.NotFoundMatcher{}) + + checkApp := &v1beta1.Application{} + Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil()) + Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseRunning)) + Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhasePending)) + Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow)) + }) + It("application with if always in workflow", func() { ns := corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/monitor/watcher/application.go b/pkg/monitor/watcher/application.go index 4454cc3bc..5b6aae376 100644 --- a/pkg/monitor/watcher/application.go +++ b/pkg/monitor/watcher/application.go @@ -77,7 +77,7 @@ func (watcher *applicationMetricsWatcher) report() { metrics.ApplicationPhaseCounter.WithLabelValues(phase).Set(float64(watcher.phaseCounter[phase])) } for stepPhase := range watcher.stepPhaseDirty { - metrics.WorkflowStepPhaseGauge.WithLabelValues(strings.Split(stepPhase, "/")...).Set(float64(watcher.stepPhaseCounter[stepPhase])) + metrics.WorkflowStepPhaseGauge.WithLabelValues(strings.Split(stepPhase, "/")[:2]...).Set(float64(watcher.stepPhaseCounter[stepPhase])) } watcher.phaseDirty = map[string]struct{}{} watcher.stepPhaseDirty = map[string]struct{}{} diff --git a/pkg/workflow/hooks/data_passing.go b/pkg/workflow/hooks/data_passing.go index b759d6391..f66c8ae6a 100644 --- a/pkg/workflow/hooks/data_passing.go +++ b/pkg/workflow/hooks/data_passing.go @@ -17,9 +17,12 @@ limitations under the License. package hooks import ( + "encoding/json" + "fmt" "strings" "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime" "github.com/oam-dev/kubevela/apis/core.oam.dev/common" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" @@ -43,24 +46,59 @@ func Input(ctx wfContext.Context, paramValue *value.Value, step v1beta1.Workflow } // Output get data from task value. -func Output(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus) error { +func Output(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus, stepStatus map[string]common.StepStatus) error { + errMsg := "" if wfTypes.IsStepFinish(status.Phase, status.Reason) { + SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, status) for _, output := range step.Outputs { v, err := taskValue.LookupByScript(output.ValueFrom) - if err != nil && !strings.Contains(err.Error(), "not found") { - return err + // if the error is not nil and the step is not skipped, return the error + if err != nil && status.Phase != common.WorkflowStepPhaseSkipped { + errMsg += fmt.Sprintf("failed to get output from %s: %s\n", output.ValueFrom, err.Error()) } + // if the error is not nil, set the value to null if err != nil || v.Error() != nil { - v, err = taskValue.MakeValue("null") - if err != nil { - return err - } + v, _ = taskValue.MakeValue("null") } if err := ctx.SetVar(v, output.Name); err != nil { - return err + errMsg += fmt.Sprintf("failed to set output %s: %s\n", output.Name, err.Error()) } } } + if errMsg != "" { + return errors.New(errMsg) + } return nil } + +// SetAdditionalNameInStatus sets additional name from properties to status map +func SetAdditionalNameInStatus(stepStatus map[string]common.StepStatus, name string, properties *runtime.RawExtension, status common.StepStatus) { + if stepStatus == nil || properties == nil { + return + } + o := struct { + Name string `json:"name"` + Component string `json:"component"` + }{} + js, err := properties.MarshalJSON() + if err != nil { + return + } + if err := json.Unmarshal(js, &o); err != nil { + return + } + additionalName := "" + switch { + case o.Name != "": + additionalName = o.Name + case o.Component != "": + additionalName = o.Component + default: + return + } + if _, ok := stepStatus[additionalName]; !ok { + stepStatus[additionalName] = status + return + } +} diff --git a/pkg/workflow/hooks/data_passing_test.go b/pkg/workflow/hooks/data_passing_test.go index 8a0f9fd54..e5623236a 100644 --- a/pkg/workflow/hooks/data_passing_test.go +++ b/pkg/workflow/hooks/data_passing_test.go @@ -66,6 +66,7 @@ func TestOutput(t *testing.T) { output: score: 99 `, nil, "") r.NoError(err) + stepStatus := make(map[string]common.StepStatus) err = Output(wfCtx, taskValue, v1beta1.WorkflowStep{ Properties: &runtime.RawExtension{ Raw: []byte("{\"name\":\"mystep\"}"), @@ -76,7 +77,7 @@ output: score: 99 }}, }, common.StepStatus{ Phase: common.WorkflowStepPhaseSucceeded, - }) + }, stepStatus) r.NoError(err) result, err := wfCtx.GetVar("myscore") r.NoError(err) @@ -84,6 +85,7 @@ output: score: 99 r.NoError(err) r.Equal(s, `99 `) + r.Equal(stepStatus["mystep"].Phase, common.WorkflowStepPhaseSucceeded) } func mockContext(t *testing.T) wfContext.Context { diff --git a/pkg/workflow/tasks/custom/task.go b/pkg/workflow/tasks/custom/task.go index f26405a46..6583a33b7 100644 --- a/pkg/workflow/tasks/custom/task.go +++ b/pkg/workflow/tasks/custom/task.go @@ -63,7 +63,7 @@ func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (wfTypes type taskRunner struct { name string run func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error) - checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool + checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) } // Name return step name. @@ -77,7 +77,7 @@ func (tr *taskRunner) Run(ctx wfContext.Context, options *wfTypes.TaskRunOptions } // Pending check task should be executed or not. -func (tr *taskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool { +func (tr *taskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { return tr.checkPending(ctx, stepStatus) } @@ -120,10 +120,10 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err tRunner := new(taskRunner) tRunner.name = wfStep.Name - tRunner.checkPending = func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool { - return CheckPending(ctx, wfStep, stepStatus) + tRunner.checkPending = func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { + return CheckPending(ctx, wfStep, exec.wfStatus.ID, stepStatus) } - tRunner.run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error) { + tRunner.run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (stepStatus common.StepStatus, operations *wfTypes.Operation, rErr error) { if options.GetTracer == nil { options.GetTracer = func(id string, step v1beta1.WorkflowStep) monitorContext.Context { return monitorContext.NewTraceContext(context.Background(), "") @@ -139,10 +139,28 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err t.runOptionsProcess(options) } + exec.wfStatus.Message = "" var taskv *value.Value var err error var paramFile string + defer func() { + if taskv == nil { + taskv, err = convertTemplate(ctx, t.pd, strings.Join([]string{templ, paramFile}, "\n"), exec.wfStatus.ID, options.PCtx) + if err != nil { + return + } + } + for _, hook := range options.PostStopHooks { + if err := hook(ctx, taskv, wfStep, exec.status(), options.StepStatus); err != nil { + exec.wfStatus.Message = err.Error() + stepStatus = exec.status() + operations = exec.operation() + return + } + } + }() + for _, hook := range options.PreCheckHooks { result, err := hook(wfStep, &wfTypes.PreCheckOptions{ PackageDiscover: t.pd, @@ -212,12 +230,6 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err exec.err(ctx, true, err, wfTypes.StatusReasonExecute) return exec.status(), exec.operation(), nil } - for _, hook := range options.PostStopHooks { - if err := hook(ctx, taskv, wfStep, exec.status()); err != nil { - exec.err(ctx, false, err, wfTypes.StatusReasonOutput) - return exec.status(), exec.operation(), nil - } - } return exec.status(), exec.operation(), nil } @@ -552,20 +564,28 @@ func NewTaskLoader(lt LoadTaskTemplate, pkgDiscover *packages.PackageDiscover, h } // CheckPending checks whether to pending task run -func CheckPending(ctx wfContext.Context, step v1beta1.WorkflowStep, stepStatus map[string]common.StepStatus) bool { +func CheckPending(ctx wfContext.Context, step v1beta1.WorkflowStep, id string, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { + pStatus := common.StepStatus{ + Phase: common.WorkflowStepPhasePending, + Type: step.Type, + ID: id, + Name: step.Name, + } for _, depend := range step.DependsOn { + pStatus.Message = fmt.Sprintf("Pending on DependsOn: %s", depend) if status, ok := stepStatus[depend]; ok { if !wfTypes.IsStepFinish(status.Phase, status.Reason) { - return true + return true, pStatus } } else { - return true + return true, pStatus } } for _, input := range step.Inputs { + pStatus.Message = fmt.Sprintf("Pending on Input: %s", input.From) if _, err := ctx.GetVar(strings.Split(input.From, ".")...); err != nil { - return true + return true, pStatus } } - return false + return false, common.StepStatus{} } diff --git a/pkg/workflow/tasks/custom/task_test.go b/pkg/workflow/tasks/custom/task_test.go index a56ba7750..36f97d7a9 100644 --- a/pkg/workflow/tasks/custom/task_test.go +++ b/pkg/workflow/tasks/custom/task_test.go @@ -246,9 +246,9 @@ close({ case "input": r.Equal(err.Error(), "do preStartHook: get input from [podIP]: var(path=podIP) not exist") case "output-var-conflict": - r.Equal(status.Reason, types.StatusReasonOutput) + r.Contains(status.Message, "conflict") r.Equal(operation.Waiting, false) - r.Equal(status.Phase, common.WorkflowStepPhaseFailed) + r.Equal(status.Phase, common.WorkflowStepPhaseSucceeded) case "failed-after-retries": wfContext.CleanupMemoryStore("app-v1", "default") newCtx := newWorkflowContextForTest(t) @@ -433,14 +433,16 @@ func TestPendingInputCheck(t *testing.T) { r.NoError(err) run, err := gen(step, &types.GeneratorOptions{}) r.NoError(err) - r.Equal(run.Pending(wfCtx, nil), true) + p, _ := run.Pending(wfCtx, nil) + r.Equal(p, true) score, err := value.NewValue(` 100 `, nil, "") r.NoError(err) err = wfCtx.SetVar(score, "score") r.NoError(err) - r.Equal(run.Pending(wfCtx, nil), false) + p, _ = run.Pending(wfCtx, nil) + r.Equal(p, false) } func TestPendingDependsOnCheck(t *testing.T) { @@ -468,13 +470,15 @@ func TestPendingDependsOnCheck(t *testing.T) { r.NoError(err) run, err := gen(step, &types.GeneratorOptions{}) r.NoError(err) - r.Equal(run.Pending(wfCtx, nil), true) + p, _ := run.Pending(wfCtx, nil) + r.Equal(p, true) ss := map[string]common.StepStatus{ "depend": { Phase: common.WorkflowStepPhaseSucceeded, }, } - r.Equal(run.Pending(wfCtx, ss), false) + p, _ = run.Pending(wfCtx, ss) + r.Equal(p, false) } func TestSkip(t *testing.T) { @@ -500,7 +504,8 @@ func TestSkip(t *testing.T) { r.NoError(err) runner, err := gen(step, &types.GeneratorOptions{}) r.NoError(err) - status, operations, err := runner.Run(nil, &types.TaskRunOptions{ + wfCtx := newWorkflowContextForTest(t) + status, operations, err := runner.Run(wfCtx, &types.TaskRunOptions{ PreCheckHooks: []types.TaskPreCheckHook{ func(step v1beta1.WorkflowStep, options *types.PreCheckOptions) (*types.PreCheckResult, error) { return &types.PreCheckResult{Skip: true}, nil diff --git a/pkg/workflow/tasks/discover.go b/pkg/workflow/tasks/discover.go index 3b6793efb..c6b779512 100644 --- a/pkg/workflow/tasks/discover.go +++ b/pkg/workflow/tasks/discover.go @@ -134,10 +134,11 @@ func (tr *suspendTaskRunner) Name() string { // Run make workflow suspend. func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus common.StepStatus, operations *types.Operation, rErr error) { stepStatus = common.StepStatus{ - ID: tr.id, - Name: tr.step.Name, - Type: types.WorkflowStepTypeSuspend, - Phase: common.WorkflowStepPhaseRunning, + ID: tr.id, + Name: tr.step.Name, + Type: types.WorkflowStepTypeSuspend, + Phase: common.WorkflowStepPhaseRunning, + Message: "", } operations = &types.Operation{Suspend: true} @@ -173,7 +174,6 @@ func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOp } return stepStatus, operations, nil } - for _, input := range tr.step.Inputs { if input.ParameterKey == "duration" { inputValue, err := ctx.GetVar(strings.Split(input.From, ".")...) @@ -207,8 +207,8 @@ func (tr *suspendTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOp } // Pending check task should be executed or not. -func (tr *suspendTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool { - return custom.CheckPending(ctx, tr.step, stepStatus) +func (tr *suspendTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { + return custom.CheckPending(ctx, tr.step, tr.id, stepStatus) } type stepGroupTaskRunner struct { @@ -227,16 +227,17 @@ func (tr *stepGroupTaskRunner) Name() string { } // Pending check task should be executed or not. -func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool { - return custom.CheckPending(ctx, tr.step, stepStatus) +func (tr *stepGroupTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { + return custom.CheckPending(ctx, tr.step, tr.id, stepStatus) } // Run make workflow step group. func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (status common.StepStatus, operations *types.Operation, rErr error) { status = common.StepStatus{ - ID: tr.id, - Name: tr.name, - Type: types.WorkflowStepTypeStepGroup, + ID: tr.id, + Name: tr.name, + Type: types.WorkflowStepTypeStepGroup, + Message: "", } pStatus := &status @@ -307,6 +308,8 @@ func getStepGroupStatus(status common.StepStatus, stepStatus common.WorkflowStep status.Phase = common.WorkflowStepPhaseRunning case subStepCounts[string(common.WorkflowStepPhaseStopped)] > 0: status.Phase = common.WorkflowStepPhaseStopped + case subStepCounts[string(common.WorkflowStepPhasePending)] > 0: + status.Phase = common.WorkflowStepPhasePending case subStepCounts[string(common.WorkflowStepPhaseFailed)] > 0: status.Phase = common.WorkflowStepPhaseFailed switch { @@ -371,7 +374,7 @@ func GetSuspendStepDurationWaiting(step v1beta1.WorkflowStep) (time.Duration, er func handleOutput(ctx wfContext.Context, stepStatus *common.StepStatus, operations *types.Operation, step v1beta1.WorkflowStep, postStopHooks []types.TaskPostStopHook, pd *packages.PackageDiscover, id string, pCtx process.Context) { status := *stepStatus - if status.Phase != common.WorkflowStepPhaseSkipped && len(step.Outputs) > 0 { + if len(step.Outputs) > 0 { contextValue, err := custom.MakeValueForContext(ctx, pd, id, pCtx) if err != nil { status.Phase = common.WorkflowStepPhaseFailed @@ -384,7 +387,7 @@ func handleOutput(ctx wfContext.Context, stepStatus *common.StepStatus, operatio } for _, hook := range postStopHooks { - if err := hook(ctx, contextValue, step, status); err != nil { + if err := hook(ctx, contextValue, step, status, nil); err != nil { status.Phase = common.WorkflowStepPhaseFailed if status.Reason == "" { status.Reason = types.StatusReasonOutput diff --git a/pkg/workflow/tasks/discover_test.go b/pkg/workflow/tasks/discover_test.go index febcabaea..3a2da3a2c 100644 --- a/pkg/workflow/tasks/discover_test.go +++ b/pkg/workflow/tasks/discover_test.go @@ -91,13 +91,15 @@ func TestSuspendStep(t *testing.T) { r.Equal(runner.Name(), "test") // test pending - r.Equal(runner.Pending(nil, nil), true) + p, _ := runner.Pending(nil, nil) + r.Equal(p, true) ss := map[string]common.StepStatus{ "depend": { Phase: common.WorkflowStepPhaseSucceeded, }, } - r.Equal(runner.Pending(nil, ss), false) + p, _ = runner.Pending(nil, ss) + r.Equal(p, false) // test skip status, operations, err := runner.Run(nil, &types.TaskRunOptions{ @@ -181,13 +183,15 @@ func TestStepGroupStep(t *testing.T) { r.Equal(runner.Name(), "test") // test pending - r.Equal(runner.Pending(nil, nil), true) + p, _ := runner.Pending(nil, nil) + r.Equal(p, true) ss := map[string]common.StepStatus{ "depend": { Phase: common.WorkflowStepPhaseSucceeded, }, } - r.Equal(runner.Pending(nil, ss), false) + p, _ = runner.Pending(nil, ss) + r.Equal(p, false) // test skip status, operations, err := runner.Run(nil, &types.TaskRunOptions{ diff --git a/pkg/workflow/types/types.go b/pkg/workflow/types/types.go index 5cb616045..81433a2cf 100644 --- a/pkg/workflow/types/types.go +++ b/pkg/workflow/types/types.go @@ -34,7 +34,7 @@ import ( // TaskRunner is a task runner. type TaskRunner interface { Name() string - Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool + Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) Run(ctx wfContext.Context, options *TaskRunOptions) (common.StepStatus, *Operation, error) } @@ -85,7 +85,7 @@ type TaskPreCheckHook func(step v1beta1.WorkflowStep, options *PreCheckOptions) type TaskPreStartHook func(ctx wfContext.Context, paramValue *value.Value, step v1beta1.WorkflowStep) error // TaskPostStopHook run after task execution. -type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus) error +type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v1beta1.WorkflowStep, status common.StepStatus, stepStatus map[string]common.StepStatus) error // Operation is workflow operation object. type Operation struct { diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go index 460d6fc00..1ba336bc3 100644 --- a/pkg/workflow/workflow.go +++ b/pkg/workflow/workflow.go @@ -237,17 +237,14 @@ func (w *workflow) restartWorkflow(ctx monitorContext.Context, revAndSpecHash st func newEngine(ctx monitorContext.Context, wfCtx wfContext.Context, w *workflow, wfStatus *common.WorkflowStatus) *engine { stepStatus := make(map[string]common.StepStatus) - for _, ss := range wfStatus.Steps { - setStepStatus(stepStatus, ss.StepStatus) - for _, sss := range ss.SubStepsStatus { - setStepStatus(stepStatus, sss.StepStatus) - } - } + setStepStatus(stepStatus, wfStatus.Steps) stepDependsOn := make(map[string][]string) if w.app.Spec.Workflow != nil { for _, step := range w.app.Spec.Workflow.Steps { + hooks.SetAdditionalNameInStatus(stepStatus, step.Name, step.Properties, stepStatus[step.Name]) stepDependsOn[step.Name] = append(stepDependsOn[step.Name], step.DependsOn...) for _, sub := range step.SubSteps { + hooks.SetAdditionalNameInStatus(stepStatus, sub.Name, sub.Properties, stepStatus[step.Name]) stepDependsOn[sub.Name] = append(stepDependsOn[sub.Name], sub.DependsOn...) } } @@ -270,12 +267,12 @@ func newEngine(ctx monitorContext.Context, wfCtx wfContext.Context, w *workflow, } } -func setStepStatus(statusMap map[string]common.StepStatus, status common.StepStatus) { - statusMap[status.Name] = common.StepStatus{ - Phase: status.Phase, - ID: status.ID, - Reason: status.Reason, - FirstExecuteTime: status.FirstExecuteTime, +func setStepStatus(statusMap map[string]common.StepStatus, status []common.WorkflowStepStatus) { + for _, ss := range status { + statusMap[ss.Name] = ss.StepStatus + for _, sss := range ss.SubStepsStatus { + statusMap[sss.Name] = sss.StepStatus + } } } @@ -296,12 +293,7 @@ func (w *workflow) GetSuspendBackoffWaitTime() time.Duration { return 0 } stepStatus := make(map[string]common.StepStatus) - for _, ss := range w.app.Status.Workflow.Steps { - setStepStatus(stepStatus, ss.StepStatus) - for _, sss := range ss.SubStepsStatus { - setStepStatus(stepStatus, sss.StepStatus) - } - } + setStepStatus(stepStatus, w.app.Status.Workflow.Steps) max := time.Duration(1<<63 - 1) min := max for _, step := range w.app.Spec.Workflow.Steps { @@ -529,7 +521,7 @@ func (e *engine) setNextExecuteTime() { e.wfCtx.SetValueInMemory(next, wfTypes.ContextKeyNextExecuteTime) } -func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error { +func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner, pendingRunners bool) error { var ( todoTasks []wfTypes.TaskRunner pendingTasks []wfTypes.TaskRunner @@ -545,9 +537,15 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error { } if !finish { done = false - if tRunner.Pending(wfCtx, e.stepStatus) { + if pending, status := tRunner.Pending(wfCtx, e.stepStatus); pending { + if pendingRunners { + wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) + e.updateStepStatus(status) + } pendingTasks = append(pendingTasks, tRunner) continue + } else if status.Phase == common.WorkflowStepPhasePending { + wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, stepID) } todoTasks = append(todoTasks, tRunner) } else { @@ -569,7 +567,7 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error { } if len(pendingTasks) > 0 { - return e.runAsDAG(pendingTasks) + return e.runAsDAG(pendingTasks, true) } } return nil @@ -579,7 +577,7 @@ func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error { func (e *engine) Run(taskRunners []wfTypes.TaskRunner, dag bool) error { var err error if dag { - err = e.runAsDAG(taskRunners) + err = e.runAsDAG(taskRunners, false) } else { err = e.steps(taskRunners, dag) } @@ -611,6 +609,14 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner, dag bool) error { continue } } + if pending, status := runner.Pending(wfCtx, e.stepStatus); pending { + wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID) + e.updateStepStatus(status) + if dag { + continue + } + return nil + } options := e.generateRunOptions(e.findDependPhase(taskRunners, index, dag)) status, operation, err := runner.Run(wfCtx, options) diff --git a/pkg/workflow/workflow_test.go b/pkg/workflow/workflow_test.go index de015f872..9ee2c7bd5 100644 --- a/pkg/workflow/workflow_test.go +++ b/pkg/workflow/workflow_test.go @@ -2030,19 +2030,27 @@ var _ = Describe("Test Workflow", func() { AppRevision: app.Status.Workflow.AppRevision, Mode: common.WorkflowModeDAG, Message: string(common.WorkflowStateExecuting), - Steps: []common.WorkflowStepStatus{{ - StepStatus: common.StepStatus{ - Name: "s1", - Type: "success", - Phase: common.WorkflowStepPhaseSucceeded, + Steps: []common.WorkflowStepStatus{ + { + StepStatus: common.StepStatus{ + Name: "s1", + Type: "success", + Phase: common.WorkflowStepPhaseSucceeded, + }, + }, { + StepStatus: common.StepStatus{ + Name: "s3", + Type: "success", + Phase: common.WorkflowStepPhaseSucceeded, + }, + }, { + StepStatus: common.StepStatus{ + Name: "s2", + Type: "pending", + Phase: common.WorkflowStepPhasePending, + }, }, - }, { - StepStatus: common.StepStatus{ - Name: "s3", - Type: "success", - Phase: common.WorkflowStepPhaseSucceeded, - }, - }}, + }, })).Should(BeEquivalentTo("")) state, err = wf.ExecuteSteps(ctx, revision, runners) @@ -2059,25 +2067,27 @@ var _ = Describe("Test Workflow", func() { AppRevision: app.Status.Workflow.AppRevision, Mode: common.WorkflowModeDAG, Message: string(common.WorkflowStateSucceeded), - Steps: []common.WorkflowStepStatus{{ - StepStatus: common.StepStatus{ - Name: "s1", - Type: "success", - Phase: common.WorkflowStepPhaseSucceeded, + Steps: []common.WorkflowStepStatus{ + { + StepStatus: common.StepStatus{ + Name: "s1", + Type: "success", + Phase: common.WorkflowStepPhaseSucceeded, + }, + }, { + StepStatus: common.StepStatus{ + Name: "s3", + Type: "success", + Phase: common.WorkflowStepPhaseSucceeded, + }, + }, { + StepStatus: common.StepStatus{ + Name: "s2", + Type: "pending", + Phase: common.WorkflowStepPhaseSucceeded, + }, }, - }, { - StepStatus: common.StepStatus{ - Name: "s3", - Type: "success", - Phase: common.WorkflowStepPhaseSucceeded, - }, - }, { - StepStatus: common.StepStatus{ - Name: "s2", - Type: "pending", - Phase: common.WorkflowStepPhaseSucceeded, - }, - }}, + }, })).Should(BeEquivalentTo("")) }) @@ -2246,14 +2256,18 @@ func makeRunner(step oamcore.WorkflowStep, subTaskRunners []wfTypes.TaskRunner) return &testTaskRunner{ step: step, run: run, - checkPending: func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool { + checkPending: func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { if step.Type != "pending" { - return false + return false, common.StepStatus{} } if pending == true { - return true + return true, common.StepStatus{ + Phase: common.WorkflowStepPhasePending, + Name: step.Name, + Type: step.Type, + } } - return false + return false, common.StepStatus{} }, } } @@ -2277,7 +2291,7 @@ metadata: type testTaskRunner struct { step oamcore.WorkflowStep run func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.StepStatus, *wfTypes.Operation, error) - checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool + checkPending func(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) } // Name return step name. @@ -2317,7 +2331,7 @@ func (tr *testTaskRunner) Run(ctx wfContext.Context, options *wfTypes.TaskRunOpt } // Pending check task should be executed or not. -func (tr *testTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) bool { +func (tr *testTaskRunner) Pending(ctx wfContext.Context, stepStatus map[string]common.StepStatus) (bool, common.StepStatus) { return tr.checkPending(ctx, stepStatus) }