mirror of
https://github.com/kubevela/kubevela.git
synced 2026-05-04 16:37:29 +00:00
add deployment scale controller (#1557)
add unit test Signed-off-by: roy wang <seiwy2010@gmail.com>
This commit is contained in:
@@ -349,9 +349,13 @@ func (r *Controller) GetWorkloadController() (workloads.WorkloadController, erro
|
||||
|
||||
if r.targetWorkload.GroupVersionKind().Group == apps.GroupName {
|
||||
if r.targetWorkload.GetKind() == reflect.TypeOf(apps.Deployment{}).Name() {
|
||||
// TODO: create deployment scale controller when current rollout plan is for scale
|
||||
return workloads.NewDeploymentController(r.client, r.recorder, r.parentController,
|
||||
r.rolloutSpec, r.rolloutStatus, source, target), nil
|
||||
// check whether current rollout plan is for workload rolling or scaling
|
||||
if r.sourceWorkload != nil {
|
||||
return workloads.NewDeploymentController(r.client, r.recorder, r.parentController,
|
||||
r.rolloutSpec, r.rolloutStatus, source, target), nil
|
||||
}
|
||||
return workloads.NewDeploymentScaleController(r.client, r.recorder, r.parentController,
|
||||
r.rolloutSpec, r.rolloutStatus, target), nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("the workload kind `%s` is not supported", kind)
|
||||
|
||||
@@ -0,0 +1,334 @@
|
||||
/*
|
||||
Copyright 2021 The KubeVela Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workloads
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/crossplane/crossplane-runtime/pkg/event"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/pointer"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
"github.com/oam-dev/kubevela/pkg/oam"
|
||||
"github.com/oam-dev/kubevela/pkg/oam/util"
|
||||
)
|
||||
|
||||
// DeploymentScaleController is responsible for handle scale Deployment type of workloads
|
||||
type DeploymentScaleController struct {
|
||||
workloadController
|
||||
targetNamespacedName types.NamespacedName
|
||||
deploy *appsv1.Deployment
|
||||
}
|
||||
|
||||
// NewDeploymentScaleController creates Deployment scale controller
|
||||
func NewDeploymentScaleController(client client.Client, recorder event.Recorder, parentController oam.Object, rolloutSpec *v1alpha1.RolloutPlan, rolloutStatus *v1alpha1.RolloutStatus, workloadName types.NamespacedName) *DeploymentScaleController {
|
||||
return &DeploymentScaleController{
|
||||
workloadController: workloadController{
|
||||
client: client,
|
||||
recorder: recorder,
|
||||
parentController: parentController,
|
||||
rolloutSpec: rolloutSpec,
|
||||
rolloutStatus: rolloutStatus,
|
||||
},
|
||||
targetNamespacedName: workloadName,
|
||||
}
|
||||
}
|
||||
|
||||
// VerifySpec verifies that the deployment is stable and can be scaled
|
||||
func (s *DeploymentScaleController) VerifySpec(ctx context.Context) (bool, error) {
|
||||
var verifyErr error
|
||||
defer func() {
|
||||
if verifyErr != nil {
|
||||
klog.Error(verifyErr)
|
||||
s.recorder.Event(s.parentController, event.Warning("VerifyFailed", verifyErr))
|
||||
}
|
||||
}()
|
||||
|
||||
// the rollout has to have a target size in the scale case
|
||||
if s.rolloutSpec.TargetSize == nil {
|
||||
return false, fmt.Errorf("the rollout plan is attempting to scale the deployment %s without a target",
|
||||
s.targetNamespacedName.Name)
|
||||
}
|
||||
// record the target size
|
||||
s.rolloutStatus.RolloutTargetSize = *s.rolloutSpec.TargetSize
|
||||
klog.InfoS("record the target size", "target size", *s.rolloutSpec.TargetSize)
|
||||
|
||||
// fetch the deployment and get its current size
|
||||
originalSize, verifyErr := s.size(ctx)
|
||||
if verifyErr != nil {
|
||||
// do not fail the rollout because we can't get the resource
|
||||
s.rolloutStatus.RolloutRetry(verifyErr.Error())
|
||||
// nolint: nilerr
|
||||
return false, nil
|
||||
}
|
||||
s.rolloutStatus.RolloutOriginalSize = originalSize
|
||||
klog.InfoS("record the original size", "original size", originalSize)
|
||||
|
||||
// check if the rollout batch replicas scale up/down to the replicas target
|
||||
if verifyErr = verifyBatchesWithScale(s.rolloutSpec, int(originalSize),
|
||||
int(s.rolloutStatus.RolloutTargetSize)); verifyErr != nil {
|
||||
return false, verifyErr
|
||||
}
|
||||
|
||||
// check if the deployment is scaling
|
||||
if originalSize != s.deploy.Status.Replicas {
|
||||
verifyErr = fmt.Errorf("the deployment %s is in the middle of scaling, target size = %d, real size = %d",
|
||||
s.deploy.GetName(), originalSize, s.deploy.Status.Replicas)
|
||||
// do not fail the rollout, we can wait
|
||||
s.rolloutStatus.RolloutRetry(verifyErr.Error())
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// check if the deployment is upgrading
|
||||
if !s.deploy.Spec.Paused && s.deploy.Status.UpdatedReplicas != originalSize {
|
||||
verifyErr = fmt.Errorf("the deployment %s is in the middle of updating, target size = %d, updated pod = %d",
|
||||
s.deploy.GetName(), originalSize, s.deploy.Status.UpdatedReplicas)
|
||||
// do not fail the rollout, we can wait
|
||||
s.rolloutStatus.RolloutRetry(verifyErr.Error())
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// check if the deployment has any controller
|
||||
if controller := metav1.GetControllerOf(s.deploy); controller != nil {
|
||||
return false, fmt.Errorf("the deployment %s has a controller owner %s",
|
||||
s.deploy.GetName(), controller.String())
|
||||
}
|
||||
|
||||
// mark the scale verified
|
||||
s.recorder.Event(s.parentController, event.Normal("Scale Verified",
|
||||
"Rollout spec and the deployment resource are verified"))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Initialize makes sure that the deployment is under our control
|
||||
func (s *DeploymentScaleController) Initialize(ctx context.Context) (bool, error) {
|
||||
err := s.fetchDeployment(ctx)
|
||||
if err != nil {
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
// nolint: nilerr
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if controller := metav1.GetControllerOf(s.deploy); controller != nil {
|
||||
if controller.Kind == v1beta1.AppRolloutKind && controller.APIVersion == v1beta1.SchemeGroupVersion.String() {
|
||||
// it's already there
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
// add the parent controller to the owner of the deployment
|
||||
deployPatch := client.MergeFrom(s.deploy.DeepCopyObject())
|
||||
ref := metav1.NewControllerRef(s.parentController, v1beta1.AppRolloutKindVersionKind)
|
||||
s.deploy.SetOwnerReferences(append(s.deploy.GetOwnerReferences(), *ref))
|
||||
s.deploy.Spec.Paused = false
|
||||
|
||||
// patch the deployment
|
||||
if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil {
|
||||
s.recorder.Event(s.parentController, event.Warning("Failed to the start the deployment update", err))
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
return false, nil
|
||||
}
|
||||
// mark the rollout initialized
|
||||
s.recorder.Event(s.parentController, event.Normal("Scale Initialized", "deployment is initialized"))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// RolloutOneBatchPods calculates the number of pods we can scale to according to the rollout spec
|
||||
func (s *DeploymentScaleController) RolloutOneBatchPods(ctx context.Context) (bool, error) {
|
||||
err := s.fetchDeployment(ctx)
|
||||
if err != nil {
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
// nolint: nilerr
|
||||
return false, nil
|
||||
}
|
||||
|
||||
deployPatch := client.MergeFrom(s.deploy.DeepCopyObject())
|
||||
// set the replica according to the batch
|
||||
newPodTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize),
|
||||
int(s.rolloutStatus.RolloutTargetSize), int(s.rolloutStatus.CurrentBatch))
|
||||
s.deploy.Spec.Replicas = pointer.Int32Ptr(int32(newPodTarget))
|
||||
// patch the deployment
|
||||
if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil {
|
||||
s.recorder.Event(s.parentController, event.Warning("Failed to update the deployment to upgrade", err))
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
return false, nil
|
||||
}
|
||||
// record the scale
|
||||
klog.InfoS("scale one batch", "current batch", s.rolloutStatus.CurrentBatch)
|
||||
s.recorder.Event(s.parentController, event.Normal("Batch Rollout",
|
||||
fmt.Sprintf("Submitted scale quest for batch %d", s.rolloutStatus.CurrentBatch)))
|
||||
s.rolloutStatus.UpgradedReplicas = int32(newPodTarget)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// CheckOneBatchPods checks to see if the pods are scaled according to the rollout plan
|
||||
func (s *DeploymentScaleController) CheckOneBatchPods(ctx context.Context) (bool, error) {
|
||||
err := s.fetchDeployment(ctx)
|
||||
if err != nil {
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
// nolint:nilerr
|
||||
return false, nil
|
||||
}
|
||||
newPodTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize),
|
||||
int(s.rolloutStatus.RolloutTargetSize), int(s.rolloutStatus.CurrentBatch))
|
||||
// get the number of ready pod from deployment
|
||||
// TODO: should we use the replica number when we shrink?
|
||||
readyPodCount := int(s.deploy.Status.ReadyReplicas)
|
||||
currentBatch := s.rolloutSpec.RolloutBatches[s.rolloutStatus.CurrentBatch]
|
||||
unavail := 0
|
||||
if currentBatch.MaxUnavailable != nil {
|
||||
unavail, _ = intstr.GetValueFromIntOrPercent(currentBatch.MaxUnavailable,
|
||||
util.Abs(int(s.rolloutStatus.RolloutTargetSize-s.rolloutStatus.RolloutOriginalSize)), true)
|
||||
}
|
||||
klog.InfoS("checking the scaling progress", "current batch", s.rolloutStatus.CurrentBatch,
|
||||
"new pod count target", newPodTarget, "new ready pod count", readyPodCount,
|
||||
"max unavailable pod allowed", unavail)
|
||||
s.rolloutStatus.UpgradedReadyReplicas = int32(readyPodCount)
|
||||
targetReached := false
|
||||
// nolint
|
||||
if s.rolloutStatus.RolloutOriginalSize <= s.rolloutStatus.RolloutTargetSize && unavail+readyPodCount >= newPodTarget {
|
||||
targetReached = true
|
||||
} else if s.rolloutStatus.RolloutOriginalSize > s.rolloutStatus.RolloutTargetSize && readyPodCount <= newPodTarget {
|
||||
targetReached = true
|
||||
}
|
||||
if targetReached {
|
||||
// record the successful upgrade
|
||||
klog.InfoS("the current batch is ready", "current batch", s.rolloutStatus.CurrentBatch,
|
||||
"target", newPodTarget, "readyPodCount", readyPodCount, "max unavailable allowed", unavail)
|
||||
s.recorder.Event(s.parentController, event.Normal("Batch Available",
|
||||
fmt.Sprintf("Batch %d is available", s.rolloutStatus.CurrentBatch)))
|
||||
return true, nil
|
||||
}
|
||||
// continue to verify
|
||||
klog.InfoS("the batch is not ready yet", "current batch", s.rolloutStatus.CurrentBatch,
|
||||
"target", newPodTarget, "readyPodCount", readyPodCount, "max unavailable allowed", unavail)
|
||||
s.rolloutStatus.RolloutRetry("the batch is not ready yet")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// FinalizeOneBatch makes sure that the current batch and replica count in the status are validate
|
||||
func (s *DeploymentScaleController) FinalizeOneBatch(ctx context.Context) (bool, error) {
|
||||
status := s.rolloutStatus
|
||||
spec := s.rolloutSpec
|
||||
if spec.BatchPartition != nil && *spec.BatchPartition < status.CurrentBatch {
|
||||
err := fmt.Errorf("the current batch value in the status is greater than the batch partition")
|
||||
klog.ErrorS(err, "we have moved past the user defined partition", "user specified batch partition",
|
||||
*spec.BatchPartition, "current batch we are working on", status.CurrentBatch)
|
||||
return false, err
|
||||
}
|
||||
// special case the equal case
|
||||
if s.rolloutStatus.RolloutOriginalSize == s.rolloutStatus.RolloutTargetSize {
|
||||
return true, nil
|
||||
}
|
||||
// we just make sure the target is right
|
||||
finishedPodCount := int(status.UpgradedReplicas)
|
||||
currentBatch := int(status.CurrentBatch)
|
||||
// calculate the pod target just before the current batch
|
||||
preBatchTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize),
|
||||
int(s.rolloutStatus.RolloutTargetSize), currentBatch-1)
|
||||
// calculate the pod target with the current batch
|
||||
curBatchTarget := calculateNewBatchTarget(s.rolloutSpec, int(s.rolloutStatus.RolloutOriginalSize),
|
||||
int(s.rolloutStatus.RolloutTargetSize), currentBatch)
|
||||
// the recorded number should be at least as much as the all the pods before the current batch
|
||||
if finishedPodCount < util.Min(preBatchTarget, curBatchTarget) {
|
||||
err := fmt.Errorf("the upgraded replica in the status is less than the lower bound")
|
||||
klog.ErrorS(err, "rollout status inconsistent", "existing pod target", finishedPodCount,
|
||||
"the lower bound", util.Min(preBatchTarget, curBatchTarget))
|
||||
return false, err
|
||||
}
|
||||
// the recorded number should be not as much as the all the pods including the active batch
|
||||
if finishedPodCount > util.Max(preBatchTarget, curBatchTarget) {
|
||||
err := fmt.Errorf("the upgraded replica in the status is greater than the upper bound")
|
||||
klog.ErrorS(err, "rollout status inconsistent", "existing pod target", finishedPodCount,
|
||||
"the upper bound", util.Max(preBatchTarget, curBatchTarget))
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Finalize makes sure the deployment is scaled and ready to use
|
||||
func (s *DeploymentScaleController) Finalize(ctx context.Context, succeed bool) bool {
|
||||
if err := s.fetchDeployment(ctx); err != nil {
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
return false
|
||||
}
|
||||
deployPatch := client.MergeFrom(s.deploy.DeepCopyObject())
|
||||
// remove the parent controller from the resources' owner list
|
||||
var newOwnerList []metav1.OwnerReference
|
||||
isOwner := false
|
||||
for _, owner := range s.deploy.GetOwnerReferences() {
|
||||
if owner.Kind == v1beta1.AppRolloutKind && owner.APIVersion == v1beta1.SchemeGroupVersion.String() {
|
||||
isOwner = true
|
||||
continue
|
||||
}
|
||||
newOwnerList = append(newOwnerList, owner)
|
||||
}
|
||||
if !isOwner {
|
||||
// nothing to do if we are already not the owner
|
||||
klog.InfoS("the deployment is already released and not controlled by rollout", "deployment", s.deploy.Name)
|
||||
return true
|
||||
}
|
||||
|
||||
s.deploy.SetOwnerReferences(newOwnerList)
|
||||
// patch the deployment
|
||||
if err := s.client.Patch(ctx, s.deploy, deployPatch, client.FieldOwner(s.parentController.GetUID())); err != nil {
|
||||
s.recorder.Event(s.parentController, event.Warning("Failed to the finalize the deployment", err))
|
||||
s.rolloutStatus.RolloutRetry(err.Error())
|
||||
return false
|
||||
}
|
||||
// mark the resource finalized
|
||||
s.recorder.Event(s.parentController, event.Normal("Scale Finalized",
|
||||
fmt.Sprintf("Scale resource are finalized, succeed := %t", succeed)))
|
||||
return true
|
||||
}
|
||||
|
||||
// size fetches the Deloyment and returns the replicas (not the actual number of pods)
|
||||
func (s *DeploymentScaleController) size(ctx context.Context) (int32, error) {
|
||||
if s.deploy == nil {
|
||||
err := s.fetchDeployment(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
// default is 1
|
||||
if s.deploy.Spec.Replicas == nil {
|
||||
return 1, nil
|
||||
}
|
||||
return *s.deploy.Spec.Replicas, nil
|
||||
}
|
||||
|
||||
func (s *DeploymentScaleController) fetchDeployment(ctx context.Context) error {
|
||||
// get the deployment
|
||||
workload := appsv1.Deployment{}
|
||||
err := s.client.Get(ctx, s.targetNamespacedName, &workload)
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
s.recorder.Event(s.parentController, event.Warning("Failed to get the Deployment", err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
s.deploy = &workload
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,485 @@
|
||||
/*
|
||||
|
||||
Copyright 2021 The KubeVela Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
|
||||
package workloads
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/crossplane/crossplane-runtime/pkg/event"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/utils/pointer"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
"github.com/oam-dev/kubevela/pkg/oam/util"
|
||||
)
|
||||
|
||||
var _ = Describe("deployment controller", func() {
|
||||
var (
|
||||
s DeploymentScaleController
|
||||
ns corev1.Namespace
|
||||
name string
|
||||
namespace string
|
||||
deployment appsv1.Deployment
|
||||
namespacedName client.ObjectKey
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
namespace = "rollout-ns"
|
||||
name = "rollout1"
|
||||
appRollout := v1beta1.AppRollout{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
||||
namespacedName = client.ObjectKey{Name: name, Namespace: namespace}
|
||||
s = DeploymentScaleController{
|
||||
workloadController: workloadController{
|
||||
client: k8sClient,
|
||||
rolloutSpec: &v1alpha1.RolloutPlan{
|
||||
TargetSize: pointer.Int32Ptr(10),
|
||||
RolloutBatches: []v1alpha1.RolloutBatch{
|
||||
{
|
||||
Replicas: intstr.FromInt(1),
|
||||
},
|
||||
{
|
||||
Replicas: intstr.FromString("20%"),
|
||||
},
|
||||
{
|
||||
Replicas: intstr.FromString("80%"),
|
||||
},
|
||||
},
|
||||
},
|
||||
rolloutStatus: &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState},
|
||||
parentController: &appRollout,
|
||||
recorder: event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")).
|
||||
WithAnnotations("controller", "AppRollout"),
|
||||
},
|
||||
targetNamespacedName: namespacedName,
|
||||
}
|
||||
|
||||
deployment = appsv1.Deployment{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: appsv1.SchemeGroupVersion.String(), Kind: "Deployment"},
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
Spec: appsv1.DeploymentSpec{
|
||||
Replicas: pointer.Int32Ptr(1),
|
||||
Selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{"env": "staging"},
|
||||
},
|
||||
Template: corev1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"env": "staging"}},
|
||||
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: name, Image: "nginx"}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ns = corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namespace,
|
||||
},
|
||||
}
|
||||
By("Create a namespace")
|
||||
Expect(k8sClient.Create(ctx, &ns)).Should(SatisfyAny(Succeed(), &util.AlreadyExistMatcher{}))
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
By("clean up")
|
||||
k8sClient.Delete(ctx, &deployment)
|
||||
})
|
||||
|
||||
Context("TestNewDeploymentController", func() {
|
||||
It("init a Deployment Controller", func() {
|
||||
recorder := event.NewAPIRecorder(mgr.GetEventRecorderFor("AppRollout")).
|
||||
WithAnnotations("controller", "AppRollout")
|
||||
parentController := &v1beta1.AppRollout{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
||||
rolloutSpec := &v1alpha1.RolloutPlan{
|
||||
RolloutBatches: []v1alpha1.RolloutBatch{{
|
||||
Replicas: intstr.FromInt(1),
|
||||
},
|
||||
},
|
||||
}
|
||||
rolloutStatus := &v1alpha1.RolloutStatus{RollingState: v1alpha1.RolloutSucceedState}
|
||||
workloadNamespacedName := client.ObjectKey{Name: name, Namespace: namespace}
|
||||
got := NewDeploymentScaleController(k8sClient, recorder, parentController, rolloutSpec, rolloutStatus, workloadNamespacedName)
|
||||
controller := &DeploymentScaleController{
|
||||
workloadController: workloadController{
|
||||
client: k8sClient,
|
||||
recorder: recorder,
|
||||
parentController: parentController,
|
||||
rolloutSpec: rolloutSpec,
|
||||
rolloutStatus: rolloutStatus,
|
||||
},
|
||||
targetNamespacedName: workloadNamespacedName,
|
||||
}
|
||||
Expect(got).Should(Equal(controller))
|
||||
})
|
||||
})
|
||||
|
||||
Context("VerifySpec", func() {
|
||||
It("rollout need a target size", func() {
|
||||
s.rolloutSpec.TargetSize = nil
|
||||
ligit, err := s.VerifySpec(ctx)
|
||||
Expect(ligit).Should(BeFalse())
|
||||
Expect(err.Error()).Should(ContainSubstring("without a target"))
|
||||
})
|
||||
|
||||
It("could not fetch Deployment workload", func() {
|
||||
ligit, err := s.VerifySpec(ctx)
|
||||
Expect(ligit).Should(BeFalse())
|
||||
Expect(err).Should(BeNil())
|
||||
})
|
||||
|
||||
It("rollout batch doesn't fit scale target", func() {
|
||||
By("Create a Deployment")
|
||||
deployment.Spec.Replicas = pointer.Int32Ptr(15)
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("Verify should fail as the scale batches don't match")
|
||||
s.rolloutSpec.RolloutBatches[2].Replicas = intstr.FromInt(10)
|
||||
consistent, err := s.VerifySpec(ctx)
|
||||
Expect(err).ShouldNot(BeNil())
|
||||
Expect(consistent).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("the deployment is in the middle of scaling", func() {
|
||||
By("Create a Deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
By("verify should fail because replica does not match")
|
||||
consistent, err := s.VerifySpec(ctx)
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(consistent).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("the deployment is in the middle of updating", func() {
|
||||
By("Create a Deployment and set as paused")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
By("Update the Deployment status")
|
||||
deployment.Status.Replicas = 1
|
||||
Expect(k8sClient.Status().Update(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("verify should fail because replica are not upgraded")
|
||||
consistent, err := s.VerifySpec(ctx)
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(consistent).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("spec is valid", func() {
|
||||
By("Create a Deployment and set as paused")
|
||||
deployment.Spec.Paused = true
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
By("Update the Deployment status")
|
||||
deployment.Status.Replicas = 1
|
||||
deployment.Status.UpdatedReplicas = 1
|
||||
deployment.Status.ReadyReplicas = 1
|
||||
Expect(k8sClient.Status().Update(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("verify should pass and record the size")
|
||||
consistent, err := s.VerifySpec(ctx)
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(consistent).Should(BeTrue())
|
||||
Expect(s.rolloutStatus.RolloutTargetSize).Should(BeEquivalentTo(10))
|
||||
Expect(s.rolloutStatus.RolloutOriginalSize).Should(BeEquivalentTo(1))
|
||||
})
|
||||
})
|
||||
|
||||
Context("TestInitialize", func() {
|
||||
BeforeEach(func() {
|
||||
deployment.Spec.Paused = true
|
||||
})
|
||||
|
||||
It("could not fetch Deployment workload", func() {
|
||||
consistent, err := s.Initialize(ctx)
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(consistent).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("failed to patch the owner of Deployment", func() {
|
||||
By("Create a Deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("initialize will fail because deployment has wrong owner reference")
|
||||
initialized, err := s.Initialize(ctx)
|
||||
Expect(initialized).Should(BeFalse())
|
||||
Expect(err).Should(BeNil())
|
||||
})
|
||||
|
||||
It("workload Deployment is controlled by appRollout already", func() {
|
||||
By("Create a Deployment")
|
||||
deployment.SetOwnerReferences([]metav1.OwnerReference{{
|
||||
APIVersion: v1beta1.SchemeGroupVersion.String(),
|
||||
Kind: v1beta1.AppRolloutKind,
|
||||
Name: "def",
|
||||
UID: "123456",
|
||||
Controller: pointer.BoolPtr(true),
|
||||
}})
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("initialize succeed without patching")
|
||||
initialized, err := s.Initialize(ctx)
|
||||
Expect(initialized).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
})
|
||||
|
||||
It("successfully initialized Deployment", func() {
|
||||
By("create deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("initialize succeeds")
|
||||
s.parentController.SetUID("1231586900")
|
||||
initialized, err := s.Initialize(ctx)
|
||||
Expect(initialized).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("TestRolloutOneBatchPods", func() {
|
||||
It("could not fetch Deployment workload", func() {
|
||||
consistent, err := s.RolloutOneBatchPods(ctx)
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(consistent).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("successfully rollout, current batch number is not equal to the expected one", func() {
|
||||
By("Create a Deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("rollout the second batch of current deployment")
|
||||
s.rolloutStatus.CurrentBatch = 1
|
||||
s.rolloutStatus.RolloutOriginalSize = 0
|
||||
s.rolloutStatus.RolloutTargetSize = 10
|
||||
done, err := s.RolloutOneBatchPods(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(s.rolloutStatus.UpgradedReplicas).Should(BeEquivalentTo(3))
|
||||
Expect(k8sClient.Get(ctx, s.targetNamespacedName, &deployment)).Should(Succeed())
|
||||
Expect(*deployment.Spec.Replicas).Should(BeEquivalentTo(3))
|
||||
})
|
||||
})
|
||||
|
||||
Context("TestCheckOneBatchPods", func() {
|
||||
It("could not fetch Deployment workload", func() {
|
||||
consistent, err := s.CheckOneBatchPods(ctx)
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(consistent).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("current ready Pod is less than expected during increase", func() {
|
||||
By("Create the Deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
By("Update the Deployment status")
|
||||
deployment.Status.Replicas = 3
|
||||
deployment.Status.ReadyReplicas = 3
|
||||
Expect(k8sClient.Status().Update(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("checking should fail as not enough pod ready")
|
||||
s.rolloutStatus.CurrentBatch = 1
|
||||
s.rolloutStatus.RolloutOriginalSize = 2
|
||||
s.rolloutStatus.RolloutTargetSize = 10
|
||||
done, err := s.CheckOneBatchPods(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(s.rolloutStatus.UpgradedReadyReplicas).Should(BeEquivalentTo(deployment.Status.ReadyReplicas))
|
||||
|
||||
// set the rollout batch spec allow unavailable
|
||||
perc := intstr.FromString("20%")
|
||||
s.rolloutSpec.RolloutBatches[1] = v1alpha1.RolloutBatch{
|
||||
Replicas: perc,
|
||||
MaxUnavailable: &perc,
|
||||
}
|
||||
By("checking one batch should succeed with unavailble allowed")
|
||||
done, err = s.CheckOneBatchPods(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(s.rolloutStatus.UpgradedReadyReplicas).Should(BeEquivalentTo(deployment.Status.ReadyReplicas))
|
||||
})
|
||||
|
||||
It("current ready Pod is more than expected during decrease", func() {
|
||||
By("Create the Deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
By("Update the Deployment status")
|
||||
deployment.Status.Replicas = 10
|
||||
deployment.Status.ReadyReplicas = 10
|
||||
Expect(k8sClient.Status().Update(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("checking should fail as not enough pod ready")
|
||||
s.rolloutStatus.CurrentBatch = 1
|
||||
s.rolloutStatus.RolloutOriginalSize = 12
|
||||
s.rolloutStatus.RolloutTargetSize = 5
|
||||
done, err := s.CheckOneBatchPods(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(s.rolloutStatus.UpgradedReadyReplicas).Should(BeEquivalentTo(deployment.Status.ReadyReplicas))
|
||||
|
||||
// set the rollout batch spec allow unavailable
|
||||
perc := intstr.FromString("20%")
|
||||
s.rolloutSpec.RolloutBatches[1] = v1alpha1.RolloutBatch{
|
||||
Replicas: perc,
|
||||
MaxUnavailable: &perc,
|
||||
}
|
||||
By("checking one batch should still fail even with unavailble allowed")
|
||||
done, err = s.CheckOneBatchPods(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(s.rolloutStatus.UpgradedReadyReplicas).Should(BeEquivalentTo(deployment.Status.ReadyReplicas))
|
||||
})
|
||||
|
||||
It("there are more pods shrunk during decrease", func() {
|
||||
By("Create the Deployment")
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
By("Update the Deployment status")
|
||||
deployment.Status.Replicas = 8
|
||||
deployment.Status.ReadyReplicas = 8
|
||||
Expect(k8sClient.Status().Update(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("checking should pass even with not enough pod ready")
|
||||
s.rolloutStatus.CurrentBatch = 1
|
||||
s.rolloutStatus.RolloutOriginalSize = 12
|
||||
s.rolloutStatus.RolloutTargetSize = 5
|
||||
done, err := s.CheckOneBatchPods(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
Expect(s.rolloutStatus.UpgradedReadyReplicas).Should(BeEquivalentTo(deployment.Status.ReadyReplicas))
|
||||
})
|
||||
})
|
||||
|
||||
Context("TestFinalizeOneBatch", func() {
|
||||
BeforeEach(func() {
|
||||
s.rolloutSpec.RolloutBatches[0] = v1alpha1.RolloutBatch{
|
||||
Replicas: intstr.FromInt(2),
|
||||
}
|
||||
})
|
||||
|
||||
It("test illegal batch partition", func() {
|
||||
By("finalizing one batch")
|
||||
s.rolloutSpec.BatchPartition = pointer.Int32Ptr(2)
|
||||
s.rolloutStatus.CurrentBatch = 3
|
||||
done, err := s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err.Error()).Should(ContainSubstring("the current batch value in the status is greater than the batch partition"))
|
||||
})
|
||||
|
||||
It("test finalize during increase", func() {
|
||||
By("finalizing one batch with not enough")
|
||||
s.rolloutStatus.UpgradedReplicas = 6
|
||||
s.rolloutStatus.CurrentBatch = 1
|
||||
s.rolloutStatus.RolloutOriginalSize = 5
|
||||
s.rolloutStatus.RolloutTargetSize = 12
|
||||
done, err := s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err.Error()).Should(ContainSubstring(" upgraded replica in the status is less than the lower bound"))
|
||||
|
||||
By("finalizing one batch with just enough")
|
||||
s.rolloutStatus.UpgradedReplicas = 7
|
||||
done, err = s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
|
||||
By("finalizing one batch with all")
|
||||
s.rolloutStatus.UpgradedReplicas = 9
|
||||
done, err = s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
|
||||
By("finalizing one batch with more than")
|
||||
s.rolloutStatus.UpgradedReplicas = 12
|
||||
done, err = s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err.Error()).Should(ContainSubstring("upgraded replica in the status is greater than the upper bound"))
|
||||
})
|
||||
|
||||
It("test finalize during decrease", func() {
|
||||
By("finalizing one batch with too many")
|
||||
s.rolloutStatus.UpgradedReplicas = 13
|
||||
s.rolloutStatus.CurrentBatch = 1
|
||||
s.rolloutStatus.RolloutOriginalSize = 14
|
||||
s.rolloutStatus.RolloutTargetSize = 2
|
||||
done, err := s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err.Error()).Should(ContainSubstring("upgraded replica in the status is greater than the upper bound"))
|
||||
|
||||
By("finalizing one batch with just enough")
|
||||
s.rolloutStatus.UpgradedReplicas = 12
|
||||
done, err = s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
|
||||
By("finalizing one batch with all")
|
||||
s.rolloutStatus.UpgradedReplicas = 9
|
||||
done, err = s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeTrue())
|
||||
Expect(err).Should(BeNil())
|
||||
|
||||
By("finalizing one batch with not enough")
|
||||
s.rolloutStatus.UpgradedReplicas = 8
|
||||
done, err = s.FinalizeOneBatch(ctx)
|
||||
Expect(done).Should(BeFalse())
|
||||
Expect(err.Error()).Should(ContainSubstring(" upgraded replica in the status is less than the lower bound"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("TestFinalize", func() {
|
||||
It("failed to fetch Deployment", func() {
|
||||
By("finalizing")
|
||||
finalized := s.Finalize(ctx, true)
|
||||
Expect(finalized).Should(BeFalse())
|
||||
})
|
||||
|
||||
It("Already finalize Deployment", func() {
|
||||
By("Create a Deployment")
|
||||
deployment.SetOwnerReferences([]metav1.OwnerReference{{
|
||||
APIVersion: v1beta1.SchemeGroupVersion.String(),
|
||||
Kind: "notRollout",
|
||||
Name: "def",
|
||||
UID: "123456",
|
||||
}})
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("finalizing without patch")
|
||||
finalized := s.Finalize(ctx, true)
|
||||
Expect(finalized).Should(BeTrue())
|
||||
})
|
||||
|
||||
It("successfully to finalize Deployment", func() {
|
||||
By("Create a Deployment")
|
||||
deployment.SetOwnerReferences([]metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: v1beta1.SchemeGroupVersion.String(),
|
||||
Kind: v1beta1.AppRolloutKind,
|
||||
Name: "def",
|
||||
UID: "123456",
|
||||
},
|
||||
{
|
||||
APIVersion: corev1.SchemeGroupVersion.String(),
|
||||
Kind: "Deployment",
|
||||
Name: "def",
|
||||
UID: "998877745",
|
||||
},
|
||||
})
|
||||
Expect(k8sClient.Create(ctx, &deployment)).Should(Succeed())
|
||||
|
||||
By("finalizing with patch")
|
||||
finalized := s.Finalize(ctx, false)
|
||||
Expect(finalized).Should(BeTrue())
|
||||
Expect(k8sClient.Get(ctx, s.targetNamespacedName, &deployment)).Should(Succeed())
|
||||
Expect(len(deployment.GetOwnerReferences())).Should(BeEquivalentTo(1))
|
||||
Expect(deployment.GetOwnerReferences()[0].Kind).Should(Equal("Deployment"))
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user