Feat: add op.#Suspend and suspending phase in step (#5567)

This commit is contained in:
Tianxin Dong
2023-03-01 10:59:31 +08:00
committed by GitHub
parent 7f54ca96e7
commit 41844eb461
11 changed files with 362 additions and 40 deletions

View File

@@ -12,8 +12,22 @@ spec:
schematic:
cue:
template: |
import (
"vela/op"
)
suspend: op.#Suspend & {
if parameter.duration != _|_ {
duration: parameter.duration
}
if parameter.message != _|_ {
message: parameter.message
}
}
parameter: {
// +usage=Specify the wait duration time to resume workflow such as "30s", "1min" or "2m15s"
duration?: string
// +usage=The suspend message to show
message?: string
}

View File

@@ -12,8 +12,22 @@ spec:
schematic:
cue:
template: |
import (
"vela/op"
)
suspend: op.#Suspend & {
if parameter.duration != _|_ {
duration: parameter.duration
}
if parameter.message != _|_ {
message: parameter.message
}
}
parameter: {
// +usage=Specify the wait duration time to resume workflow such as "30s", "1min" or "2m15s"
duration?: string
// +usage=The suspend message to show
message?: string
}

2
go.mod
View File

@@ -61,7 +61,7 @@ require (
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c
github.com/kubevela/pkg v0.0.0-20230224072506-9ff31b249aa8
github.com/kubevela/prism v1.7.0-alpha.1
github.com/kubevela/workflow v0.4.1-0.20230215100259-edc78492f107
github.com/kubevela/workflow v0.4.1-0.20230227023118-8eae143050d4
github.com/kyokomi/emoji v2.2.4+incompatible
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd

4
go.sum
View File

@@ -992,8 +992,8 @@ github.com/kubevela/pkg v0.0.0-20230224072506-9ff31b249aa8 h1:oEiMjRn4tCEkL1p0nW
github.com/kubevela/pkg v0.0.0-20230224072506-9ff31b249aa8/go.mod h1:GilLxt+9L4sU2tLeZAGHga8wiYmjjfPX/Q6JkyuuXSM=
github.com/kubevela/prism v1.7.0-alpha.1 h1:oeZFn1Oy6gxSSFzMTfsWjLOCKaaooMVm1JGNK4j4Mlo=
github.com/kubevela/prism v1.7.0-alpha.1/go.mod h1:AJSDfdA+RkRSnWx3xEcogbmOTpX+l7RSIwqVHxwUtaI=
github.com/kubevela/workflow v0.4.1-0.20230215100259-edc78492f107 h1:KaNaPokvPAOiwJy8qx2ilLu7dXznATK7N+LE+2yv8aY=
github.com/kubevela/workflow v0.4.1-0.20230215100259-edc78492f107/go.mod h1:U94Hz5rlHPAatN+Birhumly26zjAguMumdhrYk+e5mo=
github.com/kubevela/workflow v0.4.1-0.20230227023118-8eae143050d4 h1:65x5gYdWMjewjFBlFMAPmkWU6iOj4bjAKyPyTW4Yddg=
github.com/kubevela/workflow v0.4.1-0.20230227023118-8eae143050d4/go.mod h1:0W8a7hU5i5f7UUXQPAHvOTHQdKlV/9bIyDZKXF9W8vU=
github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=

View File

@@ -430,7 +430,7 @@ var _ = Describe("Test workflow service functions", func() {
record, err := workflowService.DetailWorkflowRecord(ctx, &model.Workflow{Name: ResumeWorkflow, AppPrimaryKey: appName}, "workflow-resume-1")
Expect(err).Should(BeNil())
Expect(len(record.Steps)).Should(Equal(1))
Expect(record.Steps[0].Phase).Should(Equal(workflowv1alpha1.WorkflowStepPhaseSucceeded))
Expect(record.Steps[0].Phase).Should(Equal(workflowv1alpha1.WorkflowStepPhaseRunning))
})
It("Test TerminateRecord function", func() {

View File

@@ -235,11 +235,11 @@ var _ = Describe("Test Workflow", func() {
Expect(appObj.Status.Workflow.Suspend).Should(BeTrue())
Expect(appObj.Status.Phase).Should(BeEquivalentTo(common.ApplicationWorkflowSuspending))
Expect(appObj.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(workflowv1alpha1.WorkflowStepPhaseRunning))
Expect(appObj.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(workflowv1alpha1.WorkflowStepPhaseSuspending))
Expect(appObj.Status.Workflow.Steps[0].ID).ShouldNot(BeEquivalentTo(""))
// resume
appObj.Status.Workflow.Suspend = false
appObj.Status.Workflow.Steps[0].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
appObj.Status.Workflow.Steps[0].Phase = workflowv1alpha1.WorkflowStepPhaseRunning
Expect(k8sClient.Status().Patch(ctx, appObj, client.Merge)).Should(BeNil())
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: suspendApp.Name,

View File

@@ -83,19 +83,70 @@ func (wo appWorkflowOperator) Suspend(ctx context.Context) error {
if err = rollout.SuspendRollout(ctx, wo.cli, app, wo.outputWriter); err != nil {
return err
}
appKey := client.ObjectKeyFromObject(app)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := wo.cli.Get(ctx, appKey, app); err != nil {
return err
if err := SuspendWorkflow(ctx, wo.cli, app, ""); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", app.Name)
}
// Suspend a suspending workflow
func (wo appWorkflowStepOperator) Suspend(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
app := wo.application
if app.Status.Workflow == nil {
return fmt.Errorf("the workflow in application is not running")
}
if app.Status.Workflow.Terminated {
return fmt.Errorf("can not suspend a terminated workflow")
}
if err := SuspendWorkflow(ctx, wo.cli, app, step); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully suspend workflow %s from step %s \n", app.Name, step)
}
// SuspendWorkflow suspend workflow
func SuspendWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.Application, stepName string) error {
app.Status.Workflow.Suspend = true
steps := app.Status.Workflow.Steps
found := stepName == ""
for i, step := range steps {
if step.Phase != workflowv1alpha1.WorkflowStepPhaseRunning {
continue
}
// set the workflow suspend to true
app.Status.Workflow.Suspend = true
return wo.cli.Status().Patch(ctx, app, client.Merge)
if stepName == "" {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseSuspending)
} else if stepName == step.Name {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseSuspending)
found = true
break
}
for j, sub := range step.SubStepsStatus {
if sub.Phase != workflowv1alpha1.WorkflowStepPhaseRunning {
continue
}
if stepName == "" {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseSuspending)
} else if stepName == sub.Name {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseSuspending)
found = true
break
}
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return kubecli.Status().Patch(ctx, app, client.Merge)
}); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", app.Name)
return nil
}
// Resume a suspending workflow
@@ -158,27 +209,30 @@ func ResumeWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.App
found := stepName == ""
for i, step := range steps {
if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == workflowv1alpha1.WorkflowStepPhaseRunning {
if step.Phase != workflowv1alpha1.WorkflowStepPhaseSuspending {
continue
}
if stepName == "" {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseRunning)
} else if stepName == step.Name {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseRunning)
found = true
break
}
for j, sub := range step.SubStepsStatus {
if sub.Phase != workflowv1alpha1.WorkflowStepPhaseSuspending {
continue
}
if stepName == "" {
steps[i].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == step.Name {
steps[i].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseRunning)
} else if stepName == sub.Name {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseRunning)
found = true
break
}
}
for j, sub := range step.SubStepsStatus {
if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == workflowv1alpha1.WorkflowStepPhaseRunning {
if stepName == "" {
steps[i].SubStepsStatus[j].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == sub.Name {
steps[i].SubStepsStatus[j].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}

View File

@@ -0,0 +1,17 @@
import (
"vela/op"
)
suspend: op.#Suspend & {
if parameter.duration != _|_ {
duration: parameter.duration
}
if parameter.message != _|_ {
message: parameter.message
}
}
parameter: {
duration?: string
message?: string
}

View File

@@ -80,10 +80,14 @@ func NewWorkflowSuspendCommand(c common.Args, ioStream cmdutil.IOStreams, wargs
if err := wargs.getWorkflowInstance(ctx, cmd, args); err != nil {
return err
}
if wargs.StepName != "" {
return wargs.StepOperator.Suspend(ctx, wargs.StepName)
}
return wargs.Operator.Suspend(ctx)
},
}
addNamespaceAndEnvArg(cmd)
cmd.Flags().StringVarP(&wargs.StepName, "step", "s", "", "specify the step name in the workflow")
cmd.Flags().StringVarP(&wargs.Type, "type", "t", "", "the type of the resource, support: [app, workflow]")
return cmd
}

