diff --git a/pkg/controller/common/rollout/workloads/delopyment_controller.go b/pkg/controller/common/rollout/workloads/delopyment_controller.go new file mode 100644 index 000000000..e37988863 --- /dev/null +++ b/pkg/controller/common/rollout/workloads/delopyment_controller.go @@ -0,0 +1,113 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "context" + "fmt" + + "github.com/crossplane/crossplane-runtime/pkg/event" + apps "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" +) + +// deploymentController is the place to hold fields needed for handle Deployment type of workloads +type deploymentController struct { + workloadController + targetNamespacedName types.NamespacedName +} + +// add the parent controller to the owner of the deployment, unpause it and initialize the size +// before kicking start the update and start from every pod in the old version +func (c *deploymentController) claimDeployment(ctx context.Context, deploy *apps.Deployment, initSize *int32) (bool, error) { + if controller := metav1.GetControllerOf(deploy); controller != nil && + controller.Kind == v1beta1.AppRolloutKind && controller.APIVersion == v1beta1.SchemeGroupVersion.String() { + // it's already there + return true, nil + } + + deployPatch := client.MergeFrom(deploy.DeepCopyObject()) + + // add the parent controller to the owner of the deployment + ref := metav1.NewControllerRef(c.parentController, v1beta1.AppRolloutKindVersionKind) + deploy.SetOwnerReferences(append(deploy.GetOwnerReferences(), *ref)) + + deploy.Spec.Paused = false + if initSize != nil { + deploy.Spec.Replicas = initSize + } + + // patch the Deployment + if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil { + c.recorder.Event(c.parentController, event.Warning("Failed to the start the Deployment update", err)) + c.rolloutStatus.RolloutRetry(err.Error()) + return false, err + } + return false, nil +} + +// scale the deployment +func (c *deploymentController) scaleDeployment(ctx context.Context, deploy *apps.Deployment, size int32) error { + deployPatch := client.MergeFrom(deploy.DeepCopyObject()) + deploy.Spec.Replicas = pointer.Int32Ptr(size) + + // patch the Deployment + if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil { + c.recorder.Event(c.parentController, event.Warning(event.Reason(fmt.Sprintf( + "Failed to update the deployment %s to the correct target %d", deploy.GetName(), size)), err)) + c.rolloutStatus.RolloutRetry(err.Error()) + return err + } + + klog.InfoS("Submitted upgrade quest for deployment", "deployment", + deploy.GetName(), "target replica size", size, "batch", c.rolloutStatus.CurrentBatch) + return nil +} + +// remove the parent controller from the deployment's owner list +func (c *deploymentController) releaseDeployment(ctx context.Context, deploy *apps.Deployment) (bool, error) { + deployPatch := client.MergeFrom(deploy.DeepCopyObject()) + + var newOwnerList []metav1.OwnerReference + found := false + for _, owner := range deploy.GetOwnerReferences() { + if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() { + found = true + continue + } + newOwnerList = append(newOwnerList, owner) + } + if !found { + klog.InfoS("the deployment is already released", "deploy", deploy.Name) + return true, nil + } + deploy.SetOwnerReferences(newOwnerList) + + // patch the Deployment + if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil { + c.recorder.Event(c.parentController, event.Warning("Failed to the release the Deployment", err)) + c.rolloutStatus.RolloutRetry(err.Error()) + return false, err + } + return false, nil +} diff --git a/pkg/controller/common/rollout/workloads/deployment_rollout_controller.go b/pkg/controller/common/rollout/workloads/deployment_rollout_controller.go index f5af79671..eb1801617 100644 --- a/pkg/controller/common/rollout/workloads/deployment_rollout_controller.go +++ b/pkg/controller/common/rollout/workloads/deployment_rollout_controller.go @@ -30,8 +30,6 @@ import ( "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" - "github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1" "github.com/oam-dev/kubevela/pkg/controller/utils" "github.com/oam-dev/kubevela/pkg/oam" @@ -39,8 +37,7 @@ import ( // DeploymentRolloutController is responsible for handling rollout deployment type of workloads type DeploymentRolloutController struct { - workloadController - targetNamespacedName types.NamespacedName + deploymentController sourceNamespacedName types.NamespacedName sourceDeploy apps.Deployment targetDeploy apps.Deployment @@ -51,14 +48,16 @@ func NewDeploymentRolloutController(client client.Client, recorder event.Recorde rolloutSpec *v1alpha1.RolloutPlan, rolloutStatus *v1alpha1.RolloutStatus, sourceNamespacedName, targetNamespacedName types.NamespacedName) *DeploymentRolloutController { return &DeploymentRolloutController{ - workloadController: workloadController{ - client: client, - recorder: recorder, - parentController: parentController, - rolloutSpec: rolloutSpec, - rolloutStatus: rolloutStatus, + deploymentController: deploymentController{ + workloadController: workloadController{ + client: client, + recorder: recorder, + parentController: parentController, + rolloutSpec: rolloutSpec, + rolloutStatus: rolloutStatus, + }, + targetNamespacedName: targetNamespacedName, }, - targetNamespacedName: targetNamespacedName, sourceNamespacedName: sourceNamespacedName, } } @@ -74,8 +73,7 @@ func (c *DeploymentRolloutController) VerifySpec(ctx context.Context) (bool, err } }() - err := c.fetchDeployments(ctx) - if err != nil { + if err := c.fetchDeployments(ctx); err != nil { c.rolloutStatus.RolloutRetry(err.Error()) // do not fail the rollout just because we can't get the resource // nolint:nilerr @@ -141,23 +139,25 @@ func (c *DeploymentRolloutController) VerifySpec(ctx context.Context) (bool, err // Initialize makes sure that the source and target deployment is under our control func (c *DeploymentRolloutController) Initialize(ctx context.Context) (bool, error) { - err := c.fetchDeployments(ctx) - if err != nil { + if err := c.fetchDeployments(ctx); err != nil { c.rolloutStatus.RolloutRetry(err.Error()) return false, nil } - err = c.claimDeployment(ctx, &c.sourceDeploy, nil) - if err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) + + // claim source deployment + if _, err := c.claimDeployment(ctx, &c.sourceDeploy, nil); err != nil { + // nolint:nilerr return false, nil } + + // claim target deployment // make sure we start with the matching replicas and target targetInitSize := pointer.Int32Ptr(c.rolloutStatus.RolloutTargetSize - getDeployReplicaSize(&c.sourceDeploy)) - err = c.claimDeployment(ctx, &c.targetDeploy, targetInitSize) - if err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) + if _, err := c.claimDeployment(ctx, &c.targetDeploy, targetInitSize); err != nil { + // nolint:nilerr return false, nil } + // mark the rollout initialized c.recorder.Event(c.parentController, event.Normal("Rollout Initialized", "Rollout resource are initialized")) return true, nil @@ -166,30 +166,33 @@ func (c *DeploymentRolloutController) Initialize(ctx context.Context) (bool, err // RolloutOneBatchPods calculates the number of pods we can upgrade once according to the rollout spec // and then set the partition accordingly func (c *DeploymentRolloutController) RolloutOneBatchPods(ctx context.Context) (bool, error) { - err := c.fetchDeployments(ctx) - if err != nil { + if err := c.fetchDeployments(ctx); err != nil { // don't fail the rollout just because of we can't get the resource // nolint:nilerr c.rolloutStatus.RolloutRetry(err.Error()) return false, nil } + currentSizeSetting := *c.sourceDeploy.Spec.Replicas + *c.targetDeploy.Spec.Replicas // get the rollout strategy rolloutStrategy := v1alpha1.IncreaseFirstRolloutStrategyType if len(c.rolloutSpec.RolloutStrategy) != 0 { rolloutStrategy = c.rolloutSpec.RolloutStrategy } + // Determine if we are the first or the second part of the current batch rollout if currentSizeSetting == c.rolloutStatus.RolloutTargetSize { // we need to finish the first part of the rollout, // this may conclude that we've already reached the size (in a rollback case) return c.rolloutBatchFirstHalf(ctx, rolloutStrategy) } + // we are at the second half targetSize := c.calculateCurrentTarget(c.rolloutStatus.RolloutTargetSize) if !c.rolloutBatchSecondHalf(ctx, rolloutStrategy, targetSize) { return false, nil } + // record the finished upgrade action klog.InfoS("upgraded one batch", "current batch", c.rolloutStatus.CurrentBatch, "target deployment size", targetSize) @@ -201,12 +204,12 @@ func (c *DeploymentRolloutController) RolloutOneBatchPods(ctx context.Context) ( // CheckOneBatchPods checks to see if the pods are all available according to the rollout plan func (c *DeploymentRolloutController) CheckOneBatchPods(ctx context.Context) (bool, error) { - err := c.fetchDeployments(ctx) - if err != nil { + if err := c.fetchDeployments(ctx); err != nil { // don't fail the rollout just because of we can't get the resource // nolint:nilerr return false, nil } + // get the number of ready pod from target readyTargetPodCount := c.targetDeploy.Status.ReadyReplicas sourcePodCount := c.sourceDeploy.Status.Replicas @@ -237,6 +240,7 @@ func (c *DeploymentRolloutController) CheckOneBatchPods(ctx context.Context) (bo c.rolloutStatus.CurrentBatch, readyTargetPodCount, sourcePodCount, maxUnavail)) return false, nil } + // record the successful upgrade c.rolloutStatus.UpgradedReadyReplicas = readyTargetPodCount klog.InfoS("all pods in current batch are ready", "current batch", c.rolloutStatus.CurrentBatch) @@ -247,16 +251,16 @@ func (c *DeploymentRolloutController) CheckOneBatchPods(ctx context.Context) (bo // FinalizeOneBatch makes sure that the rollout status are updated correctly func (c *DeploymentRolloutController) FinalizeOneBatch(ctx context.Context) (bool, error) { - err := c.fetchDeployments(ctx) - if err != nil { + if err := c.fetchDeployments(ctx); err != nil { // don't fail the rollout just because of we can't get the resource // nolint:nilerr return false, nil } + sourceTarget := getDeployReplicaSize(&c.sourceDeploy) targetTarget := getDeployReplicaSize(&c.targetDeploy) if sourceTarget+targetTarget != c.rolloutStatus.RolloutTargetSize { - err = fmt.Errorf("deployment targets don't match total rollout, sourceTarget = %d, targetTarget = %d, "+ + err := fmt.Errorf("deployment targets don't match total rollout, sourceTarget = %d, targetTarget = %d, "+ "rolloutTargetSize = %d", sourceTarget, targetTarget, c.rolloutStatus.RolloutTargetSize) klog.ErrorS(err, "the batch is not valid", "current batch", c.rolloutStatus.CurrentBatch) return false, err @@ -266,21 +270,21 @@ func (c *DeploymentRolloutController) FinalizeOneBatch(ctx context.Context) (boo // Finalize makes sure the Deployment is all upgraded func (c *DeploymentRolloutController) Finalize(ctx context.Context, succeed bool) bool { - err := c.fetchDeployments(ctx) - if err != nil { + if err := c.fetchDeployments(ctx); err != nil { // don't fail the rollout just because of we can't get the resource return false } - err = c.releaseDeployment(ctx, &c.sourceDeploy) - if err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) + + // release source deployment + if _, err := c.releaseDeployment(ctx, &c.sourceDeploy); err != nil { return false } - err = c.releaseDeployment(ctx, &c.targetDeploy) - if err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) + + // release target deployment + if _, err := c.releaseDeployment(ctx, &c.targetDeploy); err != nil { return false } + // mark the resource finalized c.rolloutStatus.LastAppliedPodTemplateIdentifier = c.rolloutStatus.NewPodTemplateIdentifier c.recorder.Event(c.parentController, event.Normal("Rollout Finalized", @@ -292,82 +296,20 @@ func (c *DeploymentRolloutController) Finalize(ctx context.Context, succeed bool The functions below are helper functions ------------------------------------- */ func (c *DeploymentRolloutController) fetchDeployments(ctx context.Context) error { - err := c.client.Get(ctx, c.sourceNamespacedName, &c.sourceDeploy) - if err != nil { + if err := c.client.Get(ctx, c.sourceNamespacedName, &c.sourceDeploy); err != nil { if !apierrors.IsNotFound(err) { - c.recorder.Event(c.parentController, event.Warning("Failed to get the Deployment", err)) + c.recorder.Event(c.parentController, event.Warning("Failed to get the source Deployment", err)) } return err } - err = c.client.Get(ctx, c.targetNamespacedName, &c.targetDeploy) - if err != nil { + if err := c.client.Get(ctx, c.targetNamespacedName, &c.targetDeploy); err != nil { if !apierrors.IsNotFound(err) { - c.recorder.Event(c.parentController, event.Warning("Failed to get the Deployment", err)) + c.recorder.Event(c.parentController, event.Warning("Failed to get the target Deployment", err)) } return err } - return nil -} -// add the parent controller to the owner of the deployment, unpause it and initialize the size -// before kicking start the update and start from every pod in the old version -func (c *DeploymentRolloutController) claimDeployment(ctx context.Context, deploy *apps.Deployment, initSize *int32) error { - deployPatch := client.MergeFrom(deploy.DeepCopyObject()) - if controller := metav1.GetControllerOf(deploy); controller == nil { - ref := metav1.NewControllerRef(c.parentController, v1beta1.AppRolloutKindVersionKind) - deploy.SetOwnerReferences(append(deploy.GetOwnerReferences(), *ref)) - } - deploy.Spec.Paused = false - if initSize != nil { - deploy.Spec.Replicas = initSize - } - // patch the Deployment - if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil { - c.recorder.Event(c.parentController, event.Warning("Failed to the start the Deployment update", err)) - return err - } - return nil -} - -// patch the deployment's target, returns if succeeded -func (c *DeploymentRolloutController) patchDeployment(ctx context.Context, target int32, deploy *apps.Deployment) error { - deployPatch := client.MergeFrom(deploy.DeepCopyObject()) - deploy.Spec.Replicas = pointer.Int32Ptr(target) - // patch the Deployment - if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil { - c.recorder.Event(c.parentController, event.Warning(event.Reason(fmt.Sprintf( - "Failed to update the deployment %s to the correct target %d", deploy.GetName(), target)), err)) - return err - } - klog.InfoS("Submitted upgrade quest for deployment", "deployment", - deploy.GetName(), "target replica size", target, "batch", c.rolloutStatus.CurrentBatch) - return nil -} - -func (c *DeploymentRolloutController) releaseDeployment(ctx context.Context, deploy *apps.Deployment) error { - deployPatch := client.MergeFrom(deploy.DeepCopyObject()) - // remove the parent controller from the resources' owner list - var newOwnerList []metav1.OwnerReference - found := false - for _, owner := range deploy.GetOwnerReferences() { - if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() { - found = true - continue - } - newOwnerList = append(newOwnerList, owner) - } - if !found { - klog.InfoS("the deployment is already released", "deploy", deploy.Name) - return nil - } - deploy.SetOwnerReferences(newOwnerList) - // patch the Deployment - if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil { - c.recorder.Event(c.parentController, event.Warning("Failed to the finalize the Deployment", err)) - c.rolloutStatus.RolloutRetry(err.Error()) - return err - } return nil } @@ -389,16 +331,15 @@ func (c *DeploymentRolloutController) calculateRolloutTotalSize() (int32, error) // check if the replicas in all the rollout batches add up to the right number func (c *DeploymentRolloutController) verifyRolloutBatchReplicaValue(totalReplicas int32) error { // use a common function to check if the sum of all the batches can match the Deployment size - err := verifyBatchesWithRollout(c.rolloutSpec, totalReplicas) - if err != nil { - return err - } - return nil + return verifyBatchesWithRollout(c.rolloutSpec, totalReplicas) } // the target deploy size for the current batch func (c *DeploymentRolloutController) calculateCurrentTarget(totalSize int32) int32 { - return int32(calculateNewBatchTarget(c.rolloutSpec, 0, int(totalSize), int(c.rolloutStatus.CurrentBatch))) + targetSize := int32(calculateNewBatchTarget(c.rolloutSpec, 0, int(totalSize), int(c.rolloutStatus.CurrentBatch))) + klog.InfoS("Calculated the number of pods in the target deployment after current batch", + "current batch", c.rolloutStatus.CurrentBatch, "target deploy size", targetSize) + return targetSize } // the source deploy size for the current batch @@ -422,48 +363,49 @@ func (c *DeploymentRolloutController) rolloutBatchFirstHalf(ctx context.Context, c.rolloutStatus.UpgradedReplicas = targetSize } }() + if rolloutStrategy == v1alpha1.IncreaseFirstRolloutStrategyType { // set the target replica first which should increase its size - if targetSize > getDeployReplicaSize(&c.targetDeploy) { + if getDeployReplicaSize(&c.targetDeploy) < targetSize { klog.InfoS("set target deployment replicas", "deploy", c.targetDeploy.Name, "targetSize", targetSize) - if err := c.patchDeployment(ctx, targetSize, &c.targetDeploy); err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) - } + _ = c.scaleDeployment(ctx, &c.targetDeploy, targetSize) c.recorder.Event(c.parentController, event.Normal("Batch Rollout", fmt.Sprintf("Submitted the increase part of upgrade quests for batch %d, target size = %d", c.rolloutStatus.CurrentBatch, targetSize))) return false, nil } - // do nothing if the target is ready reached + + // do nothing if the target is already reached klog.InfoS("target deployment replicas overshoot the size already", "deploy", c.targetDeploy.Name, "deployment size", getDeployReplicaSize(&c.targetDeploy), "targetSize", targetSize) return true, nil } + if rolloutStrategy == v1alpha1.DecreaseFirstRolloutStrategyType { // set the source replicas first which should shrink its size sourceSize := c.calculateCurrentSource(c.rolloutStatus.RolloutTargetSize) - if sourceSize < getDeployReplicaSize(&c.sourceDeploy) { + if getDeployReplicaSize(&c.sourceDeploy) > sourceSize { klog.InfoS("set source deployment replicas", "source deploy", c.sourceDeploy.Name, "sourceSize", sourceSize) - if err := c.patchDeployment(ctx, sourceSize, &c.sourceDeploy); err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) - } + _ = c.scaleDeployment(ctx, &c.sourceDeploy, sourceSize) c.recorder.Event(c.parentController, event.Normal("Batch Rollout", fmt.Sprintf("Submitted the decrease part of upgrade quests for batch %d, source size = %d", c.rolloutStatus.CurrentBatch, sourceSize))) return false, nil } - // do nothing if the reduce target is ready reached + + // do nothing if the reduce target is already reached klog.InfoS("source deployment replicas overshoot the size already", "source deploy", c.sourceDeploy.Name, "deployment size", getDeployReplicaSize(&c.sourceDeploy), "sourceSize", sourceSize) return true, nil } + return false, fmt.Errorf("encountered an unknown rolloutStrategy `%s`", rolloutStrategy) } func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context, rolloutStrategy v1alpha1.RolloutStrategyType, targetSize int32) bool { - var err error sourceSize := c.calculateCurrentSource(c.rolloutStatus.RolloutTargetSize) + if rolloutStrategy == v1alpha1.IncreaseFirstRolloutStrategyType { // calculate the max unavailable given the target size maxUnavail := 0 @@ -475,8 +417,7 @@ func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context if c.targetDeploy.Status.ReadyReplicas+int32(maxUnavail) >= targetSize { // set the source replicas now which should shrink its size klog.InfoS("set source deployment replicas", "deploy", c.sourceDeploy.Name, "sourceSize", sourceSize) - if err = c.patchDeployment(ctx, sourceSize, &c.sourceDeploy); err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) + if err := c.scaleDeployment(ctx, &c.sourceDeploy, sourceSize); err != nil { return false } c.recorder.Event(c.parentController, event.Normal("Batch Rollout", @@ -496,8 +437,7 @@ func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context // we can increase the target deployment as soon as the source deployment's replica is correct // no need to wait for them to be ready klog.InfoS("set target deployment replicas", "deploy", c.targetDeploy.Name, "targetSize", targetSize) - if err = c.patchDeployment(ctx, targetSize, &c.targetDeploy); err != nil { - c.rolloutStatus.RolloutRetry(err.Error()) + if err := c.scaleDeployment(ctx, &c.targetDeploy, targetSize); err != nil { return false } c.recorder.Event(c.parentController, event.Normal("Batch Rollout", @@ -512,5 +452,6 @@ func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context return false } } + return true } diff --git a/pkg/controller/common/rollout/workloads/deployment_rollout_integration_test.go b/pkg/controller/common/rollout/workloads/deployment_rollout_integration_test.go index 202cad88c..d523bb63f 100644 --- a/pkg/controller/common/rollout/workloads/deployment_rollout_integration_test.go +++ b/pkg/controller/common/rollout/workloads/deployment_rollout_integration_test.go @@ -58,27 +58,29 @@ var _ = Describe("deployment controller", func() { sourceNamespacedName = client.ObjectKey{Name: sourceName, Namespace: namespaceName} targetNamespacedName = client.ObjectKey{Name: targetName, Namespace: namespaceName} c = DeploymentRolloutController{ - workloadController: workloadController{ - client: k8sClient, - rolloutSpec: &v1alpha1.RolloutPlan{ - RolloutBatches: []v1alpha1.RolloutBatch{ - { - Replicas: intstr.FromInt(2), - }, - { - Replicas: intstr.FromInt(3), - }, - { - Replicas: intstr.FromString("50%"), + deploymentController: deploymentController{ + workloadController: workloadController{ + client: k8sClient, + rolloutSpec: &v1alpha1.RolloutPlan{ + RolloutBatches: []v1alpha1.RolloutBatch{ + { + Replicas: intstr.FromInt(2), + }, + { + Replicas: intstr.FromInt(3), + }, + { + Replicas: intstr.FromString("50%"), + }, }, }, + rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState}, + parentController: &appRollout, + recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")). + WithAnnotations("controller", "AppRollout"), }, - rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState}, - parentController: &appRollout, - recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")). - WithAnnotations("controller", "AppRollout"), + targetNamespacedName: targetNamespacedName, }, - targetNamespacedName: targetNamespacedName, sourceNamespacedName: sourceNamespacedName, } @@ -144,14 +146,16 @@ var _ = Describe("deployment controller", func() { got := NewDeploymentRolloutController(k8sClient, recorder, parentController, rolloutSpec, rolloutStatus, workloadNamespacedName, workloadNamespacedName) c := &DeploymentRolloutController{ - workloadController: workloadController{ - client: k8sClient, - recorder: recorder, - parentController: parentController, - rolloutSpec: rolloutSpec, - rolloutStatus: rolloutStatus, + deploymentController: deploymentController{ + workloadController: workloadController{ + client: k8sClient, + recorder: recorder, + parentController: parentController, + rolloutSpec: rolloutSpec, + rolloutStatus: rolloutStatus, + }, + targetNamespacedName: workloadNamespacedName, }, - targetNamespacedName: workloadNamespacedName, sourceNamespacedName: workloadNamespacedName, } Expect(got).Should(Equal(c)) diff --git a/pkg/controller/common/rollout/workloads/deployment_rollout_unit_test.go b/pkg/controller/common/rollout/workloads/deployment_rollout_unit_test.go index ed6efceeb..1fb02ca30 100644 --- a/pkg/controller/common/rollout/workloads/deployment_rollout_unit_test.go +++ b/pkg/controller/common/rollout/workloads/deployment_rollout_unit_test.go @@ -82,10 +82,12 @@ func TestCalculateCurrentSource(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { controller := DeploymentRolloutController{ - workloadController: workloadController{ - rolloutSpec: tc.rolloutSpec, - rolloutStatus: &v1alpha1.RolloutStatus{ - CurrentBatch: tc.currentBatch, + deploymentController: deploymentController{ + workloadController: workloadController{ + rolloutSpec: tc.rolloutSpec, + rolloutStatus: &v1alpha1.RolloutStatus{ + CurrentBatch: tc.currentBatch, + }, }, }, } diff --git a/pkg/controller/common/rollout/workloads/deployment_scale_controller.go b/pkg/controller/common/rollout/workloads/deployment_scale_controller.go index 26545aec9..9fb6760e7 100644 --- a/pkg/controller/common/rollout/workloads/deployment_scale_controller.go +++ b/pkg/controller/common/rollout/workloads/deployment_scale_controller.go @@ -27,10 +27,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" - "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" "github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1" "github.com/oam-dev/kubevela/pkg/oam" "github.com/oam-dev/kubevela/pkg/oam/util" @@ -38,22 +36,23 @@ import ( // DeploymentScaleController is responsible for handle scale Deployment type of workloads type DeploymentScaleController struct { - workloadController - targetNamespacedName types.NamespacedName - deploy *appsv1.Deployment + deploymentController + deploy *appsv1.Deployment } // NewDeploymentScaleController creates Deployment scale controller func NewDeploymentScaleController(client client.Client, recorder event.Recorder, parentController oam.Object, rolloutSpec *v1alpha1.RolloutPlan, rolloutStatus *v1alpha1.RolloutStatus, workloadName types.NamespacedName) *DeploymentScaleController { return &DeploymentScaleController{ - workloadController: workloadController{ - client: client, - recorder: recorder, - parentController: parentController, - rolloutSpec: rolloutSpec, - rolloutStatus: rolloutStatus, + deploymentController: deploymentController{ + workloadController: workloadController{ + client: client, + recorder: recorder, + parentController: parentController, + rolloutSpec: rolloutSpec, + rolloutStatus: rolloutStatus, + }, + targetNamespacedName: workloadName, }, - targetNamespacedName: workloadName, } } @@ -125,56 +124,41 @@ func (s *DeploymentScaleController) VerifySpec(ctx context.Context) (bool, error // Initialize makes sure that the deployment is under our control func (s *DeploymentScaleController) Initialize(ctx context.Context) (bool, error) { - err := s.fetchDeployment(ctx) - if err != nil { + if err := s.fetchDeployment(ctx); err != nil { s.rolloutStatus.RolloutRetry(err.Error()) // nolint: nilerr return false, nil } - if controller := metav1.GetControllerOf(s.deploy); controller != nil { - if controller.Kind == v1beta1.AppRolloutKind && controller.APIVersion == v1beta1.SchemeGroupVersion.String() { - // it's already there - return true, nil - } - } - // add the parent controller to the owner of the deployment - deployPatch := client.MergeFrom(s.deploy.DeepCopyObject()) - ref := metav1.NewControllerRef(s.parentController, v1beta1.AppRolloutKindVersionKind) - s.deploy.SetOwnerReferences(append(s.deploy.GetOwnerReferences(), *ref)) - s.deploy.Spec.Paused = false - - // patch the deployment - if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil { - s.recorder.Event(s.parentController, event.Warning("Failed to the start the deployment update", err)) - s.rolloutStatus.RolloutRetry(err.Error()) + claimedBefore, err := s.claimDeployment(ctx, s.deploy, nil) + if err != nil { + // nolint:nilerr return false, nil } - // mark the rollout initialized - s.recorder.Event(s.parentController, event.Normal("Scale Initialized", "deployment is initialized")) + if !claimedBefore { + // mark the rollout initialized + s.recorder.Event(s.parentController, event.Normal("Scale Initialized", "deployment is initialized")) + } return true, nil } // RolloutOneBatchPods calculates the number of pods we can scale to according to the rollout spec func (s *DeploymentScaleController) RolloutOneBatchPods(ctx context.Context) (bool, error) { - err := s.fetchDeployment(ctx) - if err != nil { + if err := s.fetchDeployment(ctx); err != nil { s.rolloutStatus.RolloutRetry(err.Error()) // nolint: nilerr return false, nil } - deployPatch := client.MergeFrom(s.deploy.DeepCopyObject()) // set the replica according to the batch newPodTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize), int(s.rolloutStatus.RolloutTargetSize), int(s.rolloutStatus.CurrentBatch)) - s.deploy.Spec.Replicas = pointer.Int32Ptr(int32(newPodTarget)) - // patch the deployment - if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil { - s.recorder.Event(s.parentController, event.Warning("Failed to update the deployment to upgrade", err)) - s.rolloutStatus.RolloutRetry(err.Error()) + + if err := s.scaleDeployment(ctx, s.deploy, int32(newPodTarget)); err != nil { + // nolint:nilerr return false, nil } + // record the scale klog.InfoS("scale one batch", "current batch", s.rolloutStatus.CurrentBatch) s.recorder.Event(s.parentController, event.Normal("Batch Rollout", @@ -185,12 +169,12 @@ func (s *DeploymentScaleController) RolloutOneBatchPods(ctx context.Context) (bo // CheckOneBatchPods checks to see if the pods are scaled according to the rollout plan func (s *DeploymentScaleController) CheckOneBatchPods(ctx context.Context) (bool, error) { - err := s.fetchDeployment(ctx) - if err != nil { + if err := s.fetchDeployment(ctx); err != nil { s.rolloutStatus.RolloutRetry(err.Error()) // nolint:nilerr return false, nil } + newPodTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize), int(s.rolloutStatus.RolloutTargetSize), int(s.rolloutStatus.CurrentBatch)) // get the number of ready pod from deployment @@ -221,6 +205,7 @@ func (s *DeploymentScaleController) CheckOneBatchPods(ctx context.Context) (bool fmt.Sprintf("Batch %d is available", s.rolloutStatus.CurrentBatch))) return true, nil } + // continue to verify klog.InfoS("the batch is not ready yet", "current batch", s.rolloutStatus.CurrentBatch, "target", newPodTarget, "readyPodCount", readyPodCount, "max unavailable allowed", unavail) @@ -274,56 +259,33 @@ func (s *DeploymentScaleController) Finalize(ctx context.Context, succeed bool) s.rolloutStatus.RolloutRetry(err.Error()) return false } - deployPatch := client.MergeFrom(s.deploy.DeepCopyObject()) - // remove the parent controller from the resources' owner list - var newOwnerList []metav1.OwnerReference - isOwner := false - for _, owner := range s.deploy.GetOwnerReferences() { - if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() { - isOwner = true - continue - } - newOwnerList = append(newOwnerList, owner) - } - if !isOwner { - // nothing to do if we are already not the owner - klog.InfoS("the deployment is already released and not controlled by rollout", "deployment", s.deploy.Name) - return true - } - s.deploy.SetOwnerReferences(newOwnerList) - // patch the deployment - if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil { - s.recorder.Event(s.parentController, event.Warning("Failed to the finalize the deployment", err)) - s.rolloutStatus.RolloutRetry(err.Error()) + releasedBefore, err := s.releaseDeployment(ctx, s.deploy) + if err != nil { return false } - // mark the resource finalized - s.recorder.Event(s.parentController, event.Normal("Scale Finalized", - fmt.Sprintf("Scale resource are finalized, succeed := %t", succeed))) + if !releasedBefore { + // mark the resource finalized + s.recorder.Event(s.parentController, event.Normal("Scale Finalized", + fmt.Sprintf("Scale resource are finalized, succeed := %t", succeed))) + } return true } // size fetches the Deloyment and returns the replicas (not the actual number of pods) func (s *DeploymentScaleController) size(ctx context.Context) (int32, error) { if s.deploy == nil { - err := s.fetchDeployment(ctx) - if err != nil { + if err := s.fetchDeployment(ctx); err != nil { return 0, err } } - // default is 1 - if s.deploy.Spec.Replicas == nil { - return 1, nil - } - return *s.deploy.Spec.Replicas, nil + return getDeployReplicaSize(s.deploy), nil } func (s *DeploymentScaleController) fetchDeployment(ctx context.Context) error { // get the deployment workload := appsv1.Deployment{} - err := s.client.Get(ctx, s.targetNamespacedName, &workload) - if err != nil { + if err := s.client.Get(ctx, s.targetNamespacedName, &workload); err != nil { if !apierrors.IsNotFound(err) { s.recorder.Event(s.parentController, event.Warning("Failed to get the Deployment", err)) } diff --git a/pkg/controller/common/rollout/workloads/deployment_scale_integration_test.go b/pkg/controller/common/rollout/workloads/deployment_scale_integration_test.go index ad02a2f1c..1a78104cb 100644 --- a/pkg/controller/common/rollout/workloads/deployment_scale_integration_test.go +++ b/pkg/controller/common/rollout/workloads/deployment_scale_integration_test.go @@ -51,28 +51,30 @@ var _ = Describe("deployment controller", func() { appRollout := v1beta1.AppRollout{ObjectMeta: metav1.ObjectMeta{Name: name}} namespacedName = client.ObjectKey{Name: name, Namespace: namespace} s = DeploymentScaleController{ - workloadController: workloadController{ - client: k8sClient, - rolloutSpec: &v1alpha1.RolloutPlan{ - TargetSize: pointer.Int32Ptr(10), - RolloutBatches: []v1alpha1.RolloutBatch{ - { - Replicas: intstr.FromInt(1), - }, - { - Replicas: intstr.FromString("20%"), - }, - { - Replicas: intstr.FromString("80%"), + deploymentController: deploymentController{ + workloadController: workloadController{ + client: k8sClient, + rolloutSpec: &v1alpha1.RolloutPlan{ + TargetSize: pointer.Int32Ptr(10), + RolloutBatches: []v1alpha1.RolloutBatch{ + { + Replicas: intstr.FromInt(1), + }, + { + Replicas: intstr.FromString("20%"), + }, + { + Replicas: intstr.FromString("80%"), + }, }, }, + rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState}, + parentController: &appRollout, + recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")). + WithAnnotations("controller", "AppRollout"), }, - rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState}, - parentController: &appRollout, - recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")). - WithAnnotations("controller", "AppRollout"), + targetNamespacedName: namespacedName, }, - targetNamespacedName: namespacedName, } deployment = appsv1.Deployment{ @@ -119,20 +121,22 @@ var _ = Describe("deployment controller", func() { workloadNamespacedName := client.ObjectKey{Name: name, Namespace: namespace} got := NewDeploymentScaleController(k8sClient, recorder, parentController, rolloutSpec, rolloutStatus, workloadNamespacedName) controller := &DeploymentScaleController{ - workloadController: workloadController{ - client: k8sClient, - recorder: recorder, - parentController: parentController, - rolloutSpec: rolloutSpec, - rolloutStatus: rolloutStatus, + deploymentController: deploymentController{ + workloadController: workloadController{ + client: k8sClient, + recorder: recorder, + parentController: parentController, + rolloutSpec: rolloutSpec, + rolloutStatus: rolloutStatus, + }, + targetNamespacedName: workloadNamespacedName, }, - targetNamespacedName: workloadNamespacedName, } Expect(got).Should(Equal(controller)) }) }) - Context("VerifySpec", func() { + Context("TestVerifySpec", func() { It("rollout need a target size", func() { s.rolloutSpec.TargetSize = nil ligit, err := s.VerifySpec(ctx)