Add deploymentController struct (#1845)

* Init deployment controller

* Support deployment-claiming

* Nitpicking

* Support deployment-scaling

* Support deployment-releasing

* Handle scale errors

* Fix lint

* Log target size

* Fix texts
This commit is contained in:
whichxjy
2021-06-30 21:11:11 +08:00
committed by GitHub
parent 9c70edeb77
commit be0563a8ea
6 changed files with 278 additions and 252 deletions

View File

@@ -0,0 +1,113 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"fmt"
"github.com/crossplane/crossplane-runtime/pkg/event"
apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
)
// deploymentController is the place to hold fields needed for handle Deployment type of workloads
type deploymentController struct {
workloadController
targetNamespacedName types.NamespacedName
}
// add the parent controller to the owner of the deployment, unpause it and initialize the size
// before kicking start the update and start from every pod in the old version
func (c *deploymentController) claimDeployment(ctx context.Context, deploy *apps.Deployment, initSize *int32) (bool, error) {
if controller := metav1.GetControllerOf(deploy); controller != nil &&
controller.Kind == v1beta1.AppRolloutKind && controller.APIVersion == v1beta1.SchemeGroupVersion.String() {
// it's already there
return true, nil
}
deployPatch := client.MergeFrom(deploy.DeepCopyObject())
// add the parent controller to the owner of the deployment
ref := metav1.NewControllerRef(c.parentController, v1beta1.AppRolloutKindVersionKind)
deploy.SetOwnerReferences(append(deploy.GetOwnerReferences(), *ref))
deploy.Spec.Paused = false
if initSize != nil {
deploy.Spec.Replicas = initSize
}
// patch the Deployment
if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil {
c.recorder.Event(c.parentController, event.Warning("Failed to the start the Deployment update", err))
c.rolloutStatus.RolloutRetry(err.Error())
return false, err
}
return false, nil
}
// scale the deployment
func (c *deploymentController) scaleDeployment(ctx context.Context, deploy *apps.Deployment, size int32) error {
deployPatch := client.MergeFrom(deploy.DeepCopyObject())
deploy.Spec.Replicas = pointer.Int32Ptr(size)
// patch the Deployment
if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil {
c.recorder.Event(c.parentController, event.Warning(event.Reason(fmt.Sprintf(
"Failed to update the deployment %s to the correct target %d", deploy.GetName(), size)), err))
c.rolloutStatus.RolloutRetry(err.Error())
return err
}
klog.InfoS("Submitted upgrade quest for deployment", "deployment",
deploy.GetName(), "target replica size", size, "batch", c.rolloutStatus.CurrentBatch)
return nil
}
// remove the parent controller from the deployment's owner list
func (c *deploymentController) releaseDeployment(ctx context.Context, deploy *apps.Deployment) (bool, error) {
deployPatch := client.MergeFrom(deploy.DeepCopyObject())
var newOwnerList []metav1.OwnerReference
found := false
for _, owner := range deploy.GetOwnerReferences() {
if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() {
found = true
continue
}
newOwnerList = append(newOwnerList, owner)
}
if !found {
klog.InfoS("the deployment is already released", "deploy", deploy.Name)
return true, nil
}
deploy.SetOwnerReferences(newOwnerList)
// patch the Deployment
if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil {
c.recorder.Event(c.parentController, event.Warning("Failed to the release the Deployment", err))
c.rolloutStatus.RolloutRetry(err.Error())
return false, err
}
return false, nil
}

View File