View File

@@ -60,10 +60,12 @@ func TestWorkflowSuspend(t *testing.T) {
testCases := map[string]struct {
app *v1beta1.Application
expectedErr error
expected *v1beta1.Application
step string
expectedErr string
}{
"no app name specified": {
expectedErr: fmt.Errorf("please specify the name of application/workflow"),
expectedErr: "please specify the name of application/workflow",
},
"workflow not running": {
app: &v1beta1.Application{
@@ -74,7 +76,7 @@ func TestWorkflowSuspend(t *testing.T) {
Spec: workflowSpec,
Status: common.AppStatus{},
},
expectedErr: fmt.Errorf("the workflow in application workflow-not-running is not start"),
expectedErr: "the workflow in application workflow-not-running is not start",
},
"suspend successfully": {
app: &v1beta1.Application{
@@ -89,6 +91,204 @@ func TestWorkflowSuspend(t *testing.T) {
},
},
},
expected: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "workflow",
Namespace: "test",
},
Spec: workflowSpec,
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Suspend: true,
},
},
},
},
"step not found": {
app: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "step-not-found",
Namespace: "default",
},
Spec: workflowSpec,
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Suspend: false,
},
},
},
step: "not-found",
expectedErr: "can not find",
},
"suspend all": {
app: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "suspend-all",
Namespace: "default",
},
Spec: workflowSpec,
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Steps: []workflowv1alpha1.WorkflowStepStatus{
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
},
},
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step2",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub2",
Phase: workflowv1alpha1.WorkflowStepPhaseSucceeded,
},
},
},
},
},
},
},
expected: &v1beta1.Application{
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Suspend: true,
Steps: []workflowv1alpha1.WorkflowStepStatus{
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step1",
Phase: workflowv1alpha1.WorkflowStepPhaseSuspending,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub1",
Phase: workflowv1alpha1.WorkflowStepPhaseSuspending,
},
},
},
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step2",
Phase: workflowv1alpha1.WorkflowStepPhaseSuspending,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub2",
Phase: workflowv1alpha1.WorkflowStepPhaseSucceeded,
},
},
},
},
},
},
},
},
"suspend specific step": {
app: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "suspend-step",
Namespace: "default",
},
Spec: workflowSpec,
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Steps: []workflowv1alpha1.WorkflowStepStatus{
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
},
},
},
},
},
},
expected: &v1beta1.Application{
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Suspend: true,
Steps: []workflowv1alpha1.WorkflowStepStatus{
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step1",
Phase: workflowv1alpha1.WorkflowStepPhaseSuspending,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub1",
Phase: workflowv1alpha1.WorkflowStepPhaseSuspending,
},
},
},
},
},
},
},
step: "step1",
},
"suspend specific sub step": {
app: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "suspend-sub-step",
Namespace: "default",
},
Spec: workflowSpec,
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Steps: []workflowv1alpha1.WorkflowStepStatus{
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
},
},
},
},
},
},
expected: &v1beta1.Application{
Status: common.AppStatus{
Workflow: &common.WorkflowStatus{
Suspend: true,
Steps: []workflowv1alpha1.WorkflowStepStatus{
{
StepStatus: workflowv1alpha1.StepStatus{
Name: "step1",
Phase: workflowv1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []workflowv1alpha1.StepStatus{
{
Name: "sub1",
Phase: workflowv1alpha1.WorkflowStepPhaseSuspending,
},
},
},
},
},
},
},
step: "sub1",
},
}
@@ -104,7 +304,7 @@ func TestWorkflowSuspend(t *testing.T) {
if tc.app != nil {
err := client.Create(ctx, tc.app)
r.NoError(err)
cmdArgs := []string{tc.app.Name}
if tc.app.Namespace != corev1.NamespaceDefault {
err := client.Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -112,14 +312,17 @@ func TestWorkflowSuspend(t *testing.T) {
},
})
r.NoError(err)
cmdArgs = append(cmdArgs, "-n", tc.app.Namespace)
cmd.SetArgs([]string{tc.app.Name, "-n", tc.app.Namespace})
} else {
cmd.SetArgs([]string{tc.app.Name})
}
if tc.step != "" {
cmdArgs = append(cmdArgs, "--step", tc.step)
}
cmd.SetArgs(cmdArgs)
}
err = cmd.Execute()
if tc.expectedErr != nil {
r.Equal(tc.expectedErr, err)
if tc.expectedErr != "" {
r.Contains(err.Error(), tc.expectedErr)
return
}
r.NoError(err)
@@ -131,6 +334,7 @@ func TestWorkflowSuspend(t *testing.T) {
}, wf)
r.NoError(err)
r.Equal(true, wf.Status.Workflow.Suspend)
r.Equal(tc.expected.Status, wf.Status)
})
}
}
@@ -263,11 +467,11 @@ func TestWorkflowResume(t *testing.T) {
r.Equal(false, wf.Status.Workflow.Suspend)
for _, step := range wf.Status.Workflow.Steps {
if step.Type == "suspend" {
r.Equal(step.Phase, workflowv1alpha1.WorkflowStepPhaseSucceeded)
r.Equal(step.Phase, workflowv1alpha1.WorkflowStepPhaseRunning)
}
for _, sub := range step.SubStepsStatus {
if sub.Type == "suspend" {
r.Equal(sub.Phase, workflowv1alpha1.WorkflowStepPhaseSucceeded)
r.Equal(sub.Phase, workflowv1alpha1.WorkflowStepPhaseRunning)
}
}
}

View File

@@ -1,3 +1,7 @@
import (
"vela/op"
)
"suspend": {
type: "workflow-step"
annotations: {
@@ -7,8 +11,19 @@
description: "Suspend the current workflow, it can be resumed by 'vela workflow resume' command."
}
template: {
suspend: op.#Suspend & {
if parameter.duration != _|_ {
duration: parameter.duration
}
if parameter.message != _|_ {
message: parameter.message
}
}
parameter: {
// +usage=Specify the wait duration time to resume workflow such as "30s", "1min" or "2m15s"
duration?: string
// +usage=The suspend message to show
message?: string
}
}