Feat: add workflow reconciling backoff time and failed limit times (#2881)

* Feat: add workflow failed after retries

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* Feat: add workflow reconcile backoff time

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix lint

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* make reviewable

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* resolve some comments

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* add tests

* fix rebase

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix test

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix status

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix requeue time interval

* resolve comments

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* change time to pointer

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix pointer test

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* fix test

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* change time to cm

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

* resolve comments and add e2e test

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
This commit is contained in:
Tianxin Dong
2021-12-15 11:33:33 +08:00
committed by GitHub
parent 4dc213469a
commit 655c2615e1
27 changed files with 1117 additions and 197 deletions

1
.gitignore vendored
View File

@@ -41,6 +41,7 @@ references/cmd/cli/fake/source.go
references/cmd/cli/fake/chart_source.go
charts/vela-core/crds/_.yaml
.test_vela
tmp/
.vela/

View File

@@ -201,6 +201,8 @@ const (
type WorkflowState string
const (
// WorkflowStateInitializing means the workflow is in initial state
WorkflowStateInitializing WorkflowState = "initializing"
// WorkflowStateTerminated means workflow is terminated manually, and it won't be started unless the spec changed.
WorkflowStateTerminated WorkflowState = "terminated"
// WorkflowStateSuspended means workflow is suspended manually, and it can be resumed.
@@ -327,6 +329,7 @@ type PolicyStatus struct {
type WorkflowStatus struct {
AppRevision string `json:"appRevision,omitempty"`
Mode WorkflowMode `json:"mode"`
Message string `json:"message,omitempty"`
Suspend bool `json:"suspend"`
Terminated bool `json:"terminated"`
@@ -351,7 +354,7 @@ type WorkflowStepPhase string
const (
// WorkflowStepPhaseSucceeded will make the controller run the next step.
WorkflowStepPhaseSucceeded WorkflowStepPhase = "succeeded"
// WorkflowStepPhaseFailed will make the controller stop the workflow and report error in `message`.
// WorkflowStepPhaseFailed will report error in `message`.
WorkflowStepPhaseFailed WorkflowStepPhase = "failed"
// WorkflowStepPhaseStopped will make the controller stop the workflow.
WorkflowStepPhaseStopped WorkflowStepPhase = "stopped"

View File

@@ -18,13 +18,14 @@ package types
// reason for Application
const (
ReasonParsed = "Parsed"
ReasonRendered = "Rendered"
ReasonRevisoned = "Revisioned"
ReasonApplied = "Applied"
ReasonHealthCheck = "HealthChecked"
ReasonDeployed = "Deployed"
ReasonRollout = "Rollout"
ReasonParsed = "Parsed"
ReasonRendered = "Rendered"
ReasonPolicyGenerated = "PolicyGenerated"
ReasonRevisoned = "Revisioned"
ReasonApplied = "Applied"
ReasonHealthCheck = "HealthChecked"
ReasonDeployed = "Deployed"
ReasonRollout = "Rollout"
ReasonFailedParse = "FailedParse"
ReasonFailedRender = "FailedRender"
@@ -41,6 +42,7 @@ const (
const (
MessageParsed = "Parsed successfully"
MessageRendered = "Rendered successfully"
MessagePolicyGenerated = "Policy generated successfully"
MessageRevisioned = "Revisioned successfully"
MessageApplied = "Applied successfully"
MessageWorkflowFinished = "Workflow finished"

View File

@@ -986,6 +986,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string
@@ -3244,6 +3246,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string

View File

@@ -667,6 +667,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string
@@ -1491,6 +1493,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string

View File

@@ -986,6 +986,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string
@@ -3244,6 +3246,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string

View File

@@ -667,6 +667,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string
@@ -1491,6 +1493,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string

View File

@@ -986,6 +986,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string
@@ -3244,6 +3246,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string

View File

@@ -901,6 +901,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string
@@ -1992,6 +1994,8 @@ spec:
type: object
finished:
type: boolean
message:
type: string
mode:
description: WorkflowMode describes the mode of workflow
type: string

View File

@@ -19,13 +19,13 @@ package application
import (
"context"
"fmt"
"reflect"
"time"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -33,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
ctrlEvent "sigs.k8s.io/controller-runtime/pkg/event"
ctrlHandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -64,9 +65,6 @@ const (
)
const (
// baseWorkflowBackoffWaitTime is the time to wait before reconcile workflow again
baseWorkflowBackoffWaitTime = 3000 * time.Millisecond
// baseGCBackoffWaitTime is the time to wait before requeueing to check garbage collection again
baseGCBackoffWaitTime = 3000 * time.Millisecond
@@ -121,7 +119,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
if err != nil {
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting)
if app.GetDeletionTimestamp() == nil {
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting)
}
return result, err
}
if endReconcile {
return result, nil
@@ -174,17 +175,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String()))
app.Status.SetConditions(condition.ReadyCondition(common.RenderCondition.String()))
r.Recorder.Event(app, event.Normal(velatypes.ReasonRendered, velatypes.MessageRendered))
r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated))
if !appWillRollout(app) {
steps, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile, handler.currentAppRev)
if err != nil {
logCtx.Error(err, "[handle workflow]")
r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedWorkflow, err))
return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.WorkflowCondition.String(), err), common.ApplicationRunningWorkflow)
return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.WorkflowCondition.String(), err), common.ApplicationRendering)
}
app.Status.SetConditions(condition.ReadyCondition(common.RenderCondition.String()))
r.Recorder.Event(app, event.Normal(velatypes.ReasonRendered, velatypes.MessageRendered))
wf := workflow.NewWorkflow(app, r.Client, appFile.WorkflowMode)
workflowState, err := wf.ExecuteSteps(logCtx.Fork("workflow"), handler.currentAppRev, steps)
if err != nil {
@@ -198,23 +199,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
app.Status.AppliedResources = handler.appliedResources
app.Status.Services = handler.services
switch workflowState {
case common.WorkflowStateInitializing:
logCtx.Info("Workflow return state=Initializing")
return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false)
case common.WorkflowStateSuspended:
logCtx.Info("Workflow return state=Suspend")
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false)
case common.WorkflowStateTerminated:
logCtx.Info("Workflow return state=Terminated")
if err := r.doWorkflowFinish(app, wf); err != nil {
return r.endWithNegativeConditionWithRetry(ctx, app, condition.ErrorCondition(common.WorkflowCondition.String(), errors.WithMessage(err, "DoWorkflowFinish")), common.ApplicationRunningWorkflow)
return r.endWithNegativeCondition(ctx, app, condition.ErrorCondition(common.WorkflowCondition.String(), errors.WithMessage(err, "DoWorkflowFinish")), common.ApplicationRunningWorkflow)
}
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false)
case common.WorkflowStateExecuting:
logCtx.Info("Workflow return state=Executing")
_, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false)
return reconcile.Result{RequeueAfter: baseWorkflowBackoffWaitTime}, err
return reconcile.Result{RequeueAfter: wf.GetBackoffWaitTime()}, err
case common.WorkflowStateSucceeded:
logCtx.Info("Workflow return state=Succeeded")
if err := r.doWorkflowFinish(app, wf); err != nil {
return r.endWithNegativeConditionWithRetry(logCtx, app, condition.ErrorCondition(common.WorkflowCondition.String(), errors.WithMessage(err, "DoWorkflowFinish")), common.ApplicationRunningWorkflow)
return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.WorkflowCondition.String(), errors.WithMessage(err, "DoWorkflowFinish")), common.ApplicationRunningWorkflow)
}
app.Status.SetConditions(condition.ReadyCondition(common.WorkflowCondition.String()))
r.Recorder.Event(app, event.Normal(velatypes.ReasonApplied, velatypes.MessageWorkflowFinished))
@@ -232,15 +236,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if err != nil {
logCtx.Error(err, "Failed to render components")
r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedRender, err))
return r.endWithNegativeConditionWithRetry(logCtx, app, condition.ErrorCondition(common.RenderCondition.String(), err), common.ApplicationRendering)
return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.RenderCondition.String(), err), common.ApplicationRendering)
}
app.Status.SetConditions(condition.ReadyCondition(common.RenderCondition.String()))
r.Recorder.Event(app, event.Normal(velatypes.ReasonRendered, velatypes.MessageRendered))
assemble.HandleCheckManageWorkloadTrait(*handler.currentAppRev, comps)
if err := handler.HandleComponentsRevision(logCtx, comps); err != nil {
logCtx.Error(err, "Failed to handle components revision")
r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedRevision, err))
return r.endWithNegativeConditionWithRetry(logCtx, app, condition.ErrorCondition(common.RenderCondition.String(), err), common.ApplicationRendering)
return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(common.RenderCondition.String(), err), common.ApplicationRendering)
}
klog.Info("Application manifests has prepared and ready for appRollout to handle", "application", klog.KObj(app))
}
@@ -304,7 +310,7 @@ func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *
if err != nil {
logCtx.Error(err, "Failed to gc resourcetrackers")
r.Recorder.Event(handler.app, event.Warning(velatypes.ReasonFailedGC, err))
return r.endWithNegativeConditionWithRetry(logCtx, handler.app, condition.ReconcileError(err), phase)
return r.endWithNegativeCondition(logCtx, handler.app, condition.ReconcileError(err), phase)
}
if !finished {
logCtx.Info("GarbageCollecting resourcetrackers")
@@ -313,10 +319,13 @@ func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler *
cond.Message = fmt.Sprintf("Waiting for %s to delete. (At least %d resources are deleting.)", waiting[0].DisplayName(), len(waiting))
}
handler.app.Status.SetConditions(cond)
return ctrl.Result{RequeueAfter: baseGCBackoffWaitTime}, r.patchStatusWithRetryOnConflict(logCtx, handler.app, phase)
return ctrl.Result{RequeueAfter: baseGCBackoffWaitTime}, r.patchStatus(logCtx, handler.app, phase)
}
logCtx.Info("GarbageCollected resourcetrackers")
return ctrl.Result{}, r.patchStatusWithRetryOnConflict(logCtx, handler.app, phase)
if phase == common.ApplicationRendering {
return ctrl.Result{}, r.updateStatus(logCtx, handler.app, common.ApplicationRunningWorkflow)
}
return ctrl.Result{}, r.patchStatus(logCtx, handler.app, phase)
}
// NOTE Because resource tracker is cluster-scoped resources, we cannot garbage collect them
@@ -349,49 +358,24 @@ func (r *Reconciler) handleFinalizers(ctx monitorContext.Context, app *v1beta1.A
return false, ctrl.Result{}, nil
}
func (r *Reconciler) _endWithNegativeCondition(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase, retry bool) (ctrl.Result, error) {
func (r *Reconciler) endWithNegativeCondition(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase) (ctrl.Result, error) {
app.SetConditions(condition)
handler := r.patchStatus
if retry {
handler = r.patchStatusWithRetryOnConflict
}
if err := handler(ctx, app, phase); err != nil {
if err := r.patchStatus(ctx, app, phase); err != nil {
return ctrl.Result{}, errors.WithMessage(err, "cannot update application status")
}
return ctrl.Result{}, fmt.Errorf("object level reconcile error, type: %q, msg: %q", string(condition.Type), condition.Message)
}
func (r *Reconciler) endWithNegativeCondition(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase) (ctrl.Result, error) {
return r._endWithNegativeCondition(ctx, app, condition, phase, false)
}
// Note: Only operations that must override the status should use this function, it should only focus on workflow operations by now.
func (r *Reconciler) endWithNegativeConditionWithRetry(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase) (ctrl.Result, error) {
return r._endWithNegativeCondition(ctx, app, condition, phase, true)
}
// patchStatus records the given phase on the application's status, calls
// updateObservedGeneration on the application, and then issues a merge patch
// (client.Merge) through the reconciler's status client.
// It returns whatever error the patch call returns.
func (r *Reconciler) patchStatus(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error {
app.Status.Phase = phase
updateObservedGeneration(app)
return r.Status().Patch(ctx, app, client.Merge)
}
// Note: Only operations that must override the status should use this function, it should only focus on workflow operations by now.
func (r *Reconciler) patchStatusWithRetryOnConflict(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error {
func (r *Reconciler) updateStatus(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error {
app.Status.Phase = phase
updateObservedGeneration(app)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
status := app.Status.DeepCopy()
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(app), app); err != nil {
return errors.WithMessage(err, "failed to get application while patching status")
}
app.Status = *status
err := r.Status().Patch(ctx, app, client.Merge)
if err != nil {
return errors.WithMessage(err, "failed to re-patch status")
}
return nil
})
return r.Status().Update(ctx, app)
}
func (r *Reconciler) doWorkflowFinish(app *v1beta1.Application, wf workflow.Workflow) error {
@@ -456,6 +440,49 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
WithOptions(controller.Options{
MaxConcurrentReconciles: r.concurrentReconciles,
}).
WithEventFilter(predicate.Funcs{
// filter the changes in workflow status
// let workflow handle its reconcile
UpdateFunc: func(e ctrlEvent.UpdateEvent) bool {
new, ok := e.ObjectNew.DeepCopyObject().(*v1beta1.Application)
if !ok {
return true
}
old, ok := e.ObjectOld.DeepCopyObject().(*v1beta1.Application)
if !ok {
return true
}
// if the generation is changed, return true to let the controller handle it
if old.Generation != new.Generation {
return true
}
// ignore the changes in workflow status
if old.Status.Workflow != nil && new.Status.Workflow != nil {
// only workflow execution will change the status.workflow
// let workflow backoff to requeue the event
new.Status.Workflow.Steps = old.Status.Workflow.Steps
new.Status.Workflow.ContextBackend = old.Status.Workflow.ContextBackend
new.Status.Workflow.Message = old.Status.Workflow.Message
// appliedResources and Services will be changed during the execution of workflow
// once the resources is added, the managed fields will also be changed
new.Status.AppliedResources = old.Status.AppliedResources
new.Status.Services = old.Status.Services
new.ManagedFields = old.ManagedFields
// the resource version will be changed if the object is changed
// ignore this change and let reflect.DeepEqual to compare the rest of the object
new.ResourceVersion = old.ResourceVersion
}
return !reflect.DeepEqual(old, new)
},
CreateFunc: func(e ctrlEvent.CreateEvent) bool {
return true
},
DeleteFunc: func(e ctrlEvent.DeleteEvent) bool {
return true
},
}).
For(&v1beta1.Application{}).
Complete(r)
}

View File

@@ -55,6 +55,8 @@ import (
"github.com/oam-dev/kubevela/pkg/oam/testutil"
"github.com/oam-dev/kubevela/pkg/oam/util"
common2 "github.com/oam-dev/kubevela/pkg/utils/common"
"github.com/oam-dev/kubevela/pkg/workflow"
"github.com/oam-dev/kubevela/pkg/workflow/tasks/custom"
)
// TODO: Refactor the tests to not copy and paste duplicated code 10 times
@@ -107,6 +109,13 @@ var _ = Describe("Test Application Controller", func() {
appFailRender.Spec.Components[0].Properties = &runtime.RawExtension{
Raw: []byte(`{"cmd1":["sleep","1000"],"image1":"busybox"}`),
}
appFailRender.Spec.Policies = []v1beta1.AppPolicy{
{
Name: "policy1",
Type: "foopolicy",
Properties: &runtime.RawExtension{Raw: []byte(`{"test":"test"}`)},
},
}
appImportPkg := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
@@ -229,6 +238,11 @@ var _ = Describe("Test Application Controller", func() {
cd := &v1beta1.ComponentDefinition{}
cDDefJson, _ := yaml.YAMLToJSON([]byte(componentDefYaml))
k8sObjectsCDJson, _ := yaml.YAMLToJSON([]byte(k8sObjectsComponentDefinitionYaml))
pd := &v1beta1.PolicyDefinition{}
pd.Namespace = "vela-system"
pdDefJson, _ := yaml.YAMLToJSON([]byte(policyDefYaml))
importWd := &v1beta1.WorkloadDefinition{}
importWdJson, _ := yaml.YAMLToJSON([]byte(wDImportYaml))
@@ -253,6 +267,12 @@ var _ = Describe("Test Application Controller", func() {
Expect(json.Unmarshal(cDDefJson, cd)).Should(BeNil())
Expect(k8sClient.Create(ctx, cd.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
Expect(json.Unmarshal(k8sObjectsCDJson, cd)).Should(BeNil())
Expect(k8sClient.Create(ctx, cd.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
Expect(json.Unmarshal(pdDefJson, pd)).Should(BeNil())
Expect(k8sClient.Create(ctx, pd.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
Expect(json.Unmarshal(importWdJson, importWd)).Should(BeNil())
Expect(k8sClient.Create(ctx, importWd.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
importTdJson, err := yaml.YAMLToJSON([]byte(tdImportedYaml))
@@ -1472,6 +1492,7 @@ var _ = Describe("Test Application Controller", func() {
Expect(k8sClient.Update(ctx, checkApp)).Should(BeNil())
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
checkApp = &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunning))
@@ -1537,11 +1558,7 @@ var _ = Describe("Test Application Controller", func() {
}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
// first reconcile handle finalizer
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
// second reconcile apply all resources
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnceAfterFinalizer(reconciler, reconcile.Request{NamespacedName: appKey})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunning))
@@ -1584,11 +1601,7 @@ var _ = Describe("Test Application Controller", func() {
}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
// first reconcile handle finalizer
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
// second reconcile apply all resources
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnceAfterFinalizer(reconciler, reconcile.Request{NamespacedName: appKey})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunning))
@@ -1634,11 +1647,7 @@ var _ = Describe("Test Application Controller", func() {
}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
// first reconcile handle finalizer
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
// second reconcile apply all resources
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnceAfterFinalizer(reconciler, reconcile.Request{NamespacedName: appKey})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunning))
@@ -1723,6 +1732,189 @@ var _ = Describe("Test Application Controller", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "myweb1", Namespace: ns.Name}, deploy)).Should(util.NotFoundMatcher{})
})
// Covers an application that declares no explicit workflow and has two
// components: a "worker" component and a "k8s-objects" component ("failed-step")
// whose only object has kind "invalid". The test expects the second step to be
// reported as failed on every reconcile, the application to move to the
// workflow-suspending phase once custom.MaxErrorTimes failing reconciles have
// passed, and to return to the running-workflow phase after Suspend is cleared.
It("application with dag workflow failed after retries", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "dag-failed-after-retries",
},
}
Expect(k8sClient.Create(ctx, &ns)).Should(BeNil())
app := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
Kind: "Application",
APIVersion: "core.oam.dev/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "dag-failed-after-retries",
Namespace: ns.Name,
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "myweb1",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
},
{
// the object kind "invalid" is what this test relies on to make the step fail
Name: "failed-step",
Type: "k8s-objects",
Properties: &runtime.RawExtension{Raw: []byte(`{"objects":[{"apiVersion":"v1","kind":"invalid","metadata":{"name":"test1"}}]}`)},
},
},
},
}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
// after two reconciles the workflow is expected to be initialized but not yet executing
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(workflow.MessageInitializingWorkflow))
By("verify the first twenty reconciles")
// the loop bound is custom.MaxErrorTimes; each of these reconciles must keep
// the workflow executing while the second step is reported as failed
for i := 0; i < custom.MaxErrorTimes; i++ {
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(string(common.WorkflowStateExecuting)))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
}
By("application should be suspended after failed twenty reconciles")
// one more reconcile past the limit is expected to suspend the workflow
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationWorkflowSuspending))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(workflow.MessageFailedAfterRetries))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
By("resume the suspended application")
// resuming is done by clearing Suspend directly in the status and patching it
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
checkApp.Status.Workflow.Suspend = false
Expect(k8sClient.Status().Patch(ctx, checkApp, client.Merge)).Should(BeNil())
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(string(common.WorkflowStateExecuting)))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
By("test failed-after-retries with running steps")
// register the "unhealthy-worker" component definition (its health policy is
// `isHealth: false`) and swap it in as the first component
compDef, err := yaml.YAMLToJSON([]byte(unhealthyComponentDefYaml))
Expect(err).Should(BeNil())
component := &v1beta1.ComponentDefinition{}
component.Spec.Extension = &runtime.RawExtension{Raw: compDef}
Expect(json.Unmarshal([]byte(compDef), component)).Should(BeNil())
Expect(k8sClient.Create(ctx, component)).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
checkApp.Spec.Components[0] = common.ApplicationComponent{
Name: "unhealthy-worker",
Type: "unhealthy-worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
}
Expect(k8sClient.Update(ctx, checkApp)).Should(BeNil())
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
// with the first step still running, the app is expected to stay in the
// running-workflow phase even after custom.MaxErrorTimes+1 further reconciles
for i := 0; i < custom.MaxErrorTimes+1; i++ {
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(string(common.WorkflowStateExecuting)))
Expect(checkApp.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseRunning))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
}
})
// Covers an application with an explicit two-step workflow of "apply-component"
// steps. The second step applies the "failed-step" component, a "k8s-objects"
// component whose only object has kind "invalid". The test expects the second
// step to be reported as failed on every reconcile, the application to move to
// the workflow-suspending phase once custom.MaxErrorTimes failing reconciles
// have passed, and to return to the running-workflow phase after Suspend is
// cleared.
It("application with step by step workflow failed after retries", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "step-by-step-failed-after-retries",
},
}
Expect(k8sClient.Create(ctx, &ns)).Should(BeNil())
app := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
Kind: "Application",
APIVersion: "core.oam.dev/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "step-by-step-failed-after-retries",
Namespace: ns.Name,
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "myweb1",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
},
{
// the object kind "invalid" is what this test relies on to make the step fail
Name: "failed-step",
Type: "k8s-objects",
Properties: &runtime.RawExtension{Raw: []byte(`{"objects":[{"apiVersion":"v1","kind":"invalid","metadata":{"name":"test1"}}]}`)},
},
},
// explicit workflow: one apply-component step per component, in order
Workflow: &v1beta1.Workflow{
Steps: []v1beta1.WorkflowStep{
{
Name: "myweb1",
Type: "apply-component",
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"myweb1"}`)},
},
{
Name: "failed-step",
Type: "apply-component",
Properties: &runtime.RawExtension{Raw: []byte(`{"component":"failed-step"}`)},
},
},
},
},
}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
appKey := types.NamespacedName{Namespace: ns.Name, Name: app.Name}
// after two reconciles the workflow is expected to be initialized but not yet executing
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
checkApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(workflow.MessageInitializingWorkflow))
By("verify the first twenty reconciles")
// the loop bound is custom.MaxErrorTimes; each of these reconciles must keep
// the workflow executing while the second step is reported as failed
for i := 0; i < custom.MaxErrorTimes; i++ {
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(string(common.WorkflowStateExecuting)))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
}
By("application should be suspended after failed twenty reconciles")
// one more reconcile past the limit is expected to suspend the workflow
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationWorkflowSuspending))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(workflow.MessageFailedAfterRetries))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
By("resume the suspended application")
// resuming is done by clearing Suspend directly in the status and patching it
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
checkApp.Status.Workflow.Suspend = false
Expect(k8sClient.Status().Patch(ctx, checkApp, client.Merge)).Should(BeNil())
testutil.ReconcileOnce(reconciler, reconcile.Request{NamespacedName: appKey})
Expect(k8sClient.Get(ctx, appKey, checkApp)).Should(BeNil())
Expect(checkApp.Status.Phase).Should(BeEquivalentTo(common.ApplicationRunningWorkflow))
Expect(checkApp.Status.Workflow.Message).Should(BeEquivalentTo(string(common.WorkflowStateExecuting)))
Expect(checkApp.Status.Workflow.Steps[1].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseFailed))
})
It("application with input/output run as dag workflow", func() {
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -2248,6 +2440,71 @@ spec:
}
`
unhealthyComponentDefYaml = `
apiVersion: core.oam.dev/v1beta1
kind: ComponentDefinition
metadata:
name: unhealthy-worker
namespace: vela-system
annotations:
definition.oam.dev/description: "Long-running scalable backend worker without network endpoint"
spec:
workload:
definition:
apiVersion: apps/v1
kind: Deployment
extension:
template: |
output: {
apiVersion: "apps/v1"
kind: "Deployment"
metadata: {
annotations: {
if context["config"] != _|_ {
for _, v in context.config {
"\(v.name)" : v.value
}
}
}
}
spec: {
selector: matchLabels: {
"app.oam.dev/component": context.name
}
template: {
metadata: labels: {
"app.oam.dev/component": context.name
}
spec: {
containers: [{
name: context.name
image: parameter.image
if parameter["cmd"] != _|_ {
command: parameter.cmd
}
}]
}
}
selector:
matchLabels:
"app.oam.dev/component": context.name
}
}
parameter: {
// +usage=Which image would you like to use for your service
// +short=i
image: string
cmd?: [...string]
}
status:
healthPolicy: |-
isHealth: false
`
wDImportYaml = `
apiVersion: core.oam.dev/v1beta1
kind: WorkloadDefinition
@@ -2991,6 +3248,29 @@ spec:
component: string
}
`
k8sObjectsComponentDefinitionYaml = `
apiVersion: core.oam.dev/v1beta1
kind: ComponentDefinition
metadata:
annotations:
definition.oam.dev/description: K8s-objects allow users to specify raw K8s objects in properties
name: k8s-objects
namespace: vela-system
spec:
schematic:
cue:
template: |
output: parameter.objects[0]
outputs: {
for i, v in parameter.objects {
if i > 0 {
"objects-\(i)": v
}
}
}
parameter: objects: [...{}]
`
)
func newMockHTTP() *httptest.Server {

View File

@@ -134,6 +134,7 @@ var _ = Describe("Test Workflow", func() {
// first try to add finalizer
tryReconcile(reconciler, appWithPolicy.Name, appWithPolicy.Namespace)
tryReconcile(reconciler, appWithPolicy.Name, appWithPolicy.Namespace)
tryReconcile(reconciler, appWithPolicy.Name, appWithPolicy.Namespace)
deploy := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
@@ -188,6 +189,7 @@ var _ = Describe("Test Workflow", func() {
// first try to add finalizer
tryReconcile(reconciler, appWithWorkflow.Name, appWithWorkflow.Namespace)
tryReconcile(reconciler, appWithWorkflow.Name, appWithWorkflow.Namespace)
tryReconcile(reconciler, appWithWorkflow.Name, appWithWorkflow.Namespace)
// check resource created
stepObj := &unstructured.Unstructured{}
@@ -245,6 +247,7 @@ var _ = Describe("Test Workflow", func() {
// first try to add finalizer
tryReconcile(reconciler, suspendApp.Name, suspendApp.Namespace)
tryReconcile(reconciler, suspendApp.Name, suspendApp.Namespace)
tryReconcile(reconciler, suspendApp.Name, suspendApp.Namespace)
appObj := &oamcore.Application{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
@@ -297,6 +300,7 @@ var _ = Describe("Test Workflow", func() {
// first try to add finalizer
tryReconcile(reconciler, suspendApp.Name, suspendApp.Namespace)
tryReconcile(reconciler, suspendApp.Name, suspendApp.Namespace)
tryReconcile(reconciler, suspendApp.Name, suspendApp.Namespace)
appObj := &oamcore.Application{}
Expect(k8sClient.Get(ctx, client.ObjectKey{

View File

@@ -64,6 +64,9 @@ func ReconcileOnceAfterFinalizer(r reconcile.Reconciler, req reconcile.Request)
if result, err := r.Reconcile(context.TODO(), req); err != nil {
return result, err
}
if result, err := r.Reconcile(context.TODO(), req); err != nil {
return result, err
}
return r.Reconcile(context.TODO(), req)
}

View File

@@ -71,6 +71,29 @@ func (c ViewContext) SetVar(v *value.Value, paths ...string) error {
return c.vars.Error()
}
// GetStore returns the ConfigMap backing the workflow context.
// This ViewContext implementation is a stub that always returns nil, so
// callers must nil-check the result before dereferencing it.
func (c ViewContext) GetStore() *corev1.ConfigMap {
return nil
}
// GetMutableValue returns the mutable data stored under the given path.
// This ViewContext implementation is a stub: it ignores paths and always
// returns the empty string.
func (c ViewContext) GetMutableValue(paths ...string) string {
return ""
}
// SetMutableValue stores mutable data under the given path.
// This ViewContext implementation is a no-op: data and paths are discarded.
func (c ViewContext) SetMutableValue(data string, paths ...string) {
}
// IncreaseMutableCountValue increments the counter stored under the given path.
// This ViewContext implementation is a stub: nothing is counted and the
// result is always 0.
func (c ViewContext) IncreaseMutableCountValue(paths ...string) int {
return 0
}
// DeleteMutableValue removes the mutable data stored under the given path.
// This ViewContext implementation is a no-op.
func (c ViewContext) DeleteMutableValue(paths ...string) {
}
// Commit the workflow context and persist its content.
func (c ViewContext) Commit() error {
return errors.New("not support func Commit")

View File

@@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"cuelang.org/go/cue"
@@ -49,7 +51,7 @@ const (
// WorkflowContext is workflow context.
type WorkflowContext struct {
cli client.Client
store corev1.ConfigMap
store *corev1.ConfigMap
components map[string]*ComponentManifest
vars *value.Value
modified bool
@@ -103,6 +105,48 @@ func (wf *WorkflowContext) SetVar(v *value.Value, paths ...string) error {
return nil
}
// GetStore returns the ConfigMap that backs this workflow context.
// The returned pointer is the context's own store rather than a copy, so
// changes made through it are visible to the context.
func (wf *WorkflowContext) GetStore() *corev1.ConfigMap {
return wf.store
}
// GetMutableValue looks up mutable data in the workflow context store.
// The path segments are joined with "." to build the key; a key that is not
// present yields the empty string.
func (wf *WorkflowContext) GetMutableValue(paths ...string) string {
	key := strings.Join(paths, ".")
	return wf.store.Data[key]
}
// SetMutableValue stores mutable data in the workflow context config map.
// The path segments are joined with "." to build the key. The context is
// marked as modified so the change is picked up when the store is persisted.
func (wf *WorkflowContext) SetMutableValue(data string, paths ...string) {
	// A store whose Data map was never populated would panic on assignment;
	// allocate it lazily, mirroring the guard used by writeToStore.
	if wf.store.Data == nil {
		wf.store.Data = make(map[string]string)
	}
	wf.store.Data[strings.Join(paths, ".")] = data
	wf.modified = true
}
// IncreaseMutableCountValue bumps the integer counter stored under the given
// path and returns the value that was written back.
// A counter that is missing or does not parse as an integer is (re)initialised
// to "0" and 0 is returned; otherwise the stored value plus one is returned.
func (wf *WorkflowContext) IncreaseMutableCountValue(paths ...string) int {
	next := 0
	// Atoi fails on the empty string too, so a missing key and a corrupt
	// value both fall through to the reset-to-zero case.
	if current, err := strconv.Atoi(wf.GetMutableValue(paths...)); err == nil {
		next = current + 1
	}
	wf.SetMutableValue(strconv.Itoa(next), paths...)
	return next
}
// DeleteMutableValue removes the mutable data stored under the given path.
// The context is only marked as modified when the key actually existed.
func (wf *WorkflowContext) DeleteMutableValue(paths ...string) {
	key := strings.Join(paths, ".")
	if _, exists := wf.store.Data[key]; !exists {
		return
	}
	delete(wf.store.Data, key)
	wf.modified = true
}
// MakeParameter make 'value' with interface{}
func (wf *WorkflowContext) MakeParameter(parameter interface{}) (*value.Value, error) {
var s = "{}"
@@ -145,18 +189,19 @@ func (wf *WorkflowContext) writeToStore() error {
jsonObject[name] = s
}
wf.store.Data = map[string]string{
ConfigMapKeyComponents: string(util.MustJSONMarshal(jsonObject)),
ConfigMapKeyVars: varStr,
if wf.store.Data == nil {
wf.store.Data = make(map[string]string)
}
wf.store.Data[ConfigMapKeyComponents] = string(util.MustJSONMarshal(jsonObject))
wf.store.Data[ConfigMapKeyVars] = varStr
return nil
}
func (wf *WorkflowContext) sync() error {
ctx := context.Background()
if err := wf.cli.Update(ctx, &wf.store); err != nil {
if err := wf.cli.Update(ctx, wf.store); err != nil {
if kerrors.IsNotFound(err) {
return wf.cli.Create(ctx, &wf.store)
return wf.cli.Create(ctx, wf.store)
}
return err
}
@@ -300,7 +345,7 @@ func newContext(cli client.Client, ns, app string, appUID types.UID) (*WorkflowC
}
wfCtx := &WorkflowContext{
cli: cli,
store: store,
store: &store,
components: map[string]*ComponentManifest{},
modified: true,
}
@@ -321,7 +366,7 @@ func LoadContext(cli client.Client, ns, app string) (Context, error) {
}
ctx := &WorkflowContext{
cli: cli,
store: store,
store: &store,
}
if err := ctx.LoadFromConfigMap(store); err != nil {
return nil, err
@@ -329,6 +374,7 @@ func LoadContext(cli client.Client, ns, app string) (Context, error) {
return ctx, nil
}
// generateStoreName builds the name of the config map that holds the workflow
// context of the given application: "workflow-<app>-context".
func generateStoreName(app string) string {
	const storeNameFormat = "workflow-%s-context"
	return fmt.Sprintf(storeNameFormat, app)
}

View File

@@ -22,8 +22,8 @@ import (
"testing"
"github.com/crossplane/crossplane-runtime/pkg/test"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -34,16 +34,17 @@ import (
func TestComponent(t *testing.T) {
wfCtx := newContextForTest(t)
r := require.New(t)
_, err := wfCtx.GetComponent("expected-not-found")
assert.Equal(t, err != nil, true)
r.Equal(err.Error(), "component expected-not-found not found in application")
cmf, err := wfCtx.GetComponent("server")
assert.NilError(t, err)
r.NoError(err)
components := wfCtx.GetComponents()
_, ok := components["server"]
assert.Equal(t, ok, true)
r.Equal(ok, true)
assert.Equal(t, cmf.Workload.String(), `apiVersion: "v1"
r.Equal(cmf.Workload.String(), `apiVersion: "v1"
kind: "Pod"
metadata: {
labels: {
@@ -66,8 +67,8 @@ spec: {
}, ...]
}
`)
assert.Equal(t, len(cmf.Auxiliaries), 1)
assert.Equal(t, cmf.Auxiliaries[0].String(), `apiVersion: "v1"
r.Equal(len(cmf.Auxiliaries), 1)
r.Equal(cmf.Auxiliaries[0].String(), `apiVersion: "v1"
kind: "Service"
metadata: {
name: "my-service"
@@ -89,13 +90,13 @@ spec: containers: [{
// +patchKey=name
env:[{name: "ClusterIP",value: "1.1.1.1"}]}]
`, nil, "")
assert.NilError(t, err)
r.NoError(err)
err = wfCtx.PatchComponent("server", pv)
assert.NilError(t, err)
r.NoError(err)
cmf, err = wfCtx.GetComponent("server")
assert.NilError(t, err)
assert.Equal(t, cmf.Workload.String(), `apiVersion: "v1"
r.NoError(err)
r.Equal(cmf.Workload.String(), `apiVersion: "v1"
kind: "Pod"
metadata: {
labels: {
@@ -124,15 +125,15 @@ spec: {
`)
err = wfCtx.writeToStore()
assert.NilError(t, err)
r.NoError(err)
expected, err := yaml.Marshal(wfCtx.components)
assert.NilError(t, err)
r.NoError(err)
err = wfCtx.LoadFromConfigMap(wfCtx.store)
assert.NilError(t, err)
err = wfCtx.LoadFromConfigMap(*wfCtx.store)
r.NoError(err)
componentsYaml, err := yaml.Marshal(wfCtx.components)
assert.NilError(t, err)
assert.Equal(t, string(expected), string(componentsYaml))
r.NoError(err)
r.Equal(string(expected), string(componentsYaml))
}
func TestVars(t *testing.T) {
@@ -167,49 +168,53 @@ result: 101
},
}
for _, tCase := range testCases {
r := require.New(t)
val, err := value.NewValue(tCase.variable, nil, "")
assert.NilError(t, err)
r.NoError(err)
input, err := val.LookupValue("input")
assert.NilError(t, err)
r.NoError(err)
err = wfCtx.SetVar(input, tCase.paths...)
assert.NilError(t, err)
r.NoError(err)
result, err := wfCtx.GetVar(tCase.paths...)
assert.NilError(t, err)
r.NoError(err)
rStr, err := result.String()
assert.NilError(t, err)
assert.Equal(t, rStr, tCase.expected)
r.NoError(err)
r.Equal(rStr, tCase.expected)
}
r := require.New(t)
param, err := wfCtx.MakeParameter(map[string]interface{}{
"name": "foo",
})
assert.NilError(t, err)
r.NoError(err)
mark, err := wfCtx.GetVar("football")
assert.NilError(t, err)
r.NoError(err)
err = param.FillObject(mark)
assert.NilError(t, err)
r.NoError(err)
rStr, err := param.String()
assert.NilError(t, err)
assert.Equal(t, rStr, `name: "foo"
r.NoError(err)
r.Equal(rStr, `name: "foo"
score: 100
result: 101
`)
conflictV, _ := value.NewValue(`score: 101`, nil, "")
conflictV, err := value.NewValue(`score: 101`, nil, "")
r.NoError(err)
err = wfCtx.SetVar(conflictV, "football")
assert.Equal(t, err != nil, true)
r.Equal(err.Error(), "football.result: conflicting values 100 and 101")
}
func TestRefObj(t *testing.T) {
wfCtx := new(WorkflowContext)
wfCtx.store = corev1.ConfigMap{}
wfCtx.store = &corev1.ConfigMap{}
wfCtx.store.APIVersion = "v1"
wfCtx.store.Kind = "ConfigMap"
wfCtx.store.Name = "app-v1"
ref := wfCtx.StoreRef()
assert.Equal(t, *ref, corev1.ObjectReference{
r := require.New(t)
r.Equal(*ref, corev1.ObjectReference{
APIVersion: "v1",
Kind: "ConfigMap",
Name: "app-v1",
@@ -217,8 +222,73 @@ func TestRefObj(t *testing.T) {
}
func TestContext(t *testing.T) {
var wfCm *corev1.ConfigMap
cli := &test.MockClient{
cli := newCliForTest(t, nil)
r := require.New(t)
wfCtx, err := NewContext(cli, "default", "app-v1", "testuid")
r.NoError(err)
err = wfCtx.Commit()
r.NoError(err)
wfCtx, err = LoadContext(cli, "default", "app-v1")
r.NoError(err)
err = wfCtx.Commit()
r.NoError(err)
cli = newCliForTest(t, nil)
_, err = LoadContext(cli, "default", "app-v1")
r.Equal(err.Error(), `configMap "workflow-app-v1-context" not found`)
wfCtx, err = NewContext(cli, "default", "app-v1", "testuid")
r.NoError(err)
r.Equal(len(wfCtx.GetComponents()), 0)
_, err = wfCtx.GetComponent("server")
r.Equal(err.Error(), "component server not found in application")
}
func TestGetStore(t *testing.T) {
cli := newCliForTest(t, nil)
r := require.New(t)
wfCtx, err := NewContext(cli, "default", "app-v1", "testuid")
r.NoError(err)
err = wfCtx.Commit()
r.NoError(err)
store := wfCtx.GetStore()
r.Equal(store.Name, "workflow-app-v1-context")
}
// TestMutableValue exercises the mutable key/value helpers of the workflow
// context: set, get, delete, and the counter increment in its three cases
// (non-numeric value, missing key, numeric value).
func TestMutableValue(t *testing.T) {
cli := newCliForTest(t, nil)
r := require.New(t)
wfCtx, err := NewContext(cli, "default", "app-v1", "testuid")
r.NoError(err)
err = wfCtx.Commit()
r.NoError(err)
// A value that was set can be read back under the same path.
wfCtx.SetMutableValue("value", "test", "key")
v := wfCtx.GetMutableValue("test", "key")
r.Equal(v, "value")
// After deletion the lookup falls back to the empty string.
wfCtx.DeleteMutableValue("test", "key")
v = wfCtx.GetMutableValue("test", "key")
r.Equal(v, "")
// A non-numeric value is reset by the increment, which reports 0.
wfCtx.SetMutableValue("value", "test", "key")
count := wfCtx.IncreaseMutableCountValue("test", "key")
r.Equal(count, 0)
// A missing key is initialised by the increment, which also reports 0.
count = wfCtx.IncreaseMutableCountValue("notfound", "key")
r.Equal(count, 0)
// A numeric value is incremented by one.
wfCtx.SetMutableValue("10", "number", "key")
count = wfCtx.IncreaseMutableCountValue("number", "key")
r.Equal(count, 11)
}
func newCliForTest(t *testing.T, wfCm *corev1.ConfigMap) *test.MockClient {
r := require.New(t)
return &test.MockClient{
MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error {
o, ok := obj.(*corev1.ConfigMap)
if ok {
@@ -226,9 +296,9 @@ func TestContext(t *testing.T) {
case "app-v1":
var cm corev1.ConfigMap
testCaseJson, err := yamlUtil.YAMLToJSON([]byte(testCaseYaml))
assert.NilError(t, err)
r.NoError(err)
err = json.Unmarshal(testCaseJson, &cm)
assert.NilError(t, err)
r.NoError(err)
*o = cm
return nil
case generateStoreName("app-v1"):
@@ -258,38 +328,21 @@ func TestContext(t *testing.T) {
return nil
},
}
wfCtx, err := NewContext(cli, "default", "app-v1", "testuid")
assert.NilError(t, err)
err = wfCtx.Commit()
assert.NilError(t, err)
wfCtx, err = LoadContext(cli, "default", "app-v1")
assert.NilError(t, err)
err = wfCtx.Commit()
assert.NilError(t, err)
wfCm = nil
_, err = LoadContext(cli, "default", "app-v1")
assert.Equal(t, err != nil, true)
wfCtx, err = NewContext(cli, "default", "app-v1", "testuid")
assert.NilError(t, err)
assert.Equal(t, len(wfCtx.GetComponents()), 0)
_, err = wfCtx.GetComponent("server")
assert.Equal(t, err != nil, true)
}
func newContextForTest(t *testing.T) *WorkflowContext {
r := require.New(t)
var cm corev1.ConfigMap
testCaseJson, err := yamlUtil.YAMLToJSON([]byte(testCaseYaml))
assert.NilError(t, err)
r.NoError(err)
err = json.Unmarshal(testCaseJson, &cm)
assert.NilError(t, err)
r.NoError(err)
wfCtx := new(WorkflowContext)
wfCtx := &WorkflowContext{
store: &cm,
}
err = wfCtx.LoadFromConfigMap(cm)
assert.NilError(t, err)
r.NoError(err)
return wfCtx
}

View File

@@ -29,6 +29,11 @@ type Context interface {
PatchComponent(name string, patchValue *value.Value) error
GetVar(paths ...string) (*value.Value, error)
SetVar(v *value.Value, paths ...string) error
GetStore() *corev1.ConfigMap
GetMutableValue(path ...string) string
SetMutableValue(data string, path ...string)
IncreaseMutableCountValue(paths ...string) int
DeleteMutableValue(paths ...string)
Commit() error
MakeParameter(parameter interface{}) (*value.Value, error)
StoreRef() *corev1.ObjectReference

View File

@@ -16,9 +16,11 @@ limitations under the License.
package workflow
import (
"time"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/monitor/context"
monitorContext "github.com/oam-dev/kubevela/pkg/monitor/context"
"github.com/oam-dev/kubevela/pkg/workflow/types"
)
@@ -26,8 +28,14 @@ import (
type Workflow interface {
// ExecuteSteps executes the steps of an Application with given steps of rendered resources.
// It returns done=true only if all steps are executed and succeeded.
ExecuteSteps(ctx context.Context, appRev *v1beta1.ApplicationRevision, taskRunners []types.TaskRunner) (state common.WorkflowState, err error)
ExecuteSteps(ctx monitorContext.Context, appRev *v1beta1.ApplicationRevision, taskRunners []types.TaskRunner) (state common.WorkflowState, err error)
// Trace record workflow state in controllerRevision.
Trace() error
// CleanupCountersInContext cleans up the temporary counters in workflow context.
CleanupCountersInContext(ctx monitorContext.Context)
// GetBackoffWaitTime returns the wait time for next retry.
GetBackoffWaitTime() time.Duration
}

View File

@@ -22,10 +22,6 @@ import (
"fmt"
"strings"
monitorContext "github.com/oam-dev/kubevela/pkg/monitor/context"
"github.com/oam-dev/kubevela/pkg/workflow/hooks"
"cuelang.org/go/cue"
"github.com/pkg/errors"
@@ -35,7 +31,9 @@ import (
"github.com/oam-dev/kubevela/pkg/cue/model/sets"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/cue/packages"
monitorContext "github.com/oam-dev/kubevela/pkg/monitor/context"
wfContext "github.com/oam-dev/kubevela/pkg/workflow/context"
"github.com/oam-dev/kubevela/pkg/workflow/hooks"
"github.com/oam-dev/kubevela/pkg/workflow/providers"
wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types"
)
@@ -55,6 +53,8 @@ const (
StatusReasonParameter = "ProcessParameter"
// StatusReasonOutput is the reason of the workflow progress condition which is Output.
StatusReasonOutput = "Output"
// MaxErrorTimes is the max times of the workflow progress condition which is Failed.
MaxErrorTimes = 10
)
// LoadTaskTemplate gets the workflowStep definition from cluster and resolve it.
@@ -160,6 +160,11 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
tracer.Commit(string(exec.status().Phase))
}()
if exec.operation().FailedAfterRetries {
tracer.Info("failed after retries, skip this step")
return exec.status(), exec.operation(), nil
}
if t.runOptionsProcess != nil {
t.runOptionsProcess(options)
}
@@ -177,7 +182,7 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
}
if err := paramsValue.Error(); err != nil {
exec.err(err, StatusReasonParameter)
exec.err(ctx, err, StatusReasonParameter)
return exec.status(), exec.operation(), nil
}
@@ -192,7 +197,7 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
taskv, err := t.makeValue(ctx, strings.Join([]string{templ, paramFile}, "\n"), exec.wfStatus.ID)
if err != nil {
exec.err(err, StatusReasonRendering)
exec.err(ctx, err, StatusReasonRendering)
return exec.status(), exec.operation(), nil
}
@@ -203,13 +208,13 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (wfTypes.TaskGenerator, err
}
if err := exec.doSteps(ctx, taskv); err != nil {
tracer.Error(err, "do steps")
exec.err(err, StatusReasonExecute)
exec.err(ctx, err, StatusReasonExecute)
return exec.status(), exec.operation(), nil
}
for _, hook := range options.PostStopHooks {
if err := hook(ctx, taskv, wfStep, exec.status().Phase); err != nil {
exec.err(err, StatusReasonOutput)
exec.err(ctx, err, StatusReasonOutput)
return exec.status(), exec.operation(), nil
}
}
@@ -237,10 +242,11 @@ func (t *TaskLoader) makeValue(ctx wfContext.Context, templ string, id string) (
type executor struct {
handlers providers.Providers
wfStatus common.WorkflowStepStatus
suspend bool
terminated bool
wait bool
wfStatus common.WorkflowStepStatus
suspend bool
terminated bool
failedAfterRetries bool
wait bool
tracer monitorContext.Context
}
@@ -269,16 +275,28 @@ func (exec *executor) Wait(message string) {
exec.wfStatus.Message = message
}
func (exec *executor) err(err error, reason string) {
func (exec *executor) err(ctx wfContext.Context, err error, reason string) {
exec.wait = true
exec.wfStatus.Phase = common.WorkflowStepPhaseFailed
exec.wfStatus.Message = err.Error()
exec.wfStatus.Reason = reason
exec.checkErrorTimes(ctx)
}
// checkErrorTimes bumps the failure counter kept for this step in the workflow
// context. Once the counter reaches MaxErrorTimes the executor stops waiting
// and is flagged as failed after retries.
func (exec *executor) checkErrorTimes(ctx wfContext.Context) {
	failures := ctx.IncreaseMutableCountValue(wfTypes.ContextPrefixFailedTimes, exec.wfStatus.ID)
	if failures < MaxErrorTimes {
		return
	}
	exec.wait = false
	exec.failedAfterRetries = true
}
func (exec *executor) operation() *wfTypes.Operation {
return &wfTypes.Operation{
Suspend: exec.suspend,
Terminated: exec.terminated,
Suspend: exec.suspend,
Terminated: exec.terminated,
Waiting: exec.wait,
FailedAfterRetries: exec.failedAfterRetries,
}
}

View File

@@ -22,9 +22,11 @@ import (
"fmt"
"testing"
"github.com/crossplane/crossplane-runtime/pkg/test"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
"github.com/stretchr/testify/require"
@@ -183,8 +185,8 @@ close({
Name: "input-err",
Type: "ok",
Properties: &runtime.RawExtension{Raw: []byte(`
{"score": {"y": 101}}
`)},
{"score": {"y": 101}}
`)},
Inputs: common.StepInputs{{
From: "score",
ParameterKey: "score",
@@ -222,20 +224,40 @@ close({
Name: "err",
Type: "error",
},
{
Name: "failed-after-retries",
Type: "error",
},
}
for _, step := range steps {
gen, err := tasksLoader.GetTaskGenerator(context.Background(), step.Type)
r.NoError(err)
run, err := gen(step, &types.GeneratorOptions{})
r.NoError(err)
status, _, err := run.Run(wfCtx, &types.TaskRunOptions{})
status, operation, err := run.Run(wfCtx, &types.TaskRunOptions{})
switch step.Name {
case "input":
r.Equal(err != nil, true)
case "ouput", "output-var-conflict":
r.Equal(err.Error(), "do preStartHook: get input from [podIP]: var(path=podIP) not exist")
case "output", "output-var-conflict":
r.Equal(status.Reason, StatusReasonOutput)
r.Equal(operation.Waiting, true)
r.Equal(status.Phase, common.WorkflowStepPhaseFailed)
case "failed-after-retries":
newCtx := newWorkflowContextForTest(t)
for i := 0; i < MaxErrorTimes; i++ {
status, operation, err = run.Run(newCtx, &types.TaskRunOptions{})
r.NoError(err)
r.Equal(operation.Waiting, true)
r.Equal(operation.FailedAfterRetries, false)
r.Equal(status.Phase, common.WorkflowStepPhaseFailed)
}
status, operation, err = run.Run(newCtx, &types.TaskRunOptions{})
r.NoError(err)
r.Equal(operation.Waiting, false)
r.Equal(operation.FailedAfterRetries, true)
r.Equal(status.Phase, common.WorkflowStepPhaseFailed)
default:
r.Equal(operation.Waiting, true)
r.Equal(status.Phase, common.WorkflowStepPhaseFailed)
}
}
@@ -441,13 +463,25 @@ func newWorkflowContextForTest(t *testing.T) wfContext.Context {
err = json.Unmarshal(testCaseJson, &cm)
r.NoError(err)
wfCtx := new(wfContext.WorkflowContext)
err = wfCtx.LoadFromConfigMap(cm)
cli := &test.MockClient{
MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error {
o, ok := obj.(*corev1.ConfigMap)
if ok {
*o = cm
}
return nil
},
MockUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return nil
},
}
wfCtx, err := wfContext.NewContext(cli, "default", "app-v1", "testuid")
r.NoError(err)
v, _ := value.NewValue(`name: "app"`, nil, "")
r.NoError(wfCtx.SetVar(v, types.ContextKeyMetadata))
return wfCtx
}
func mockLoadTemplate(_ context.Context, name string) (string, error) {
templ := `
parameter: {}

View File

@@ -55,8 +55,10 @@ type TaskPostStopHook func(ctx wfContext.Context, taskValue *value.Value, step v
// Operation is workflow operation object.
type Operation struct {
Suspend bool
Terminated bool
Suspend bool
Terminated bool
Waiting bool
FailedAfterRetries bool
}
// TaskGenerator will generate taskRunner.
@@ -79,4 +81,12 @@ type Action interface {
const (
// ContextKeyMetadata is the key that refers to the application metadata.
ContextKeyMetadata = "metadata__"
// ContextPrefixFailedTimes is the key prefix under which the number of failures of a step is kept in the workflow context config map.
ContextPrefixFailedTimes = "failed_times"
// ContextPrefixBackoffTimes is the key prefix under which the backoff count is kept in the workflow context config map.
ContextPrefixBackoffTimes = "backoff_times"
// ContextKeyLastExecuteTime is the key that refers to the last execute time in the workflow context config map.
ContextKeyLastExecuteTime = "last_execute_time"
// ContextKeyNextExecuteTime is the key that refers to the next execute time in the workflow context config map.
ContextKeyNextExecuteTime = "next_execute_time"
)

View File

@@ -19,15 +19,17 @@ package workflow
import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/oam-dev/kubevela/apis/core.oam.dev/condition"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/condition"
oamcore "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/controller/utils"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
@@ -40,9 +42,24 @@ import (
wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types"
)
const (
// minWorkflowBackoffWaitTime is the minimum time, in seconds, to wait before reconciling the workflow again.
minWorkflowBackoffWaitTime = 1
// maxWorkflowBackoffWaitTime is the maximum time, in seconds, to wait before reconciling the workflow again.
maxWorkflowBackoffWaitTime = 600
// backoffTimeCoefficient scales the exponential backoff: the wait time is 2^times * backoffTimeCoefficient seconds,
// clamped to the min/max bounds above.
backoffTimeCoefficient = 0.05
// MessageFailedAfterRetries is the workflow status message used when a step failed after its retries were exhausted.
// NOTE(review): the "10 times" in the text is hard-coded; presumably it mirrors the step retry limit - keep them in sync.
MessageFailedAfterRetries = "The workflow suspends automatically because the failed times of steps have reached the limit(10 times)"
// MessageInitializingWorkflow is the workflow status message set while the workflow status is being initialized.
MessageInitializingWorkflow = "Initializing workflow"
)
// workflow carries the state needed to execute the workflow of one application.
type workflow struct {
// app is the application whose workflow status is read and updated.
app *oamcore.Application
// cli is the client used to talk to the cluster, e.g. to update the context config map.
cli client.Client
// wfCtx is the workflow context; it is assigned during ExecuteSteps once the context has been made.
wfCtx wfContext.Context
// dagMode selects DAG execution instead of step-by-step execution.
dagMode bool
}
@@ -82,6 +99,7 @@ func (w *workflow) ExecuteSteps(ctx monitorContext.Context, appRev *oamcore.Appl
Mode: common.WorkflowModeStep,
StartTime: metav1.Now(),
}
w.app.Status.Workflow.Message = MessageInitializingWorkflow
if w.dagMode {
w.app.Status.Workflow.Mode = common.WorkflowModeDAG
}
@@ -90,16 +108,17 @@ func (w *workflow) ExecuteSteps(ctx monitorContext.Context, appRev *oamcore.Appl
w.app.Status.AppliedResources = nil
// clean conditions after render
var reservedCondtions []condition.Condition
var reservedConditions []condition.Condition
for i, cond := range w.app.Status.Conditions {
condTpy, err := common.ParseApplicationConditionType(string(cond.Type))
if err == nil {
if condTpy < common.RenderCondition {
reservedCondtions = append(reservedCondtions, w.app.Status.Conditions[i])
if condTpy <= common.RenderCondition {
reservedConditions = append(reservedConditions, w.app.Status.Conditions[i])
}
}
}
w.app.Status.Conditions = reservedCondtions
w.app.Status.Conditions = reservedConditions
return common.WorkflowStateInitializing, nil
}
wfStatus := w.app.Status.Workflow
@@ -120,30 +139,41 @@ func (w *workflow) ExecuteSteps(ctx monitorContext.Context, appRev *oamcore.Appl
wfCtx, err := w.makeContext(w.app.Name)
if err != nil {
ctx.Error(err, "make context")
wfStatus.Message = string(common.WorkflowStateExecuting)
return common.WorkflowStateExecuting, err
}
w.wfCtx = wfCtx
w.checkDuplicateID(ctx)
e := &engine{
status: wfStatus,
dagMode: w.dagMode,
monitorCtx: ctx,
app: w.app,
wfCtx: wfCtx,
}
err = e.run(wfCtx, taskRunners)
err = e.run(taskRunners)
if err != nil {
ctx.Error(err, "run steps")
wfStatus.Message = string(common.WorkflowStateExecuting)
return common.WorkflowStateExecuting, err
}
e.checkWorkflowStatusMessage(wfStatus)
if wfStatus.Terminated {
w.CleanupCountersInContext(ctx)
return common.WorkflowStateTerminated, nil
}
if wfStatus.Suspend {
w.CleanupCountersInContext(ctx)
return common.WorkflowStateSuspended, nil
}
if w.allDone(taskRunners) {
wfStatus.Message = string(common.WorkflowStateSucceeded)
return common.WorkflowStateSucceeded, nil
}
wfStatus.Message = string(common.WorkflowStateExecuting)
return common.WorkflowStateExecuting, nil
}
@@ -156,6 +186,41 @@ func (w *workflow) Trace() error {
return recorder.With(w.cli, w.app).Save("", data).Limit(10).Error()
}
// CleanupCountersInContext removes the temporary bookkeeping entries (failed
// times, backoff times, last/next execute time) from the workflow context
// config map and writes the config map back with the client.
// An update failure is logged and not returned.
func (w *workflow) CleanupCountersInContext(ctx monitorContext.Context) {
	store := w.wfCtx.GetStore()
	counterPrefixes := []string{
		wfTypes.ContextPrefixFailedTimes,
		wfTypes.ContextPrefixBackoffTimes,
		wfTypes.ContextKeyLastExecuteTime,
		wfTypes.ContextKeyNextExecuteTime,
	}
	for key := range store.Data {
		for _, prefix := range counterPrefixes {
			if strings.HasPrefix(key, prefix) {
				delete(store.Data, key)
				break
			}
		}
	}
	if err := w.cli.Update(ctx, store); err != nil {
		ctx.Error(err, "failed to update workflow context", "application", w.app.Name, "config map", store.Name)
	}
}
// GetBackoffWaitTime returns how long to wait before the next retry.
// It reads the next execute time (Unix seconds) from the workflow context and
// returns the time remaining until then. When the value is missing, cannot be
// parsed, or is not in the future, it falls back to one second.
func (w *workflow) GetBackoffWaitTime() time.Duration {
	const fallback = time.Second

	raw := w.wfCtx.GetMutableValue(wfTypes.ContextKeyNextExecuteTime)
	if raw == "" {
		return fallback
	}
	seconds, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return fallback
	}
	if remaining := time.Until(time.Unix(seconds, 0)); remaining > 0 {
		return remaining
	}
	return fallback
}
func (w *workflow) allDone(taskRunners []wfTypes.TaskRunner) bool {
status := w.app.Status.Workflow
for _, t := range taskRunners {
@@ -212,16 +277,85 @@ func (w *workflow) setMetadataToContext(wfCtx wfContext.Context) error {
return wfCtx.SetVar(metadata, wfTypes.ContextKeyMetadata)
}
func (e *engine) runAsDAG(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner) error {
// checkDuplicateID clears the counters in the workflow context when the
// workflow status has no steps recorded yet but the context store already
// holds backoff counters.
// NOTE(review): presumably such counters are leftovers from an earlier run of
// the same application - confirm against the caller.
func (w *workflow) checkDuplicateID(ctx monitorContext.Context) {
	if len(w.app.Status.Workflow.Steps) > 0 {
		return
	}
	for key := range w.wfCtx.GetStore().Data {
		if strings.HasPrefix(key, wfTypes.ContextPrefixBackoffTimes) {
			// One stale counter is enough to trigger the cleanup.
			w.CleanupCountersInContext(ctx)
			return
		}
	}
}
// getBackoffWaitTime computes the backoff interval in seconds from the backoff
// counters kept in the workflow context store.
// It takes the smallest counter value n and returns 2^n * backoffTimeCoefficient,
// clamped to [minWorkflowBackoffWaitTime, maxWorkflowBackoffWaitTime]. When no
// backoff counter exists, the minimum wait time is returned.
func getBackoffWaitTime(wfCtx wfContext.Context) int {
	// Start from a count high enough that its interval already exceeds the
	// maximum wait time, so any real counter will lower it.
	lowest := 15
	seen := false
	for key, raw := range wfCtx.GetStore().Data {
		if !strings.HasPrefix(key, wfTypes.ContextPrefixBackoffTimes) {
			continue
		}
		seen = true
		times, err := strconv.Atoi(raw)
		if err != nil {
			// An unreadable counter is treated as zero.
			times = 0
		}
		if times < lowest {
			lowest = times
		}
	}
	if !seen {
		return minWorkflowBackoffWaitTime
	}

	wait := math.Pow(2, float64(lowest)) * backoffTimeCoefficient
	switch {
	case wait < minWorkflowBackoffWaitTime:
		return minWorkflowBackoffWaitTime
	case wait > maxWorkflowBackoffWaitTime:
		return maxWorkflowBackoffWaitTime
	default:
		return int(wait)
	}
}
// setNextExecuteTime derives the next execute time from the last execute time
// plus the current backoff interval (both in Unix seconds), stores it in the
// workflow context and commits the context.
// A missing or unparsable last execute time is logged once and treated as 0;
// a commit failure is logged and not returned.
func (e *engine) setNextExecuteTime() {
	interval := getBackoffWaitTime(e.wfCtx)
	lastExecuteTime := e.wfCtx.GetMutableValue(wfTypes.ContextKeyLastExecuteTime)

	var last int64
	if lastExecuteTime == "" {
		// The logger takes (err, msg, key/value...): pass an explicit message
		// so that "application" is a key paired with the app name.
		e.monitorCtx.Error(fmt.Errorf("last execute time is empty"), "failed to get last execute time", "application", e.app.Name)
	} else if parsed, err := strconv.ParseInt(lastExecuteTime, 10, 64); err != nil {
		// Keep last at 0 instead of the saturated value ParseInt returns on
		// range errors, which would overflow when the interval is added.
		e.monitorCtx.Error(err, "failed to parse last execute time", "lastExecuteTime", lastExecuteTime)
	} else {
		last = parsed
	}

	next := last + int64(interval)
	e.wfCtx.SetMutableValue(strconv.FormatInt(next, 10), wfTypes.ContextKeyNextExecuteTime)
	if err := e.wfCtx.Commit(); err != nil {
		e.monitorCtx.Error(err, "failed to commit next execute time", "nextExecuteTime", next)
	}
}
func (e *engine) runAsDAG(taskRunners []wfTypes.TaskRunner) error {
var (
todoTasks []wfTypes.TaskRunner
pendingTasks []wfTypes.TaskRunner
)
wfCtx := e.wfCtx
done := true
for _, tRunner := range taskRunners {
ready := false
var stepID string
for _, ss := range e.status.Steps {
if ss.Name == tRunner.Name() {
stepID = ss.ID
ready = ss.Phase == common.WorkflowStepPhaseSucceeded
break
}
@@ -233,6 +367,8 @@ func (e *engine) runAsDAG(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRun
continue
}
todoTasks = append(todoTasks, tRunner)
} else {
wfCtx.DeleteMutableValue(wfTypes.ContextPrefixBackoffTimes, stepID)
}
}
if done {
@@ -240,28 +376,47 @@ func (e *engine) runAsDAG(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRun
}
if len(todoTasks) > 0 {
err := e.steps(wfCtx, todoTasks)
err := e.steps(todoTasks)
if err != nil {
return err
}
if e.needStop() {
return nil
}
if len(pendingTasks) > 0 {
return e.runAsDAG(wfCtx, pendingTasks)
return e.runAsDAG(pendingTasks)
}
}
return nil
}
func (e *engine) run(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner) error {
func (e *engine) run(taskRunners []wfTypes.TaskRunner) error {
var err error
if e.dagMode {
return e.runAsDAG(wfCtx, taskRunners)
err = e.runAsDAG(taskRunners)
} else {
err = e.steps(e.todoByIndex(taskRunners))
}
return e.steps(wfCtx, e.todoByIndex(taskRunners))
e.setNextExecuteTime()
return err
}
// checkWorkflowStatusMessage sets the workflow status message after a run.
// When a step failed after its retries and nothing is waiting any more, the
// failed-after-retries message is used. Otherwise the message reflects the
// suspended or terminated state, with suspended taking precedence when both
// flags are set; if neither is set the message is left untouched.
func (e *engine) checkWorkflowStatusMessage(wfStatus *common.WorkflowStatus) {
	if e.failedAfterRetries && !e.waiting {
		e.status.Message = MessageFailedAfterRetries
		return
	}
	switch {
	case wfStatus.Suspend:
		e.status.Message = string(common.WorkflowStateSuspended)
	case wfStatus.Terminated:
		e.status.Message = string(common.WorkflowStateTerminated)
	}
}
func (e *engine) todoByIndex(taskRunners []wfTypes.TaskRunner) []wfTypes.TaskRunner {
@@ -279,7 +434,8 @@ func (e *engine) todoByIndex(taskRunners []wfTypes.TaskRunner) []wfTypes.TaskRun
return taskRunners[index:]
}
func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner) error {
func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error {
wfCtx := e.wfCtx
for _, runner := range taskRunners {
status, operation, err := runner.Run(wfCtx, &wfTypes.TaskRunOptions{
GetTracer: func(id string, stepStatus oamcore.WorkflowStep) monitorContext.Context {
@@ -294,16 +450,23 @@ func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner
e.updateStepStatus(status)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
e.failedAfterRetries = e.failedAfterRetries || operation.FailedAfterRetries
e.waiting = e.waiting || operation.Waiting
if status.Phase != common.WorkflowStepPhaseSucceeded {
wfCtx.IncreaseMutableCountValue(wfTypes.ContextPrefixBackoffTimes, status.ID)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
if e.isDag() {
continue
}
e.checkFailedAfterRetries()
return nil
}
wfCtx.DeleteMutableValue(wfTypes.ContextPrefixBackoffTimes, status.ID)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
e.finishStep(operation)
if e.needStop() {
@@ -314,10 +477,13 @@ func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner
}
type engine struct {
dagMode bool
status *common.WorkflowStatus
monitorCtx monitorContext.Context
app *oamcore.Application
dagMode bool
failedAfterRetries bool
waiting bool
status *common.WorkflowStatus
monitorCtx monitorContext.Context
wfCtx wfContext.Context
app *oamcore.Application
}
func (e *engine) isDag() bool {
@@ -336,6 +502,8 @@ func (e *engine) updateStepStatus(status common.WorkflowStepStatus) {
conditionUpdated bool
now = metav1.NewTime(time.Now())
)
e.wfCtx.SetMutableValue(strconv.FormatInt(now.Unix(), 10), wfTypes.ContextKeyLastExecuteTime)
status.LastExecuteTime = now
for i := range e.status.Steps {
if e.status.Steps[i].Name == status.Name {
@@ -351,7 +519,14 @@ func (e *engine) updateStepStatus(status common.WorkflowStepStatus) {
}
}
// checkFailedAfterRetries marks the workflow as suspended once a step has
// reported failed-after-retries, unless some step is still waiting.
func (e *engine) checkFailedAfterRetries() {
	if e.waiting || !e.failedAfterRetries {
		return
	}
	e.status.Suspend = true
}
// needStop reports whether step execution should halt, i.e. the workflow is
// suspended or terminated. The failed-after-retries check runs first so that
// it can flip the Suspend flag before the flags are read.
func (e *engine) needStop() bool {
	e.checkFailedAfterRetries()
	if e.status.Terminated {
		return true
	}
	return e.status.Suspend
}

View File

@@ -19,6 +19,7 @@ package workflow
import (
"context"
"encoding/json"
"math"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -71,6 +72,9 @@ var _ = Describe("Test Workflow", func() {
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateExecuting))
workflowStatus := app.Status.Workflow
Expect(workflowStatus.ContextBackend.Name).Should(BeEquivalentTo("workflow-" + app.Name + "-context"))
@@ -79,6 +83,7 @@ var _ = Describe("Test Workflow", func() {
Expect(cmp.Diff(*workflowStatus, common.WorkflowStatus{
AppRevision: workflowStatus.AppRevision,
Mode: common.WorkflowModeStep,
Message: string(common.WorkflowStateExecuting),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -113,12 +118,16 @@ var _ = Describe("Test Workflow", func() {
app.Status.Workflow.Finished = true
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateSucceeded))
app.Status.Workflow.ContextBackend = nil
cleanStepTimeStamp(app.Status.Workflow)
Expect(cmp.Diff(*app.Status.Workflow, common.WorkflowStatus{
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeStep,
Message: string(common.WorkflowStateSucceeded),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -136,6 +145,149 @@ var _ = Describe("Test Workflow", func() {
})
// Verifies that a step whose runner reports FailedAfterRetries leaves the
// workflow suspended with MessageFailedAfterRetries, in both step-by-step and
// DAG modes.
It("Workflow test for failed after retries", func() {
By("Test failed-after-retries in StepByStep mode")
app, runners := makeTestCase([]oamcore.WorkflowStep{
{
Name: "s1",
Type: "success",
},
{
Name: "s2",
Type: "failed-after-retries",
},
{
Name: "s3",
Type: "success",
},
})
ctx := monitorContext.NewTraceContext(context.Background(), "test-app")
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
// The first call is expected to report only the initializing state; the
// second call is the one expected to run the steps.
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateSuspended))
workflowStatus := app.Status.Workflow
Expect(workflowStatus.ContextBackend.Name).Should(BeEquivalentTo("workflow-" + app.Name + "-context"))
// Clear fields that vary between runs so the status can be diffed exactly.
workflowStatus.ContextBackend = nil
cleanStepTimeStamp(workflowStatus)
// In step-by-step mode only s1 and s2 are expected in the status: s3 must
// not appear after s2 has failed.
Expect(cmp.Diff(*workflowStatus, common.WorkflowStatus{
AppRevision: workflowStatus.AppRevision,
Mode: common.WorkflowModeStep,
Message: MessageFailedAfterRetries,
Suspend: true,
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
}, {
Name: "s2",
Type: "failed-after-retries",
Phase: common.WorkflowStepPhaseFailed,
}},
})).Should(BeEquivalentTo(""))
By("Test failed-after-retries in DAG mode")
app, runners = makeTestCase([]oamcore.WorkflowStep{
{
Name: "s1",
Type: "success",
},
{
Name: "s2",
Type: "failed-after-retries",
},
{
Name: "s3",
Type: "success",
},
})
ctx = monitorContext.NewTraceContext(context.Background(), "test-app")
wf = NewWorkflow(app, k8sClient, common.WorkflowModeDAG)
// Same two-call pattern as above: initializing first, then execution.
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateSuspended))
workflowStatus = app.Status.Workflow
Expect(workflowStatus.ContextBackend.Name).Should(BeEquivalentTo("workflow-" + app.Name + "-context"))
workflowStatus.ContextBackend = nil
cleanStepTimeStamp(workflowStatus)
// In DAG mode all three steps are expected in the status: s3 is recorded as
// succeeded even though s2 failed, and the workflow still ends up suspended.
Expect(cmp.Diff(*workflowStatus, common.WorkflowStatus{
AppRevision: workflowStatus.AppRevision,
Mode: common.WorkflowModeDAG,
Message: MessageFailedAfterRetries,
Suspend: true,
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
}, {
Name: "s2",
Type: "failed-after-retries",
Phase: common.WorkflowStepPhaseFailed,
}, {
Name: "s3",
Type: "success",
Phase: common.WorkflowStepPhaseSucceeded,
}},
})).Should(BeEquivalentTo(""))
})
// Verifies the backoff interval returned by getBackoffWaitTime as a single
// "wait-with-set-var" step is executed repeatedly, and that the interval
// returns to the minimum after CleanupCountersInContext.
It("Test get backoff time and clean", func() {
app, runners := makeTestCase([]oamcore.WorkflowStep{
{
Name: "s1",
Type: "wait-with-set-var",
},
})
ctx := monitorContext.NewTraceContext(context.Background(), "test-app")
wf := NewWorkflow(app, k8sClient, common.WorkflowModeDAG)
// Initial call; its returned state is ignored here.
_, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
By("Test get backoff time")
// The first five executions are expected to stay at the minimum wait time.
for i := 0; i < 5; i++ {
_, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name)
Expect(err).ToNot(HaveOccurred())
interval := getBackoffWaitTime(wfCtx)
Expect(interval).Should(BeEquivalentTo(minWorkflowBackoffWaitTime))
}
// The next nine executions are expected to follow int(0.05 * 2^n), where
// n = i+5 continues the count from the loop above.
for i := 0; i < 9; i++ {
_, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name)
Expect(err).ToNot(HaveOccurred())
interval := getBackoffWaitTime(wfCtx)
Expect(interval).Should(BeEquivalentTo(int(0.05 * math.Pow(2, float64(i+5)))))
}
// One more execution is expected to reach the maximum wait time.
_, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name)
Expect(err).ToNot(HaveOccurred())
interval := getBackoffWaitTime(wfCtx)
Expect(interval).Should(BeEquivalentTo(maxWorkflowBackoffWaitTime))
By("Test get backoff time after clean")
// After the counters are cleaned up, the next execution is expected to be
// back at the minimum wait time.
wf.CleanupCountersInContext(ctx)
_, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
wfCtx, err = wfContext.LoadContext(k8sClient, app.Namespace, app.Name)
Expect(err).ToNot(HaveOccurred())
interval = getBackoffWaitTime(wfCtx)
Expect(interval).Should(BeEquivalentTo(minWorkflowBackoffWaitTime))
})
It("test for suspend", func() {
app, runners := makeTestCase([]oamcore.WorkflowStep{
{
@@ -155,6 +307,9 @@ var _ = Describe("Test Workflow", func() {
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateSuspended))
wfStatus := *app.Status.Workflow
wfStatus.ContextBackend = nil
@@ -163,6 +318,7 @@ var _ = Describe("Test Workflow", func() {
AppRevision: wfStatus.AppRevision,
Mode: common.WorkflowModeStep,
Suspend: true,
Message: string(common.WorkflowStateSuspended),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -191,6 +347,7 @@ var _ = Describe("Test Workflow", func() {
Expect(cmp.Diff(*app.Status.Workflow, common.WorkflowStatus{
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeStep,
Message: string(common.WorkflowStateSucceeded),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -226,6 +383,9 @@ var _ = Describe("Test Workflow", func() {
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateTerminated))
app.Status.Workflow.ContextBackend = nil
cleanStepTimeStamp(app.Status.Workflow)
@@ -233,6 +393,7 @@ var _ = Describe("Test Workflow", func() {
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeStep,
Terminated: true,
Message: string(common.WorkflowStateTerminated),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -263,6 +424,9 @@ var _ = Describe("Test Workflow", func() {
ctx := monitorContext.NewTraceContext(context.Background(), "test-app")
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).To(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateExecuting))
app.Status.Workflow.ContextBackend = nil
@@ -270,6 +434,7 @@ var _ = Describe("Test Workflow", func() {
Expect(cmp.Diff(*app.Status.Workflow, common.WorkflowStatus{
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeStep,
Message: string(common.WorkflowStateExecuting),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -307,12 +472,16 @@ var _ = Describe("Test Workflow", func() {
ctx := monitorContext.NewTraceContext(context.Background(), "test-app")
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateExecuting))
app.Status.Workflow.ContextBackend = nil
cleanStepTimeStamp(app.Status.Workflow)
Expect(cmp.Diff(*app.Status.Workflow, common.WorkflowStatus{
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeDAG,
Message: string(common.WorkflowStateExecuting),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -337,6 +506,7 @@ var _ = Describe("Test Workflow", func() {
Expect(cmp.Diff(*app.Status.Workflow, common.WorkflowStatus{
AppRevision: app.Status.Workflow.AppRevision,
Mode: common.WorkflowModeDAG,
Message: string(common.WorkflowStateSucceeded),
Steps: []common.WorkflowStepStatus{{
Name: "s1",
Type: "success",
@@ -368,6 +538,9 @@ var _ = Describe("Test Workflow", func() {
wf := NewWorkflow(app, k8sClient, common.WorkflowModeStep)
state, err := wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateInitializing))
state, err = wf.ExecuteSteps(ctx, revision, runners)
Expect(err).ToNot(HaveOccurred())
Expect(state).Should(BeEquivalentTo(common.WorkflowStateExecuting))
Expect(app.Status.Workflow.Steps[0].Phase).Should(BeEquivalentTo(common.WorkflowStepPhaseRunning))
wfCtx, err := wfContext.LoadContext(k8sClient, app.Namespace, app.Name)
@@ -440,6 +613,16 @@ func makeRunner(name string, tpy string) wfTypes.TaskRunner {
Phase: common.WorkflowStepPhaseFailed,
}, &wfTypes.Operation{}, nil
}
case "failed-after-retries":
run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {
return common.WorkflowStepStatus{
Name: name,
Type: "failed-after-retries",
Phase: common.WorkflowStepPhaseFailed,
}, &wfTypes.Operation{
FailedAfterRetries: true,
}, nil
}
case "error":
run = func(ctx wfContext.Context, options *wfTypes.TaskRunOptions) (common.WorkflowStepStatus, *wfTypes.Operation, error) {
return common.WorkflowStepStatus{

View File

@@ -111,9 +111,6 @@ func NewWorkflowResumeCommand(c common.Args, ioStream cmdutil.IOStreams) *cobra.
if err != nil {
return err
}
if app.Spec.Workflow == nil {
return fmt.Errorf("the application must have workflow")
}
if app.Status.Workflow == nil {
return fmt.Errorf("the workflow in application is not running")
}

View File

@@ -148,15 +148,6 @@ func TestWorkflowResume(t *testing.T) {
"no app name specified": {
expectedErr: fmt.Errorf("must specify application name"),
},
"no workflow in app": {
app: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "no-workflow",
Namespace: "default",
},
},
expectedErr: fmt.Errorf("the application must have workflow"),
},
"workflow not suspended": {
app: &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -251,4 +251,25 @@ var _ = Describe("Application Normal tests", func() {
secondApp.Name = "second-app"
Expect(k8sClient.Create(ctx, &secondApp)).ShouldNot(BeNil())
})
// End-to-end check: creates the application from testdata/app/app10.yaml and
// waits for its status phase to become ApplicationWorkflowSuspending.
It("Test app failed after retries", func() {
By("Apply an application")
var newApp v1beta1.Application
Expect(common.ReadYamlToObject("testdata/app/app10.yaml", &newApp)).Should(BeNil())
newApp.Namespace = namespaceName
Expect(k8sClient.Create(ctx, &newApp)).Should(BeNil())
By("check application status")
testApp := new(v1beta1.Application)
// Re-fetch the application until the expected phase is observed, for at
// most 60 seconds; a Get error or a phase mismatch triggers another poll.
Eventually(func() error {
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: namespaceName, Name: newApp.Name}, testApp)
if err != nil {
return err
}
if testApp.Status.Phase != oamcomm.ApplicationWorkflowSuspending {
return fmt.Errorf("error application status wants %s, actually %s", oamcomm.ApplicationWorkflowSuspending, testApp.Status.Phase)
}
return nil
}, 60*time.Second).Should(BeNil())
})
})

12
test/e2e-test/testdata/app/app10.yaml vendored Normal file
View File

@@ -0,0 +1,12 @@
# Fixture for the e2e spec "Test app failed after retries", which expects this
# application to end up in the workflow-suspending phase.
# NOTE(review): the worker component sets only `cmd` and no image; presumably
# that is what makes the deployment fail - confirm against the worker definition.
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: failed-app
spec:
components:
- name: myweb
type: worker
properties:
cmd:
- ./podinfo
- stress-cpu=1