@@ -30,8 +30,6 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/pkg/controller/utils"
"github.com/oam-dev/kubevela/pkg/oam"
@@ -39,8 +37,7 @@ import (
// DeploymentRolloutController is responsible for handling rollout deployment type of workloads
type DeploymentRolloutController struct {
workloadController
targetNamespacedName types.NamespacedName
deploymentController
sourceNamespacedName types.NamespacedName
sourceDeploy apps.Deployment
targetDeploy apps.Deployment
@@ -51,14 +48,16 @@ func NewDeploymentRolloutController(client client.Client, recorder event.Recorde
rolloutSpec *v1alpha1.RolloutPlan, rolloutStatus *v1alpha1.RolloutStatus, sourceNamespacedName,
targetNamespacedName types.NamespacedName) *DeploymentRolloutController {
return &DeploymentRolloutController{
workloadController: workloadController{
client: client,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
deploymentController: deploymentController{
workloadController: workloadController{
client: client,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
},
targetNamespacedName: targetNamespacedName,
},
targetNamespacedName: targetNamespacedName,
sourceNamespacedName: sourceNamespacedName,
}
}
@@ -74,8 +73,7 @@ func (c *DeploymentRolloutController) VerifySpec(ctx context.Context) (bool, err
}
}()
err := c.fetchDeployments(ctx)
if err != nil {
if err := c.fetchDeployments(ctx); err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
// do not fail the rollout just because we can't get the resource
// nolint:nilerr
@@ -141,23 +139,25 @@ func (c *DeploymentRolloutController) VerifySpec(ctx context.Context) (bool, err
// Initialize makes sure that the source and target deployment is under our control
func (c *DeploymentRolloutController) Initialize(ctx context.Context) (bool, error) {
err := c.fetchDeployments(ctx)
if err != nil {
if err := c.fetchDeployments(ctx); err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
return false, nil
}
err = c.claimDeployment(ctx, &c.sourceDeploy, nil)
if err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
// claim source deployment
if _, err := c.claimDeployment(ctx, &c.sourceDeploy, nil); err != nil {
// nolint:nilerr
return false, nil
}
// claim target deployment
// make sure we start with the matching replicas and target
targetInitSize := pointer.Int32Ptr(c.rolloutStatus.RolloutTargetSize - getDeployReplicaSize(&c.sourceDeploy))
err = c.claimDeployment(ctx, &c.targetDeploy, targetInitSize)
if err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
if _, err := c.claimDeployment(ctx, &c.targetDeploy, targetInitSize); err != nil {
// nolint:nilerr
return false, nil
}
// mark the rollout initialized
c.recorder.Event(c.parentController, event.Normal("Rollout Initialized", "Rollout resource are initialized"))
return true, nil
@@ -166,30 +166,33 @@ func (c *DeploymentRolloutController) Initialize(ctx context.Context) (bool, err
// RolloutOneBatchPods calculates the number of pods we can upgrade once according to the rollout spec
// and then set the partition accordingly
func (c *DeploymentRolloutController) RolloutOneBatchPods(ctx context.Context) (bool, error) {
err := c.fetchDeployments(ctx)
if err != nil {
if err := c.fetchDeployments(ctx); err != nil {
// don't fail the rollout just because of we can't get the resource
// nolint:nilerr
c.rolloutStatus.RolloutRetry(err.Error())
return false, nil
}
currentSizeSetting := *c.sourceDeploy.Spec.Replicas + *c.targetDeploy.Spec.Replicas
// get the rollout strategy
rolloutStrategy := v1alpha1.IncreaseFirstRolloutStrategyType
if len(c.rolloutSpec.RolloutStrategy) != 0 {
rolloutStrategy = c.rolloutSpec.RolloutStrategy
}
// Determine if we are the first or the second part of the current batch rollout
if currentSizeSetting == c.rolloutStatus.RolloutTargetSize {
// we need to finish the first part of the rollout,
// this may conclude that we've already reached the size (in a rollback case)
return c.rolloutBatchFirstHalf(ctx, rolloutStrategy)
}
// we are at the second half
targetSize := c.calculateCurrentTarget(c.rolloutStatus.RolloutTargetSize)
if !c.rolloutBatchSecondHalf(ctx, rolloutStrategy, targetSize) {
return false, nil
}
// record the finished upgrade action
klog.InfoS("upgraded one batch", "current batch", c.rolloutStatus.CurrentBatch,
"target deployment size", targetSize)
@@ -201,12 +204,12 @@ func (c *DeploymentRolloutController) RolloutOneBatchPods(ctx context.Context) (
// CheckOneBatchPods checks to see if the pods are all available according to the rollout plan
func (c *DeploymentRolloutController) CheckOneBatchPods(ctx context.Context) (bool, error) {
err := c.fetchDeployments(ctx)
if err != nil {
if err := c.fetchDeployments(ctx); err != nil {
// don't fail the rollout just because of we can't get the resource
// nolint:nilerr
return false, nil
}
// get the number of ready pod from target
readyTargetPodCount := c.targetDeploy.Status.ReadyReplicas
sourcePodCount := c.sourceDeploy.Status.Replicas
@@ -237,6 +240,7 @@ func (c *DeploymentRolloutController) CheckOneBatchPods(ctx context.Context) (bo
c.rolloutStatus.CurrentBatch, readyTargetPodCount, sourcePodCount, maxUnavail))
return false, nil
}
// record the successful upgrade
c.rolloutStatus.UpgradedReadyReplicas = readyTargetPodCount
klog.InfoS("all pods in current batch are ready", "current batch", c.rolloutStatus.CurrentBatch)
@@ -247,16 +251,16 @@ func (c *DeploymentRolloutController) CheckOneBatchPods(ctx context.Context) (bo
// FinalizeOneBatch makes sure that the rollout status are updated correctly
func (c *DeploymentRolloutController) FinalizeOneBatch(ctx context.Context) (bool, error) {
err := c.fetchDeployments(ctx)
if err != nil {
if err := c.fetchDeployments(ctx); err != nil {
// don't fail the rollout just because of we can't get the resource
// nolint:nilerr
return false, nil
}
sourceTarget := getDeployReplicaSize(&c.sourceDeploy)
targetTarget := getDeployReplicaSize(&c.targetDeploy)
if sourceTarget+targetTarget != c.rolloutStatus.RolloutTargetSize {
err = fmt.Errorf("deployment targets don't match total rollout, sourceTarget = %d, targetTarget = %d, "+
err := fmt.Errorf("deployment targets don't match total rollout, sourceTarget = %d, targetTarget = %d, "+
"rolloutTargetSize = %d", sourceTarget, targetTarget, c.rolloutStatus.RolloutTargetSize)
klog.ErrorS(err, "the batch is not valid", "current batch", c.rolloutStatus.CurrentBatch)
return false, err
@@ -266,21 +270,21 @@ func (c *DeploymentRolloutController) FinalizeOneBatch(ctx context.Context) (boo
// Finalize makes sure the Deployment is all upgraded
func (c *DeploymentRolloutController) Finalize(ctx context.Context, succeed bool) bool {
err := c.fetchDeployments(ctx)
if err != nil {
if err := c.fetchDeployments(ctx); err != nil {
// don't fail the rollout just because of we can't get the resource
return false
}
err = c.releaseDeployment(ctx, &c.sourceDeploy)
if err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
// release source deployment
if _, err := c.releaseDeployment(ctx, &c.sourceDeploy); err != nil {
return false
}
err = c.releaseDeployment(ctx, &c.targetDeploy)
if err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
// release target deployment
if _, err := c.releaseDeployment(ctx, &c.targetDeploy); err != nil {
return false
}
// mark the resource finalized
c.rolloutStatus.LastAppliedPodTemplateIdentifier = c.rolloutStatus.NewPodTemplateIdentifier
c.recorder.Event(c.parentController, event.Normal("Rollout Finalized",
@@ -292,82 +296,20 @@ func (c *DeploymentRolloutController) Finalize(ctx context.Context, succeed bool
The functions below are helper functions
------------------------------------- */
func (c *DeploymentRolloutController) fetchDeployments(ctx context.Context) error {
err := c.client.Get(ctx, c.sourceNamespacedName, &c.sourceDeploy)
if err != nil {
if err := c.client.Get(ctx, c.sourceNamespacedName, &c.sourceDeploy); err != nil {
if !apierrors.IsNotFound(err) {
c.recorder.Event(c.parentController, event.Warning("Failed to get the Deployment", err))
c.recorder.Event(c.parentController, event.Warning("Failed to get the source Deployment", err))
}
return err
}
err = c.client.Get(ctx, c.targetNamespacedName, &c.targetDeploy)
if err != nil {
if err := c.client.Get(ctx, c.targetNamespacedName, &c.targetDeploy); err != nil {
if !apierrors.IsNotFound(err) {
c.recorder.Event(c.parentController, event.Warning("Failed to get the Deployment", err))
c.recorder.Event(c.parentController, event.Warning("Failed to get the target Deployment", err))
}
return err
}
return nil
}
// add the parent controller to the owner of the deployment, unpause it and initialize the size
// before kicking start the update and start from every pod in the old version
func (c *DeploymentRolloutController) claimDeployment(ctx context.Context, deploy *apps.Deployment, initSize *int32) error {
deployPatch := client.MergeFrom(deploy.DeepCopyObject())
if controller := metav1.GetControllerOf(deploy); controller == nil {
ref := metav1.NewControllerRef(c.parentController, v1beta1.AppRolloutKindVersionKind)
deploy.SetOwnerReferences(append(deploy.GetOwnerReferences(), *ref))
}
deploy.Spec.Paused = false
if initSize != nil {
deploy.Spec.Replicas = initSize
}
// patch the Deployment
if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil {
c.recorder.Event(c.parentController, event.Warning("Failed to the start the Deployment update", err))
return err
}
return nil
}
// patch the deployment's target, returns if succeeded
func (c *DeploymentRolloutController) patchDeployment(ctx context.Context, target int32, deploy *apps.Deployment) error {
deployPatch := client.MergeFrom(deploy.DeepCopyObject())
deploy.Spec.Replicas = pointer.Int32Ptr(target)
// patch the Deployment
if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil {
c.recorder.Event(c.parentController, event.Warning(event.Reason(fmt.Sprintf(
"Failed to update the deployment %s to the correct target %d", deploy.GetName(), target)), err))
return err
}
klog.InfoS("Submitted upgrade quest for deployment", "deployment",
deploy.GetName(), "target replica size", target, "batch", c.rolloutStatus.CurrentBatch)
return nil
}
func (c *DeploymentRolloutController) releaseDeployment(ctx context.Context, deploy *apps.Deployment) error {
deployPatch := client.MergeFrom(deploy.DeepCopyObject())
// remove the parent controller from the resources' owner list
var newOwnerList []metav1.OwnerReference
found := false
for _, owner := range deploy.GetOwnerReferences() {
if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() {
found = true
continue
}
newOwnerList = append(newOwnerList, owner)
}
if !found {
klog.InfoS("the deployment is already released", "deploy", deploy.Name)
return nil
}
deploy.SetOwnerReferences(newOwnerList)
// patch the Deployment
if err := c.client.Patch(ctx, deploy, deployPatch, client.FieldOwner(c.parentController.GetUID())); err != nil {
c.recorder.Event(c.parentController, event.Warning("Failed to the finalize the Deployment", err))
c.rolloutStatus.RolloutRetry(err.Error())
return err
}
return nil
}
@@ -389,16 +331,15 @@ func (c *DeploymentRolloutController) calculateRolloutTotalSize() (int32, error)
// check if the replicas in all the rollout batches add up to the right number
func (c *DeploymentRolloutController) verifyRolloutBatchReplicaValue(totalReplicas int32) error {
// use a common function to check if the sum of all the batches can match the Deployment size
err := verifyBatchesWithRollout(c.rolloutSpec, totalReplicas)
if err != nil {
return err
}
return nil
return verifyBatchesWithRollout(c.rolloutSpec, totalReplicas)
}
// the target deploy size for the current batch
func (c *DeploymentRolloutController) calculateCurrentTarget(totalSize int32) int32 {
return int32(calculateNewBatchTarget(c.rolloutSpec, 0, int(totalSize), int(c.rolloutStatus.CurrentBatch)))
targetSize := int32(calculateNewBatchTarget(c.rolloutSpec, 0, int(totalSize), int(c.rolloutStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the target deployment after current batch",
"current batch", c.rolloutStatus.CurrentBatch, "target deploy size", targetSize)
return targetSize
}
// the source deploy size for the current batch
@@ -422,48 +363,49 @@ func (c *DeploymentRolloutController) rolloutBatchFirstHalf(ctx context.Context,
c.rolloutStatus.UpgradedReplicas = targetSize
}
}()
if rolloutStrategy == v1alpha1.IncreaseFirstRolloutStrategyType {
// set the target replica first which should increase its size
if targetSize > getDeployReplicaSize(&c.targetDeploy) {
if getDeployReplicaSize(&c.targetDeploy) < targetSize {
klog.InfoS("set target deployment replicas", "deploy", c.targetDeploy.Name, "targetSize", targetSize)
if err := c.patchDeployment(ctx, targetSize, &c.targetDeploy); err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
}
_ = c.scaleDeployment(ctx, &c.targetDeploy, targetSize)
c.recorder.Event(c.parentController, event.Normal("Batch Rollout",
fmt.Sprintf("Submitted the increase part of upgrade quests for batch %d, target size = %d",
c.rolloutStatus.CurrentBatch, targetSize)))
return false, nil
}
// do nothing if the target is ready reached
// do nothing if the target is already reached
klog.InfoS("target deployment replicas overshoot the size already", "deploy", c.targetDeploy.Name,
"deployment size", getDeployReplicaSize(&c.targetDeploy), "targetSize", targetSize)
return true, nil
}
if rolloutStrategy == v1alpha1.DecreaseFirstRolloutStrategyType {
// set the source replicas first which should shrink its size
sourceSize := c.calculateCurrentSource(c.rolloutStatus.RolloutTargetSize)
if sourceSize < getDeployReplicaSize(&c.sourceDeploy) {
if getDeployReplicaSize(&c.sourceDeploy) > sourceSize {
klog.InfoS("set source deployment replicas", "source deploy", c.sourceDeploy.Name, "sourceSize", sourceSize)
if err := c.patchDeployment(ctx, sourceSize, &c.sourceDeploy); err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
}
_ = c.scaleDeployment(ctx, &c.sourceDeploy, sourceSize)
c.recorder.Event(c.parentController, event.Normal("Batch Rollout",
fmt.Sprintf("Submitted the decrease part of upgrade quests for batch %d, source size = %d",
c.rolloutStatus.CurrentBatch, sourceSize)))
return false, nil
}
// do nothing if the reduce target is ready reached
// do nothing if the reduce target is already reached
klog.InfoS("source deployment replicas overshoot the size already", "source deploy", c.sourceDeploy.Name,
"deployment size", getDeployReplicaSize(&c.sourceDeploy), "sourceSize", sourceSize)
return true, nil
}
return false, fmt.Errorf("encountered an unknown rolloutStrategy `%s`", rolloutStrategy)
}
func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context,
rolloutStrategy v1alpha1.RolloutStrategyType, targetSize int32) bool {
var err error
sourceSize := c.calculateCurrentSource(c.rolloutStatus.RolloutTargetSize)
if rolloutStrategy == v1alpha1.IncreaseFirstRolloutStrategyType {
// calculate the max unavailable given the target size
maxUnavail := 0
@@ -475,8 +417,7 @@ func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context
if c.targetDeploy.Status.ReadyReplicas+int32(maxUnavail) >= targetSize {
// set the source replicas now which should shrink its size
klog.InfoS("set source deployment replicas", "deploy", c.sourceDeploy.Name, "sourceSize", sourceSize)
if err = c.patchDeployment(ctx, sourceSize, &c.sourceDeploy); err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
if err := c.scaleDeployment(ctx, &c.sourceDeploy, sourceSize); err != nil {
return false
}
c.recorder.Event(c.parentController, event.Normal("Batch Rollout",
@@ -496,8 +437,7 @@ func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context
// we can increase the target deployment as soon as the source deployment's replica is correct
// no need to wait for them to be ready
klog.InfoS("set target deployment replicas", "deploy", c.targetDeploy.Name, "targetSize", targetSize)
if err = c.patchDeployment(ctx, targetSize, &c.targetDeploy); err != nil {
c.rolloutStatus.RolloutRetry(err.Error())
if err := c.scaleDeployment(ctx, &c.targetDeploy, targetSize); err != nil {
return false
}
c.recorder.Event(c.parentController, event.Normal("Batch Rollout",
@@ -512,5 +452,6 @@ func (c *DeploymentRolloutController) rolloutBatchSecondHalf(ctx context.Context
return false
}
}
return true
}

View File

@@ -58,27 +58,29 @@ var _ = Describe("deployment controller", func() {
sourceNamespacedName = client.ObjectKey{Name: sourceName, Namespace: namespaceName}
targetNamespacedName = client.ObjectKey{Name: targetName, Namespace: namespaceName}
c = DeploymentRolloutController{
workloadController: workloadController{
client: k8sClient,
rolloutSpec: &v1alpha1.RolloutPlan{
RolloutBatches: []v1alpha1.RolloutBatch{
{
Replicas: intstr.FromInt(2),
},
{
Replicas: intstr.FromInt(3),
},
{
Replicas: intstr.FromString("50%"),
deploymentController: deploymentController{
workloadController: workloadController{
client: k8sClient,
rolloutSpec: &v1alpha1.RolloutPlan{
RolloutBatches: []v1alpha1.RolloutBatch{
{
Replicas: intstr.FromInt(2),
},
{
Replicas: intstr.FromInt(3),
},
{
Replicas: intstr.FromString("50%"),
},
},
},
rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState},
parentController: &appRollout,
recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")).
WithAnnotations("controller", "AppRollout"),
},
rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState},
parentController: &appRollout,
recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")).
WithAnnotations("controller", "AppRollout"),
targetNamespacedName: targetNamespacedName,
},
targetNamespacedName: targetNamespacedName,
sourceNamespacedName: sourceNamespacedName,
}
@@ -144,14 +146,16 @@ var _ = Describe("deployment controller", func() {
got := NewDeploymentRolloutController(k8sClient, recorder, parentController, rolloutSpec, rolloutStatus,
workloadNamespacedName, workloadNamespacedName)
c := &DeploymentRolloutController{
workloadController: workloadController{
client: k8sClient,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
deploymentController: deploymentController{
workloadController: workloadController{
client: k8sClient,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
},
targetNamespacedName: workloadNamespacedName,
},
targetNamespacedName: workloadNamespacedName,
sourceNamespacedName: workloadNamespacedName,
}
Expect(got).Should(Equal(c))

View File

@@ -82,10 +82,12 @@ func TestCalculateCurrentSource(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
controller := DeploymentRolloutController{
workloadController: workloadController{
rolloutSpec: tc.rolloutSpec,
rolloutStatus: &v1alpha1.RolloutStatus{
CurrentBatch: tc.currentBatch,
deploymentController: deploymentController{
workloadController: workloadController{
rolloutSpec: tc.rolloutSpec,
rolloutStatus: &v1alpha1.RolloutStatus{
CurrentBatch: tc.currentBatch,
},
},
},
}

View File

@@ -27,10 +27,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/oam/util"
@@ -38,22 +36,23 @@ import (
// DeploymentScaleController is responsible for handle scale Deployment type of workloads
type DeploymentScaleController struct {
workloadController
targetNamespacedName types.NamespacedName
deploy *appsv1.Deployment
deploymentController
deploy *appsv1.Deployment
}
// NewDeploymentScaleController creates Deployment scale controller
func NewDeploymentScaleController(client client.Client, recorder event.Recorder, parentController oam.Object, rolloutSpec *v1alpha1.RolloutPlan, rolloutStatus *v1alpha1.RolloutStatus, workloadName types.NamespacedName) *DeploymentScaleController {
return &DeploymentScaleController{
workloadController: workloadController{
client: client,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
deploymentController: deploymentController{
workloadController: workloadController{
client: client,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
},
targetNamespacedName: workloadName,
},
targetNamespacedName: workloadName,
}
}
@@ -125,56 +124,41 @@ func (s *DeploymentScaleController) VerifySpec(ctx context.Context) (bool, error
// Initialize makes sure that the deployment is under our control
func (s *DeploymentScaleController) Initialize(ctx context.Context) (bool, error) {
err := s.fetchDeployment(ctx)
if err != nil {
if err := s.fetchDeployment(ctx); err != nil {
s.rolloutStatus.RolloutRetry(err.Error())
// nolint: nilerr
return false, nil
}
if controller := metav1.GetControllerOf(s.deploy); controller != nil {
if controller.Kind == v1beta1.AppRolloutKind && controller.APIVersion == v1beta1.SchemeGroupVersion.String() {
// it's already there
return true, nil
}
}
// add the parent controller to the owner of the deployment
deployPatch := client.MergeFrom(s.deploy.DeepCopyObject())
ref := metav1.NewControllerRef(s.parentController, v1beta1.AppRolloutKindVersionKind)
s.deploy.SetOwnerReferences(append(s.deploy.GetOwnerReferences(), *ref))
s.deploy.Spec.Paused = false
// patch the deployment
if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil {
s.recorder.Event(s.parentController, event.Warning("Failed to the start the deployment update", err))
s.rolloutStatus.RolloutRetry(err.Error())
claimedBefore, err := s.claimDeployment(ctx, s.deploy, nil)
if err != nil {
// nolint:nilerr
return false, nil
}
// mark the rollout initialized
s.recorder.Event(s.parentController, event.Normal("Scale Initialized", "deployment is initialized"))
if !claimedBefore {
// mark the rollout initialized
s.recorder.Event(s.parentController, event.Normal("Scale Initialized", "deployment is initialized"))
}
return true, nil
}
// RolloutOneBatchPods calculates the number of pods we can scale to according to the rollout spec
func (s *DeploymentScaleController) RolloutOneBatchPods(ctx context.Context) (bool, error) {
err := s.fetchDeployment(ctx)
if err != nil {
if err := s.fetchDeployment(ctx); err != nil {
s.rolloutStatus.RolloutRetry(err.Error())
// nolint: nilerr
return false, nil
}
deployPatch := client.MergeFrom(s.deploy.DeepCopyObject())
// set the replica according to the batch
newPodTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize),
int(s.rolloutStatus.RolloutTargetSize), int(s.rolloutStatus.CurrentBatch))
s.deploy.Spec.Replicas = pointer.Int32Ptr(int32(newPodTarget))
// patch the deployment
if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil {
s.recorder.Event(s.parentController, event.Warning("Failed to update the deployment to upgrade", err))
s.rolloutStatus.RolloutRetry(err.Error())
if err := s.scaleDeployment(ctx, s.deploy, int32(newPodTarget)); err != nil {
// nolint:nilerr
return false, nil
}
// record the scale
klog.InfoS("scale one batch", "current batch", s.rolloutStatus.CurrentBatch)
s.recorder.Event(s.parentController, event.Normal("Batch Rollout",
@@ -185,12 +169,12 @@ func (s *DeploymentScaleController) RolloutOneBatchPods(ctx context.Context) (bo
// CheckOneBatchPods checks to see if the pods are scaled according to the rollout plan
func (s *DeploymentScaleController) CheckOneBatchPods(ctx context.Context) (bool, error) {
err := s.fetchDeployment(ctx)
if err != nil {
if err := s.fetchDeployment(ctx); err != nil {
s.rolloutStatus.RolloutRetry(err.Error())
// nolint:nilerr
return false, nil
}
newPodTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize),
int(s.rolloutStatus.RolloutTargetSize), int(s.rolloutStatus.CurrentBatch))
// get the number of ready pod from deployment
@@ -221,6 +205,7 @@ func (s *DeploymentScaleController) CheckOneBatchPods(ctx context.Context) (bool
fmt.Sprintf("Batch %d is available", s.rolloutStatus.CurrentBatch)))
return true, nil
}
// continue to verify
klog.InfoS("the batch is not ready yet", "current batch", s.rolloutStatus.CurrentBatch,
"target", newPodTarget, "readyPodCount", readyPodCount, "max unavailable allowed", unavail)
@@ -274,56 +259,33 @@ func (s *DeploymentScaleController) Finalize(ctx context.Context, succeed bool)
s.rolloutStatus.RolloutRetry(err.Error())
return false
}
deployPatch := client.MergeFrom(s.deploy.DeepCopyObject())
// remove the parent controller from the resources' owner list
var newOwnerList []metav1.OwnerReference
isOwner := false
for _, owner := range s.deploy.GetOwnerReferences() {
if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() {
isOwner = true
continue
}
newOwnerList = append(newOwnerList, owner)
}
if !isOwner {
// nothing to do if we are already not the owner
klog.InfoS("the deployment is already released and not controlled by rollout", "deployment", s.deploy.Name)
return true
}
s.deploy.SetOwnerReferences(newOwnerList)
// patch the deployment
if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil {
s.recorder.Event(s.parentController, event.Warning("Failed to the finalize the deployment", err))
s.rolloutStatus.RolloutRetry(err.Error())
releasedBefore, err := s.releaseDeployment(ctx, s.deploy)
if err != nil {
return false
}
// mark the resource finalized
s.recorder.Event(s.parentController, event.Normal("Scale Finalized",
fmt.Sprintf("Scale resource are finalized, succeed := %t", succeed)))
if !releasedBefore {
// mark the resource finalized
s.recorder.Event(s.parentController, event.Normal("Scale Finalized",
fmt.Sprintf("Scale resource are finalized, succeed := %t", succeed)))
}
return true
}
// size fetches the Deloyment and returns the replicas (not the actual number of pods)
func (s *DeploymentScaleController) size(ctx context.Context) (int32, error) {
if s.deploy == nil {
err := s.fetchDeployment(ctx)
if err != nil {
if err := s.fetchDeployment(ctx); err != nil {
return 0, err
}
}
// default is 1
if s.deploy.Spec.Replicas == nil {
return 1, nil
}
return *s.deploy.Spec.Replicas, nil
return getDeployReplicaSize(s.deploy), nil
}
func (s *DeploymentScaleController) fetchDeployment(ctx context.Context) error {
// get the deployment
workload := appsv1.Deployment{}
err := s.client.Get(ctx, s.targetNamespacedName, &workload)
if err != nil {
if err := s.client.Get(ctx, s.targetNamespacedName, &workload); err != nil {
if !apierrors.IsNotFound(err) {
s.recorder.Event(s.parentController, event.Warning("Failed to get the Deployment", err))
}

View File

@@ -51,28 +51,30 @@ var _ = Describe("deployment controller", func() {
appRollout := v1beta1.AppRollout{ObjectMeta: metav1.ObjectMeta{Name: name}}
namespacedName = client.ObjectKey{Name: name, Namespace: namespace}
s = DeploymentScaleController{
workloadController: workloadController{
client: k8sClient,
rolloutSpec: &v1alpha1.RolloutPlan{
TargetSize: pointer.Int32Ptr(10),
RolloutBatches: []v1alpha1.RolloutBatch{
{
Replicas: intstr.FromInt(1),
},
{
Replicas: intstr.FromString("20%"),
},
{
Replicas: intstr.FromString("80%"),
deploymentController: deploymentController{
workloadController: workloadController{
client: k8sClient,
rolloutSpec: &v1alpha1.RolloutPlan{
TargetSize: pointer.Int32Ptr(10),
RolloutBatches: []v1alpha1.RolloutBatch{
{
Replicas: intstr.FromInt(1),
},
{
Replicas: intstr.FromString("20%"),
},
{
Replicas: intstr.FromString("80%"),
},
},
},
rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState},
parentController: &appRollout,
recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")).
WithAnnotations("controller", "AppRollout"),
},
rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState},
parentController: &appRollout,
recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")).
WithAnnotations("controller", "AppRollout"),
targetNamespacedName: namespacedName,
},
targetNamespacedName: namespacedName,
}
deployment = appsv1.Deployment{
@@ -119,20 +121,22 @@ var _ = Describe("deployment controller", func() {
workloadNamespacedName := client.ObjectKey{Name: name, Namespace: namespace}
got := NewDeploymentScaleController(k8sClient, recorder, parentController, rolloutSpec, rolloutStatus, workloadNamespacedName)
controller := &DeploymentScaleController{
workloadController: workloadController{
client: k8sClient,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
deploymentController: deploymentController{
workloadController: workloadController{
client: k8sClient,
recorder: recorder,
parentController: parentController,
rolloutSpec: rolloutSpec,
rolloutStatus: rolloutStatus,
},
targetNamespacedName: workloadNamespacedName,
},
targetNamespacedName: workloadNamespacedName,
}
Expect(got).Should(Equal(controller))
})
})
Context("VerifySpec", func() {
Context("TestVerifySpec", func() {
It("rollout need a target size", func() {
s.rolloutSpec.TargetSize = nil
ligit, err := s.VerifySpec(ctx)