From 4e9a7fc82efdbd6e612655ddd3293c0aba04767f Mon Sep 17 00:00:00 2001 From: Somefive Date: Fri, 5 Nov 2021 15:55:09 +0800 Subject: [PATCH] Fix: patch status retry while conflict happens (#2629) (#2638) --- .../application/application_controller.go | 49 ++++++++++++++++--- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go index c63729f27..e5b5fa23f 100644 --- a/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go +++ b/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/meta" @@ -188,11 +189,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu app.Status.AppliedResources = handler.appliedResources switch workflowState { case common.WorkflowStateSuspended: - return ctrl.Result{}, r.patchStatus(ctx, app, common.ApplicationWorkflowSuspending) + return ctrl.Result{}, r.patchStatusWithRetryOnConflict(ctx, app, common.ApplicationWorkflowSuspending) case common.WorkflowStateTerminated: - return ctrl.Result{}, r.patchStatus(ctx, app, common.ApplicationWorkflowTerminated) + return ctrl.Result{}, r.patchStatusWithRetryOnConflict(ctx, app, common.ApplicationWorkflowTerminated) case common.WorkflowStateExecuting: - return reconcile.Result{RequeueAfter: baseWorkflowBackoffWaitTime}, r.patchStatus(ctx, app, common.ApplicationRunningWorkflow) + return reconcile.Result{RequeueAfter: baseWorkflowBackoffWaitTime}, r.patchStatusWithRetryOnConflict(ctx, app, common.ApplicationRunningWorkflow) case common.WorkflowStateFinished: wfStatus := app.Status.Workflow if wfStatus != nil { @@ -207,7 +208,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu klog.ErrorS(err, "Failed to gc after workflow", "application", klog.KObj(app)) r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedGC, err)) - return r.endWithNegativeCondition(ctx, app, condition.ErrorCondition("GCAfterWorkflow", err), common.ApplicationRunningWorkflow) + return r.endWithNegativeConditionWithRetry(ctx, app, condition.ErrorCondition("GCAfterWorkflow", err), common.ApplicationRunningWorkflow) } app.Status.ResourceTracker = ref } @@ -221,7 +222,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if err != nil { klog.ErrorS(err, "Failed to render components", "application", klog.KObj(app)) r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedRender, err)) - return r.endWithNegativeCondition(ctx, app, condition.ErrorCondition("Render", err), common.ApplicationRendering) + return r.endWithNegativeConditionWithRetry(ctx, app, condition.ErrorCondition("Render", err), common.ApplicationRendering) } assemble.HandleCheckManageWorkloadTrait(*handler.currentAppRev, comps) @@ -229,7 +230,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if err := handler.HandleComponentsRevision(ctx, comps); err != nil { klog.ErrorS(err, "Failed to handle compoents revision", "application", klog.KObj(app)) r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedRevision, err)) - return r.endWithNegativeCondition(ctx, app, condition.ErrorCondition("Render", err), common.ApplicationRendering) + return r.endWithNegativeConditionWithRetry(ctx, app, condition.ErrorCondition("Render", err), common.ApplicationRendering) } klog.Info("Application manifests has prepared and ready for appRollout to handle", "application", klog.KObj(app)) } @@ -329,20 +330,52 @@ func (r *Reconciler) handleFinalizers(ctx context.Context, app *v1beta1.Applicat return false, nil } -func (r *Reconciler) endWithNegativeCondition(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase) (ctrl.Result, error) { +func (r *Reconciler) _endWithNegativeCondition(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase, retry bool) (ctrl.Result, error) { app.SetConditions(condition) - if err := r.patchStatus(ctx, app, phase); err != nil { + handler := r.patchStatus + if retry { + handler = r.patchStatusWithRetryOnConflict + } + if err := handler(ctx, app, phase); err != nil { return ctrl.Result{}, errors.WithMessage(err, "cannot update application status") } return ctrl.Result{}, fmt.Errorf("object level reconcile error, type: %q, msg: %q", string(condition.Type), condition.Message) } +func (r *Reconciler) endWithNegativeCondition(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase) (ctrl.Result, error) { + return r._endWithNegativeCondition(ctx, app, condition, phase, false) +} + +// Note: Only operations that must override the status should use this function, it should only focus on workflow operations by now. +func (r *Reconciler) endWithNegativeConditionWithRetry(ctx context.Context, app *v1beta1.Application, condition condition.Condition, phase common.ApplicationPhase) (ctrl.Result, error) { + return r._endWithNegativeCondition(ctx, app, condition, phase, true) +} + func (r *Reconciler) patchStatus(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error { app.Status.Phase = phase updateObservedGeneration(app) return r.Client.Status().Patch(ctx, app, client.Merge) } +// Note: Only operations that must override the status should use this function, it should only focus on workflow operations by now. +func (r *Reconciler) patchStatusWithRetryOnConflict(ctx context.Context, app *v1beta1.Application, phase common.ApplicationPhase) error { + app.Status.Phase = phase + updateObservedGeneration(app) + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + status := app.Status.DeepCopy() + if err := r.Client.Get(ctx, client.ObjectKeyFromObject(app), app); err != nil { + klog.ErrorS(err, "failed to get application while patching status", "application", klog.KObj(app)) + return err + } + app.Status = *status + err := r.Client.Status().Patch(ctx, app, client.Merge) + if err != nil { + klog.ErrorS(err, "failed to re-patch status", "application", klog.KObj(app)) + } + return err + }) +} + // appWillRollout judge whether the application will be released by rollout. // If it's true, application controller will only create or update application revision but not emit any other K8s // resources into the cluster. Rollout controller will do real release works.