rewrite spoke cluster creating process

This commit is contained in:
liuwei
2020-04-30 14:37:59 +08:00
parent fed340aca8
commit 6584baa5ab
3 changed files with 45 additions and 48 deletions

View File

@@ -101,7 +101,6 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
// exists a valid client config for hub or not, the controller will be started and then stopped immediately
// in scenario #2 and #3, which results in an error message in log: 'Observed a panic: timeout waiting for
// informer cache'
var stopBootstrap context.CancelFunc
if !ok {
// create bootstrap client and shared informer factory from bootstrap hub kube config
bootstrapClientConfig, err := clientcmd.BuildConfigFromFlags("", o.BootstrapKubeconfig)
@@ -133,6 +132,9 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
// create a SpokeClusterCreatingControlle to create a spoke cluster on hub cluster
hasSpokeCluster := false
setHasSpokeClusterFn := func(isSpokeClusterExists bool) {
hasSpokeCluster = isSpokeClusterExists
}
caBundle := controllerContext.KubeConfig.CAData
if caBundle == nil && controllerContext.KubeConfig.CAFile != "" {
@@ -150,12 +152,11 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.ClusterName, o.SpokeExternalServerUrl,
caBundle,
bootstrapClusterClient,
func(isCreated bool) { hasSpokeCluster = isCreated },
setHasSpokeClusterFn,
controllerContext.EventRecorder,
)
var bootstrapCtx context.Context
bootstrapCtx, stopBootstrap = context.WithCancel(ctx)
bootstrapCtx, stopBootstrap := context.WithCancel(ctx)
go bootstrapInformerFactory.Start(bootstrapCtx.Done())
go spokeKubeInformerFactory.Start(bootstrapCtx.Done())
@@ -163,22 +164,19 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
go clientCertForHubController.Run(bootstrapCtx, 1)
go spokeClusterCreatingController.Run(bootstrapCtx, 1)
// wait for the spoke cluster is created
wait.PollImmediateInfinite(1*time.Second, func() (bool, error) { return hasSpokeCluster, nil })
}
// wait for the client config for hub is ready
klog.Info("Waiting for client config for hub to be ready")
if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
// TODO we need run the bootstrap CSR forever too to re-establish the client-cert if we ever lose it.
if stopBootstrap != nil {
// wait for the hub client config is ready and the spoke cluster is created.
klog.Info("Waiting for hub client config and spoke cluster to be ready")
if err := wait.PollImmediateInfinite(1*time.Second, func() (bool, error) {
hasValidHubClientConfig, err := o.hasValidHubClientConfig()
return hasValidHubClientConfig && hasSpokeCluster, err
}); err != nil {
// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
stopBootstrap()
return err
}
return err
}
// stop the ClientCertForHubController for bootstrap once the client config for hub is ready
if stopBootstrap != nil {
// stop the clientCertForHubController and spokeClusterCreatingController for bootstrap
// once the hub client config and spoke cluster is ready
stopBootstrap()
}

View File

@@ -13,13 +13,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type setHasSpokeClusterFunc func(bool)
// spokeClusterCreatingController creates a spoke cluster on hub cluster during the spoke agent bootstrap phase
type spokeClusterCreatingController struct {
isSpokeClusterExists bool
clusterName string
spokeExternalServerUrl string
spokeCABundle []byte
hasSpokeClusterFunc func(bool)
setHasSpokeCluster setHasSpokeClusterFunc
hubClusterClient clientset.Interface
}
@@ -28,13 +29,13 @@ func NewSpokeClusterCreatingController(
clusterName, spokeExternalServerUrl string,
spokeCABundle []byte,
hubClusterClient clientset.Interface,
hasSpokeClusterFunc func(bool),
setHasSpokeClusterFn setHasSpokeClusterFunc,
recorder events.Recorder) factory.Controller {
c := &spokeClusterCreatingController{
clusterName: clusterName,
spokeExternalServerUrl: spokeExternalServerUrl,
spokeCABundle: spokeCABundle,
hasSpokeClusterFunc: hasSpokeClusterFunc,
setHasSpokeCluster: setHasSpokeClusterFn,
hubClusterClient: hubClusterClient,
}
return factory.New().
@@ -44,34 +45,32 @@ func NewSpokeClusterCreatingController(
}
func (c *spokeClusterCreatingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
if c.isSpokeClusterExists {
return nil
}
spokeCluster := &clusterv1.SpokeCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.clusterName,
},
Spec: clusterv1.SpokeClusterSpec{
SpokeClientConfig: clusterv1.ClientConfig{
URL: c.spokeExternalServerUrl,
CABundle: c.spokeCABundle,
_, err := c.hubClusterClient.ClusterV1().SpokeClusters().Get(ctx, c.clusterName, metav1.GetOptions{})
if errors.IsNotFound(err) {
spokeCluster := &clusterv1.SpokeCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.clusterName,
},
},
}
_, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
c.isSpokeClusterExists = true
c.hasSpokeClusterFunc(c.isSpokeClusterExists)
Spec: clusterv1.SpokeClusterSpec{
SpokeClientConfig: clusterv1.ClientConfig{
URL: c.spokeExternalServerUrl,
CABundle: c.spokeCABundle,
},
},
}
_, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err)
}
c.setHasSpokeCluster(true)
syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName)
return nil
}
if err != nil {
return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err)
return err
}
c.isSpokeClusterExists = true
c.hasSpokeClusterFunc(c.isSpokeClusterExists)
syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName)
c.setHasSpokeCluster(true)
return nil
}

View File

@@ -26,8 +26,8 @@ func TestCreateSpokeCluster(t *testing.T) {
startingObjects: []runtime.Object{},
hasSpokeCluster: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
assertActions(t, actions, "create")
actual := actions[0].(clienttesting.CreateActionImpl).Object
assertActions(t, actions, "get", "create")
actual := actions[1].(clienttesting.CreateActionImpl).Object
assertSpokeExternalServerUrl(t, actual, testSpokeExternalServerUrl)
assertSpokeCABundle(t, actual, []byte("testcabundle"))
},
@@ -37,7 +37,7 @@ func TestCreateSpokeCluster(t *testing.T) {
startingObjects: []runtime.Object{newSpokeCluster([]clusterv1.StatusCondition{})},
hasSpokeCluster: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
assertActions(t, actions, "create")
assertActions(t, actions, "get")
},
},
}
@@ -50,7 +50,7 @@ func TestCreateSpokeCluster(t *testing.T) {
clusterName: testSpokeClusterName,
spokeExternalServerUrl: testSpokeExternalServerUrl,
spokeCABundle: []byte("testcabundle"),
hasSpokeClusterFunc: func(isCreated bool) { hasSpokeCluster = isCreated },
setHasSpokeCluster: func(isCreated bool) { hasSpokeCluster = isCreated },
hubClusterClient: clusterClient,
}