mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-21 00:24:08 +00:00
rewrite spoke cluster creating process
This commit is contained in:
@@ -22,6 +22,8 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/transport"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
@@ -90,6 +92,39 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
}
|
||||
spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)
|
||||
|
||||
// load bootstrap client config
|
||||
bootstrapClientConfig, err := clientcmd.BuildConfigFromFlags("", o.BootstrapKubeconfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load bootstrap kubeconfig from file %q: %w", o.BootstrapKubeconfig, err)
|
||||
}
|
||||
|
||||
// start a SpokeClusterCreatingController to make sure there is a spoke cluster on hub cluster
|
||||
// if the bootstrap client config is valid
|
||||
if isBootstrapClientConfigValid(bootstrapClientConfig) {
|
||||
bootstrapClusterClient, err := clusterv1client.NewForConfig(bootstrapClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
caBundle := controllerContext.KubeConfig.CAData
|
||||
if caBundle == nil && controllerContext.KubeConfig.CAFile != "" {
|
||||
data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err)
|
||||
}
|
||||
caBundle = data
|
||||
}
|
||||
|
||||
spokeClusterCreatingController := spokecluster.NewSpokeClusterCreatingController(
|
||||
o.ClusterName, o.SpokeExternalServerUrl,
|
||||
caBundle,
|
||||
bootstrapClusterClient,
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
go spokeClusterCreatingController.Run(ctx, 1)
|
||||
}
|
||||
|
||||
// check if there already exists a valid client config for hub
|
||||
ok, err := o.hasValidHubClientConfig()
|
||||
if err != nil {
|
||||
@@ -103,19 +138,10 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
// informer cache'
|
||||
if !ok {
|
||||
// create bootstrap client and shared informer factory from bootstrap hub kube config
|
||||
bootstrapClientConfig, err := clientcmd.BuildConfigFromFlags("", o.BootstrapKubeconfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load bootstrap kubeconfig from file %q: %w", o.BootstrapKubeconfig, err)
|
||||
}
|
||||
bootstrapKubeClient, err := kubernetes.NewForConfig(bootstrapClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrapClusterClient, err := clusterv1client.NewForConfig(bootstrapClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapKubeClient, 10*time.Minute)
|
||||
|
||||
// create a ClientCertForHubController for spoke agent bootstrap
|
||||
@@ -130,53 +156,22 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
"BootstrapClientCertForHubController",
|
||||
)
|
||||
|
||||
// create a SpokeClusterCreatingControlle to create a spoke cluster on hub cluster
|
||||
hasSpokeCluster := false
|
||||
setHasSpokeClusterFn := func(isSpokeClusterExists bool) {
|
||||
hasSpokeCluster = isSpokeClusterExists
|
||||
}
|
||||
|
||||
caBundle := controllerContext.KubeConfig.CAData
|
||||
if caBundle == nil && controllerContext.KubeConfig.CAFile != "" {
|
||||
data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err)
|
||||
}
|
||||
caBundle = data
|
||||
}
|
||||
|
||||
// TODO there is a corner case, if the hub kubeconfig is ready, but the spoke cluster did not create, and the agent is
|
||||
// restarted, the agent will not be able to create spoke cluster again due to the bootstrap kubeconfig has been switched
|
||||
// to hub kubeconfig, in this case, the hub cluster admin need to create the spoke cluster on hub cluster manually
|
||||
spokeClusterCreatingController := spokecluster.NewSpokeClusterCreatingController(
|
||||
o.ClusterName, o.SpokeExternalServerUrl,
|
||||
caBundle,
|
||||
bootstrapClusterClient,
|
||||
setHasSpokeClusterFn,
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
bootstrapCtx, stopBootstrap := context.WithCancel(ctx)
|
||||
|
||||
go bootstrapInformerFactory.Start(bootstrapCtx.Done())
|
||||
go spokeKubeInformerFactory.Start(bootstrapCtx.Done())
|
||||
|
||||
go clientCertForHubController.Run(bootstrapCtx, 1)
|
||||
go spokeClusterCreatingController.Run(bootstrapCtx, 1)
|
||||
|
||||
// wait for the hub client config is ready and the spoke cluster is created.
|
||||
// wait for the hub client config is ready.
|
||||
klog.Info("Waiting for hub client config and spoke cluster to be ready")
|
||||
if err := wait.PollImmediateInfinite(1*time.Second, func() (bool, error) {
|
||||
hasValidHubClientConfig, err := o.hasValidHubClientConfig()
|
||||
return hasValidHubClientConfig && hasSpokeCluster, err
|
||||
}); err != nil {
|
||||
if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
|
||||
// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
|
||||
stopBootstrap()
|
||||
return err
|
||||
}
|
||||
|
||||
// stop the clientCertForHubController and spokeClusterCreatingController for bootstrap
|
||||
// once the hub client config and spoke cluster is ready
|
||||
// stop the clientCertForHubController for bootstrap once the hub client config is ready
|
||||
stopBootstrap()
|
||||
}
|
||||
|
||||
@@ -359,3 +354,28 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) {
|
||||
|
||||
return clusterName, agentName
|
||||
}
|
||||
|
||||
func isBootstrapClientConfigValid(bootstrapClientConfig *restclient.Config) bool {
|
||||
transportConfig, err := bootstrapClientConfig.TransportConfig()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// has side effect of populating transport config data fields
|
||||
if _, err := transport.TLSConfigFor(transportConfig); err != nil {
|
||||
return false
|
||||
}
|
||||
certs, err := certutil.ParseCertsPEM(transportConfig.TLS.CertData)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if len(certs) == 0 {
|
||||
return false
|
||||
}
|
||||
now := time.Now()
|
||||
for _, cert := range certs {
|
||||
if now.After(cert.NotAfter) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -11,16 +11,14 @@ import (
|
||||
"github.com/openshift/library-go/pkg/operator/events"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
type setHasSpokeClusterFunc func(bool)
|
||||
|
||||
// spokeClusterCreatingController creates a spoke cluster on hub cluster during the spoke agent bootstrap phase
|
||||
type spokeClusterCreatingController struct {
|
||||
clusterName string
|
||||
spokeExternalServerUrl string
|
||||
spokeCABundle []byte
|
||||
setHasSpokeCluster setHasSpokeClusterFunc
|
||||
hubClusterClient clientset.Interface
|
||||
}
|
||||
|
||||
@@ -29,18 +27,16 @@ func NewSpokeClusterCreatingController(
|
||||
clusterName, spokeExternalServerUrl string,
|
||||
spokeCABundle []byte,
|
||||
hubClusterClient clientset.Interface,
|
||||
setHasSpokeClusterFn setHasSpokeClusterFunc,
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &spokeClusterCreatingController{
|
||||
clusterName: clusterName,
|
||||
spokeExternalServerUrl: spokeExternalServerUrl,
|
||||
spokeCABundle: spokeCABundle,
|
||||
setHasSpokeCluster: setHasSpokeClusterFn,
|
||||
hubClusterClient: hubClusterClient,
|
||||
}
|
||||
return factory.New().
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(5*time.Minute).
|
||||
ResyncEvery(60*time.Minute).
|
||||
ToController("SpokeClusterCreatingController", recorder)
|
||||
}
|
||||
|
||||
@@ -62,15 +58,19 @@ func (c *spokeClusterCreatingController) sync(ctx context.Context, syncCtx facto
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err)
|
||||
}
|
||||
c.setHasSpokeCluster(true)
|
||||
syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// the cluster client may be expired
|
||||
if errors.IsUnauthorized(err) {
|
||||
klog.V(4).Infof("unable to get the spoke cluster %q from hub: %v", c.clusterName, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.setHasSpokeCluster(true)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -18,13 +18,11 @@ func TestCreateSpokeCluster(t *testing.T) {
|
||||
name string
|
||||
startingObjects []runtime.Object
|
||||
validateActions func(t *testing.T, actions []clienttesting.Action)
|
||||
hasSpokeCluster bool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "create a new cluster",
|
||||
startingObjects: []runtime.Object{},
|
||||
hasSpokeCluster: true,
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertActions(t, actions, "get", "create")
|
||||
actual := actions[1].(clienttesting.CreateActionImpl).Object
|
||||
@@ -35,7 +33,6 @@ func TestCreateSpokeCluster(t *testing.T) {
|
||||
{
|
||||
name: "create an existed cluster",
|
||||
startingObjects: []runtime.Object{newSpokeCluster([]clusterv1.StatusCondition{})},
|
||||
hasSpokeCluster: true,
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertActions(t, actions, "get")
|
||||
},
|
||||
@@ -44,13 +41,11 @@ func TestCreateSpokeCluster(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
hasSpokeCluster := false
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
|
||||
ctrl := spokeClusterCreatingController{
|
||||
clusterName: testSpokeClusterName,
|
||||
spokeExternalServerUrl: testSpokeExternalServerUrl,
|
||||
spokeCABundle: []byte("testcabundle"),
|
||||
setHasSpokeCluster: func(isCreated bool) { hasSpokeCluster = isCreated },
|
||||
hubClusterClient: clusterClient,
|
||||
}
|
||||
|
||||
@@ -67,11 +62,6 @@ func TestCreateSpokeCluster(t *testing.T) {
|
||||
t.Errorf("unexpected err: %v", syncErr)
|
||||
}
|
||||
|
||||
if hasSpokeCluster != c.hasSpokeCluster {
|
||||
t.Errorf("expected %t error, but failed", c.hasSpokeCluster)
|
||||
return
|
||||
}
|
||||
|
||||
c.validateActions(t, clusterClient.Actions())
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user