mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-02-14 18:09:57 +00:00
✨ configurable controller replicas and master node selector (#468)
* configurable controller replicas and master node selector Signed-off-by: Dong Beiqing <350758787@qq.com> * run make fmt-imports Signed-off-by: Dong Beiqing <350758787@qq.com> * shorter lines Signed-off-by: Dong Beiqing <350758787@qq.com> * rename ControllerReplicas to DeploymentReplicas Signed-off-by: Dong Beiqing <350758787@qq.com> * rename masterNodeLabelSelectors to controlPlaneNodeLabels Signed-off-by: Dong Beiqing <350758787@qq.com> * rename controlPlaneNodeLabels to controlPlaneNodeLabelSelector Signed-off-by: Dong Beiqing <350758787@qq.com> --------- Signed-off-by: Dong Beiqing <350758787@qq.com>
This commit is contained in:
@@ -22,6 +22,11 @@ func NewHubOperatorCmd() *cobra.Command {
|
||||
|
||||
flags := cmd.Flags()
|
||||
flags.BoolVar(&cmOptions.SkipRemoveCRDs, "skip-remove-crds", false, "Skip removing CRDs while ClusterManager is deleting.")
|
||||
flags.StringVar(&cmOptions.ControlPlaneNodeLabelSelector, "control-plane-node-label-selector",
|
||||
"node-role.kubernetes.io/master=", "control plane node labels, "+
|
||||
"e.g. 'environment=production', 'tier notin (frontend,backend)'")
|
||||
flags.Int32Var(&cmOptions.DeploymentReplicas, "deployment-replicas", 0,
|
||||
"Number of deployment replicas, operator will automatically determine replicas if not set")
|
||||
opts.AddFlags(flags)
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -34,6 +34,11 @@ func NewKlusterletOperatorCmd() *cobra.Command {
|
||||
cmd.Flags().BoolVar(&klOptions.SkipPlaceholderHubSecret, "skip-placeholder-hub-secret", false,
|
||||
"If set, will skip ensuring a placeholder hub secret which is originally intended for pulling "+
|
||||
"work image before approved")
|
||||
cmd.Flags().StringVar(&klOptions.ControlPlaneNodeLabelSelector, "control-plane-node-label-selector",
|
||||
"node-role.kubernetes.io/master=", "control plane node labels, "+
|
||||
"e.g. 'environment=production', 'tier notin (frontend,backend)'")
|
||||
cmd.Flags().Int32Var(&klOptions.DeploymentReplicas, "deployment-replicas", 0,
|
||||
"Number of deployment replicas, operator will automatically determine replicas if not set")
|
||||
opts.AddFlags(flags)
|
||||
|
||||
return cmd
|
||||
|
||||
@@ -432,7 +432,8 @@ func LoadClientConfigFromSecret(secret *corev1.Secret) (*rest.Config, error) {
|
||||
// - kube version: if the kube version is less than v1.14 reutn 1
|
||||
// - node: list master nodes in the cluster and return 1 if the
|
||||
// number of master nodes is equal or less than 1. Return 3 otherwise.
|
||||
func DetermineReplica(ctx context.Context, kubeClient kubernetes.Interface, mode operatorapiv1.InstallMode, kubeVersion *version.Version) int32 {
|
||||
func DetermineReplica(ctx context.Context, kubeClient kubernetes.Interface, mode operatorapiv1.InstallMode, kubeVersion *version.Version,
|
||||
controlPlaneNodeLabelSelector string) int32 {
|
||||
// For hosted mode, there may be many cluster-manager/klusterlet running on the management cluster,
|
||||
// set the replica to 1 to reduce the footprint of the management cluster.
|
||||
if IsHosted(mode) {
|
||||
@@ -451,14 +452,14 @@ func DetermineReplica(ctx context.Context, kubeClient kubernetes.Interface, mode
|
||||
}
|
||||
}
|
||||
|
||||
return DetermineReplicaByNodes(ctx, kubeClient)
|
||||
return DetermineReplicaByNodes(ctx, kubeClient, controlPlaneNodeLabelSelector)
|
||||
}
|
||||
|
||||
// DetermineReplicaByNodes determines the replica of deployment based on:
|
||||
// list master nodes in the cluster and return 1 if
|
||||
// the number of master nodes is equal or less than 1. Return 3 otherwise.
|
||||
func DetermineReplicaByNodes(ctx context.Context, kubeClient kubernetes.Interface) int32 {
|
||||
nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master="})
|
||||
func DetermineReplicaByNodes(ctx context.Context, kubeClient kubernetes.Interface, controlPlaneNodeLabelSelector string) int32 {
|
||||
nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: controlPlaneNodeLabelSelector})
|
||||
if err != nil {
|
||||
return defaultReplica
|
||||
}
|
||||
|
||||
@@ -535,7 +535,7 @@ func TestDeterminReplica(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
fakeKubeClient := fakekube.NewSimpleClientset(c.existingNodes...)
|
||||
replica := DetermineReplica(context.Background(), fakeKubeClient, c.mode, c.kubeVersion)
|
||||
replica := DetermineReplica(context.Background(), fakeKubeClient, c.mode, c.kubeVersion, "node-role.kubernetes.io/master=")
|
||||
if replica != c.expectedReplica {
|
||||
t.Errorf("Unexpected replica, actual: %d, expected: %d", replica, c.expectedReplica)
|
||||
}
|
||||
|
||||
@@ -58,8 +58,10 @@ type clusterManagerController struct {
|
||||
mwctrEnabled, addonManagerEnabled bool) error
|
||||
generateHubClusterClients func(hubConfig *rest.Config) (kubernetes.Interface, apiextensionsclient.Interface,
|
||||
migrationclient.StorageVersionMigrationsGetter, error)
|
||||
skipRemoveCRDs bool
|
||||
operatorNamespace string
|
||||
skipRemoveCRDs bool
|
||||
controlPlaneNodeLabelSelector string
|
||||
deploymentReplicas int32
|
||||
operatorNamespace string
|
||||
}
|
||||
|
||||
type clusterManagerReconcile interface {
|
||||
@@ -84,6 +86,8 @@ func NewClusterManagerController(
|
||||
configMapInformer corev1informers.ConfigMapInformer,
|
||||
recorder events.Recorder,
|
||||
skipRemoveCRDs bool,
|
||||
controlPlaneNodeLabelSelector string,
|
||||
deploymentReplicas int32,
|
||||
operatorNamespace string,
|
||||
) factory.Controller {
|
||||
controller := &clusterManagerController{
|
||||
@@ -92,14 +96,16 @@ func NewClusterManagerController(
|
||||
patcher: patcher.NewPatcher[
|
||||
*operatorapiv1.ClusterManager, operatorapiv1.ClusterManagerSpec, operatorapiv1.ClusterManagerStatus](
|
||||
clusterManagerClient),
|
||||
clusterManagerLister: clusterManagerInformer.Lister(),
|
||||
configMapLister: configMapInformer.Lister(),
|
||||
recorder: recorder,
|
||||
generateHubClusterClients: generateHubClients,
|
||||
ensureSAKubeconfigs: ensureSAKubeconfigs,
|
||||
cache: resourceapply.NewResourceCache(),
|
||||
skipRemoveCRDs: skipRemoveCRDs,
|
||||
operatorNamespace: operatorNamespace,
|
||||
clusterManagerLister: clusterManagerInformer.Lister(),
|
||||
configMapLister: configMapInformer.Lister(),
|
||||
recorder: recorder,
|
||||
generateHubClusterClients: generateHubClients,
|
||||
ensureSAKubeconfigs: ensureSAKubeconfigs,
|
||||
cache: resourceapply.NewResourceCache(),
|
||||
skipRemoveCRDs: skipRemoveCRDs,
|
||||
controlPlaneNodeLabelSelector: controlPlaneNodeLabelSelector,
|
||||
deploymentReplicas: deploymentReplicas,
|
||||
operatorNamespace: operatorNamespace,
|
||||
}
|
||||
|
||||
return factory.New().WithSync(controller.sync).
|
||||
@@ -141,6 +147,11 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
|
||||
workDriver = clusterManager.Spec.WorkConfiguration.WorkDriver
|
||||
}
|
||||
|
||||
replica := n.deploymentReplicas
|
||||
if replica <= 0 {
|
||||
replica = helpers.DetermineReplica(ctx, n.operatorKubeClient, clusterManager.Spec.DeployOption.Mode, nil, n.controlPlaneNodeLabelSelector)
|
||||
}
|
||||
|
||||
// This config is used to render template of manifests.
|
||||
config := manifests.HubConfig{
|
||||
ClusterManagerName: clusterManager.Name,
|
||||
@@ -149,7 +160,7 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
|
||||
WorkImage: clusterManager.Spec.WorkImagePullSpec,
|
||||
PlacementImage: clusterManager.Spec.PlacementImagePullSpec,
|
||||
AddOnManagerImage: clusterManager.Spec.AddOnManagerImagePullSpec,
|
||||
Replica: helpers.DetermineReplica(ctx, n.operatorKubeClient, clusterManager.Spec.DeployOption.Mode, nil),
|
||||
Replica: replica,
|
||||
HostedMode: clusterManager.Spec.DeployOption.Mode == operatorapiv1.InstallModeHosted,
|
||||
RegistrationWebhook: manifests.Webhook{
|
||||
Port: defaultWebhookPort,
|
||||
|
||||
@@ -23,7 +23,9 @@ import (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
SkipRemoveCRDs bool
|
||||
SkipRemoveCRDs bool
|
||||
ControlPlaneNodeLabelSelector string
|
||||
DeploymentReplicas int32
|
||||
}
|
||||
|
||||
// RunClusterManagerOperator starts a new cluster manager operator
|
||||
@@ -75,6 +77,8 @@ func (o *Options) RunClusterManagerOperator(ctx context.Context, controllerConte
|
||||
kubeInformer.Core().V1().ConfigMaps(),
|
||||
controllerContext.EventRecorder,
|
||||
o.SkipRemoveCRDs,
|
||||
o.ControlPlaneNodeLabelSelector,
|
||||
o.DeploymentReplicas,
|
||||
controllerContext.OperatorNamespace,
|
||||
)
|
||||
|
||||
|
||||
@@ -32,12 +32,14 @@ import (
|
||||
)
|
||||
|
||||
type klusterletCleanupController struct {
|
||||
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
|
||||
klusterletLister operatorlister.KlusterletLister
|
||||
kubeClient kubernetes.Interface
|
||||
kubeVersion *version.Version
|
||||
operatorNamespace string
|
||||
managedClusterClientsBuilder managedClusterClientsBuilderInterface
|
||||
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
|
||||
klusterletLister operatorlister.KlusterletLister
|
||||
kubeClient kubernetes.Interface
|
||||
kubeVersion *version.Version
|
||||
operatorNamespace string
|
||||
managedClusterClientsBuilder managedClusterClientsBuilderInterface
|
||||
controlPlaneNodeLabelSelector string
|
||||
deploymentReplicas int32
|
||||
}
|
||||
|
||||
// NewKlusterletCleanupController construct klusterlet cleanup controller
|
||||
@@ -51,16 +53,20 @@ func NewKlusterletCleanupController(
|
||||
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
|
||||
kubeVersion *version.Version,
|
||||
operatorNamespace string,
|
||||
controlPlaneNodeLabelSelector string,
|
||||
deploymentReplicas int32,
|
||||
recorder events.Recorder) factory.Controller {
|
||||
controller := &klusterletCleanupController{
|
||||
kubeClient: kubeClient,
|
||||
patcher: patcher.NewPatcher[
|
||||
*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus](klusterletClient).
|
||||
WithOptions(patcher.PatchOptions{IgnoreResourceVersion: true}),
|
||||
klusterletLister: klusterletInformer.Lister(),
|
||||
kubeVersion: kubeVersion,
|
||||
operatorNamespace: operatorNamespace,
|
||||
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
|
||||
klusterletLister: klusterletInformer.Lister(),
|
||||
kubeVersion: kubeVersion,
|
||||
operatorNamespace: operatorNamespace,
|
||||
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
|
||||
controlPlaneNodeLabelSelector: controlPlaneNodeLabelSelector,
|
||||
deploymentReplicas: deploymentReplicas,
|
||||
}
|
||||
|
||||
return factory.New().WithSync(controller.sync).
|
||||
@@ -93,6 +99,10 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
|
||||
_, err := n.patcher.AddFinalizer(ctx, klusterlet, desiredFinalizers...)
|
||||
return err
|
||||
}
|
||||
replica := n.deploymentReplicas
|
||||
if replica <= 0 {
|
||||
replica = helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion, n.controlPlaneNodeLabelSelector)
|
||||
}
|
||||
// Klusterlet is deleting, we remove its related resources on managed and management cluster
|
||||
config := klusterletConfig{
|
||||
KlusterletName: klusterlet.Name,
|
||||
@@ -105,7 +115,7 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
|
||||
HubKubeConfigSecret: helpers.HubKubeConfig,
|
||||
ExternalServerURL: getServersFromKlusterlet(klusterlet),
|
||||
OperatorNamespace: n.operatorNamespace,
|
||||
Replica: helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion),
|
||||
Replica: replica,
|
||||
|
||||
ExternalManagedKubeConfigSecret: helpers.ExternalManagedKubeConfig,
|
||||
ExternalManagedKubeConfigRegistrationSecret: helpers.ExternalManagedKubeConfigRegistration,
|
||||
|
||||
@@ -45,14 +45,16 @@ const (
|
||||
)
|
||||
|
||||
type klusterletController struct {
|
||||
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
|
||||
klusterletLister operatorlister.KlusterletLister
|
||||
kubeClient kubernetes.Interface
|
||||
kubeVersion *version.Version
|
||||
operatorNamespace string
|
||||
skipHubSecretPlaceholder bool
|
||||
cache resourceapply.ResourceCache
|
||||
managedClusterClientsBuilder managedClusterClientsBuilderInterface
|
||||
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
|
||||
klusterletLister operatorlister.KlusterletLister
|
||||
kubeClient kubernetes.Interface
|
||||
kubeVersion *version.Version
|
||||
operatorNamespace string
|
||||
skipHubSecretPlaceholder bool
|
||||
cache resourceapply.ResourceCache
|
||||
managedClusterClientsBuilder managedClusterClientsBuilderInterface
|
||||
controlPlaneNodeLabelSelector string
|
||||
deploymentReplicas int32
|
||||
}
|
||||
|
||||
type klusterletReconcile interface {
|
||||
@@ -78,18 +80,22 @@ func NewKlusterletController(
|
||||
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
|
||||
kubeVersion *version.Version,
|
||||
operatorNamespace string,
|
||||
controlPlaneNodeLabelSelector string,
|
||||
deploymentReplicas int32,
|
||||
recorder events.Recorder,
|
||||
skipHubSecretPlaceholder bool) factory.Controller {
|
||||
controller := &klusterletController{
|
||||
kubeClient: kubeClient,
|
||||
patcher: patcher.NewPatcher[
|
||||
*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus](klusterletClient),
|
||||
klusterletLister: klusterletInformer.Lister(),
|
||||
kubeVersion: kubeVersion,
|
||||
operatorNamespace: operatorNamespace,
|
||||
skipHubSecretPlaceholder: skipHubSecretPlaceholder,
|
||||
cache: resourceapply.NewResourceCache(),
|
||||
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
|
||||
klusterletLister: klusterletInformer.Lister(),
|
||||
kubeVersion: kubeVersion,
|
||||
operatorNamespace: operatorNamespace,
|
||||
skipHubSecretPlaceholder: skipHubSecretPlaceholder,
|
||||
cache: resourceapply.NewResourceCache(),
|
||||
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
|
||||
controlPlaneNodeLabelSelector: controlPlaneNodeLabelSelector,
|
||||
deploymentReplicas: deploymentReplicas,
|
||||
}
|
||||
|
||||
return factory.New().WithSync(controller.sync).
|
||||
@@ -179,6 +185,11 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
|
||||
return err
|
||||
}
|
||||
|
||||
replica := n.deploymentReplicas
|
||||
if replica <= 0 {
|
||||
replica = helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion, n.controlPlaneNodeLabelSelector)
|
||||
}
|
||||
|
||||
config := klusterletConfig{
|
||||
KlusterletName: klusterlet.Name,
|
||||
KlusterletNamespace: helpers.KlusterletNamespace(klusterlet),
|
||||
@@ -192,7 +203,7 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
|
||||
HubKubeConfigSecret: helpers.HubKubeConfig,
|
||||
ExternalServerURL: getServersFromKlusterlet(klusterlet),
|
||||
OperatorNamespace: n.operatorNamespace,
|
||||
Replica: helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion),
|
||||
Replica: replica,
|
||||
PriorityClassName: helpers.AgentPriorityClassName(klusterlet, n.kubeVersion),
|
||||
AppliedManifestWorkEvictionGracePeriod: getAppliedManifestWorkEvictionGracePeriod(klusterlet),
|
||||
|
||||
|
||||
@@ -25,7 +25,9 @@ import (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
SkipPlaceholderHubSecret bool
|
||||
SkipPlaceholderHubSecret bool
|
||||
ControlPlaneNodeLabelSelector string
|
||||
DeploymentReplicas int32
|
||||
}
|
||||
|
||||
// RunKlusterletOperator starts a new klusterlet operator
|
||||
@@ -96,6 +98,8 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
|
||||
workClient.WorkV1().AppliedManifestWorks(),
|
||||
kubeVersion,
|
||||
helpers.GetOperatorNamespace(),
|
||||
o.ControlPlaneNodeLabelSelector,
|
||||
o.DeploymentReplicas,
|
||||
controllerContext.EventRecorder,
|
||||
o.SkipPlaceholderHubSecret)
|
||||
|
||||
@@ -109,6 +113,8 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
|
||||
workClient.WorkV1().AppliedManifestWorks(),
|
||||
kubeVersion,
|
||||
helpers.GetOperatorNamespace(),
|
||||
o.ControlPlaneNodeLabelSelector,
|
||||
o.DeploymentReplicas,
|
||||
controllerContext.EventRecorder)
|
||||
|
||||
ssarController := ssarcontroller.NewKlusterletSSARController(
|
||||
|
||||
Reference in New Issue
Block a user