mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-02-14 18:09:57 +00:00
expose the spoke informers (#179)
Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/openshift/library-go/pkg/controller/controllercmd"
|
||||
"github.com/spf13/pflag"
|
||||
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -39,9 +38,9 @@ func (o *AgentOptions) AddFlags(flags *pflag.FlagSet) {
|
||||
}
|
||||
|
||||
// spokeKubeConfig builds kubeconfig for the spoke/managed cluster
|
||||
func (o *AgentOptions) SpokeKubeConfig(controllerContext *controllercmd.ControllerContext) (*rest.Config, error) {
|
||||
func (o *AgentOptions) SpokeKubeConfig(managedRestConfig *rest.Config) (*rest.Config, error) {
|
||||
if o.SpokeKubeconfigFile == "" {
|
||||
return controllerContext.KubeConfig, nil
|
||||
return managedRestConfig, nil
|
||||
}
|
||||
|
||||
spokeRestConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.SpokeKubeconfigFile)
|
||||
|
||||
@@ -108,15 +108,11 @@ func NewSpokeAgentOptions() *SpokeAgentOptions {
|
||||
// create a valid hub kubeconfig. Once the hub kubeconfig is valid, the
|
||||
// temporary controller is stopped and the main controllers are started.
|
||||
func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
|
||||
// create management kube client
|
||||
managementKubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
kubeConfig := controllerContext.KubeConfig
|
||||
|
||||
// load spoke client config and create spoke clients,
|
||||
// the registration agent may not running in the spoke/managed cluster.
|
||||
spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
|
||||
spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(kubeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -126,8 +122,38 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
return err
|
||||
}
|
||||
|
||||
spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return o.RunSpokeAgentWithSpokeInformers(
|
||||
ctx,
|
||||
kubeConfig,
|
||||
spokeClientConfig,
|
||||
spokeKubeClient,
|
||||
informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute),
|
||||
clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute),
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
}
|
||||
|
||||
func (o *SpokeAgentOptions) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
kubeConfig, spokeClientConfig *rest.Config,
|
||||
spokeKubeClient kubernetes.Interface,
|
||||
spokeKubeInformerFactory informers.SharedInformerFactory,
|
||||
spokeClusterInformerFactory clusterv1informers.SharedInformerFactory,
|
||||
recorder events.Recorder) error {
|
||||
klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)
|
||||
|
||||
// create management kube client
|
||||
managementKubeClient, err := kubernetes.NewForConfig(kubeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// the hub kubeconfig secret stored in the cluster where the agent pod runs
|
||||
if err := o.Complete(managementKubeClient.CoreV1(), ctx, controllerContext.EventRecorder); err != nil {
|
||||
if err := o.Complete(managementKubeClient.CoreV1(), ctx, recorder); err != nil {
|
||||
klog.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -135,11 +161,6 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
klog.Fatal(err)
|
||||
}
|
||||
|
||||
klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)
|
||||
|
||||
// create shared informer factory for spoke cluster
|
||||
spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)
|
||||
|
||||
// get spoke cluster CA bundle
|
||||
spokeClusterCABundle, err := o.getSpokeClusterCABundle(spokeClientConfig)
|
||||
if err != nil {
|
||||
@@ -168,7 +189,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
o.AgentOptions.SpokeClusterName, o.SpokeExternalServerURLs,
|
||||
spokeClusterCABundle,
|
||||
bootstrapClusterClient,
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
)
|
||||
go spokeClusterCreatingController.Run(ctx, 1)
|
||||
|
||||
@@ -177,13 +198,13 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
// the hub kubeconfig secret stored in the cluster where the agent pod runs
|
||||
managementKubeClient.CoreV1(),
|
||||
namespacedManagementKubeInformerFactory.Core().V1().Secrets(),
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
)
|
||||
go hubKubeconfigSecretController.Run(ctx, 1)
|
||||
go namespacedManagementKubeInformerFactory.Start(ctx.Done())
|
||||
|
||||
// check if there already exists a valid client config for hub
|
||||
ok, err := o.hasValidHubClientConfig()
|
||||
ok, err := o.hasValidHubClientConfig(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -221,7 +242,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
o.ClientCertExpirationSeconds,
|
||||
managementKubeClient,
|
||||
registration.GenerateBootstrapStatusUpdater(),
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
controllerName,
|
||||
)
|
||||
|
||||
@@ -234,7 +255,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
|
||||
// wait for the hub client config is ready.
|
||||
klog.Info("Waiting for hub client config and managed cluster to be ready")
|
||||
if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
|
||||
if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.hasValidHubClientConfig); err != nil {
|
||||
// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
|
||||
stopBootstrap()
|
||||
return err
|
||||
@@ -273,7 +294,10 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
}),
|
||||
)
|
||||
addOnInformerFactory := addoninformers.NewSharedInformerFactoryWithOptions(
|
||||
addOnClient, 10*time.Minute, addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName))
|
||||
addOnClient,
|
||||
10*time.Minute,
|
||||
addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName),
|
||||
)
|
||||
// create a cluster informer factory with name field selector because we just need to handle the current spoke cluster
|
||||
hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactoryWithOptions(
|
||||
hubClusterClient,
|
||||
@@ -283,7 +307,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
}),
|
||||
)
|
||||
|
||||
controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
|
||||
recorder.Event("HubClientConfigReady", "Client config for hub is ready.")
|
||||
|
||||
// create a kubeconfig with references to the key/cert files in the same secret
|
||||
kubeconfig := clientcert.BuildKubeconfig(hubClientConfig, clientcert.TLSCertFile, clientcert.TLSKeyFile)
|
||||
@@ -310,7 +334,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
hubClusterClient,
|
||||
hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
|
||||
o.AgentOptions.SpokeClusterName),
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
controllerName,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -322,15 +346,9 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
o.AgentOptions.SpokeClusterName,
|
||||
hubKubeClient,
|
||||
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
)
|
||||
|
||||
spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)
|
||||
|
||||
// create NewManagedClusterStatusController to update the spoke cluster status
|
||||
managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController(
|
||||
o.AgentOptions.SpokeClusterName,
|
||||
@@ -341,7 +359,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
spokeKubeInformerFactory.Core().V1().Nodes(),
|
||||
o.MaxCustomClusterClaims,
|
||||
o.ClusterHealthCheckPeriod,
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
)
|
||||
|
||||
var addOnLeaseController factory.Controller
|
||||
@@ -355,7 +373,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
managementKubeClient.CoordinationV1(),
|
||||
spokeKubeClient.CoordinationV1(),
|
||||
AddOnLeaseControllerSyncInterval, //TODO: this interval time should be allowed to change from outside
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
)
|
||||
|
||||
addOnRegistrationController = addon.NewAddOnRegistrationController(
|
||||
@@ -367,15 +385,16 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
spokeKubeClient,
|
||||
csrControl,
|
||||
addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns(),
|
||||
controllerContext.EventRecorder,
|
||||
recorder,
|
||||
)
|
||||
}
|
||||
|
||||
go hubKubeInformerFactory.Start(ctx.Done())
|
||||
go hubClusterInformerFactory.Start(ctx.Done())
|
||||
go spokeKubeInformerFactory.Start(ctx.Done())
|
||||
go namespacedManagementKubeInformerFactory.Start(ctx.Done())
|
||||
go addOnInformerFactory.Start(ctx.Done())
|
||||
|
||||
go spokeKubeInformerFactory.Start(ctx.Done())
|
||||
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
|
||||
go spokeClusterInformerFactory.Start(ctx.Done())
|
||||
}
|
||||
@@ -489,7 +508,7 @@ func generateAgentName() string {
|
||||
// Normally, KubeconfigFile/TLSKeyFile/TLSCertFile will be created once the bootstrap process
|
||||
// completes. Changing the name of the cluster will make the existing hub kubeconfig invalid,
|
||||
// because certificate in TLSCertFile is issued to a specific cluster/agent.
|
||||
func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) {
|
||||
func (o *SpokeAgentOptions) hasValidHubClientConfig(ctx context.Context) (bool, error) {
|
||||
kubeconfigPath := path.Join(o.HubKubeconfigDir, clientcert.KubeconfigFile)
|
||||
if _, err := os.Stat(kubeconfigPath); os.IsNotExist(err) {
|
||||
klog.V(4).Infof("Kubeconfig file %q not found", kubeconfigPath)
|
||||
|
||||
@@ -308,7 +308,7 @@ func TestHasValidHubClientConfig(t *testing.T) {
|
||||
AgentName: c.agentName,
|
||||
HubKubeconfigDir: tempDir,
|
||||
}
|
||||
valid, err := options.hasValidHubClientConfig()
|
||||
valid, err := options.hasValidHubClientConfig(context.TODO())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ func (o *WorkloadAgentOptions) RunWorkloadAgent(ctx context.Context, controllerC
|
||||
|
||||
// load spoke client config and create spoke clients,
|
||||
// the work agent may not running in the spoke/managed cluster.
|
||||
spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
|
||||
spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user