mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-16 22:27:34 +00:00
Merge pull request #15 from skeeey/agent-controller
Resolve comments on PR #11 and #8
This commit is contained in:
@@ -34,7 +34,7 @@ type csrApprovingController struct {
|
||||
func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer factory.Informer, recorder events.Recorder) factory.Controller {
|
||||
c := &csrApprovingController{
|
||||
kubeClient: kubeClient,
|
||||
eventRecorder: recorder.WithComponentSuffix("csr-controller"),
|
||||
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
|
||||
}
|
||||
return factory.New().
|
||||
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
|
||||
@@ -42,7 +42,7 @@ func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer fact
|
||||
return accessor.GetName()
|
||||
}, csrInformer).
|
||||
WithSync(c.sync).
|
||||
ToController("CSRController", recorder)
|
||||
ToController("CSRApprovingController", recorder)
|
||||
}
|
||||
|
||||
func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
@@ -74,6 +74,7 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC
|
||||
return err
|
||||
}
|
||||
if !allowed {
|
||||
//TODO find a way to avoid looking at this CSR again.
|
||||
klog.V(4).Infof("Spoke cluster csr %q cannont be auto approved due to subject access review was not approved", csr.Name)
|
||||
return nil
|
||||
}
|
||||
@@ -81,7 +82,7 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC
|
||||
// Auto approve the spoke cluster csr
|
||||
csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1beta1.CertificateSigningRequestCondition{
|
||||
Type: certificatesv1beta1.CertificateApproved,
|
||||
Reason: "AutoApproved",
|
||||
Reason: "AutoApprovedByHubCSRApprovingController",
|
||||
Message: "Auto approving spoke cluster agent certificate after SubjectAccessReview.",
|
||||
})
|
||||
_, err = c.kubeClient.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{})
|
||||
@@ -165,14 +166,13 @@ func isSpokeClusterClientCertRenewal(csr *certificatesv1beta1.CertificateSigning
|
||||
|
||||
// Check whether a CSR is in terminal state
|
||||
func isCSRInTerminalState(status *certificatesv1beta1.CertificateSigningRequestStatus) bool {
|
||||
approved, denied := false, false
|
||||
for _, c := range status.Conditions {
|
||||
if c.Type == certificatesv1beta1.CertificateApproved {
|
||||
approved = true
|
||||
return true
|
||||
}
|
||||
if c.Type == certificatesv1beta1.CertificateDenied {
|
||||
denied = true
|
||||
return true
|
||||
}
|
||||
}
|
||||
return approved || denied
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -41,35 +41,35 @@ func TestSync(t *testing.T) {
|
||||
name: "sync a deleted csr",
|
||||
startingCSRs: []runtime.Object{},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertAction(t, actions, "get")
|
||||
assertActions(t, actions, "get")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync a denied csr",
|
||||
startingCSRs: []runtime.Object{newDeniedCSR()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertAction(t, actions, "get")
|
||||
assertActions(t, actions, "get")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync an approved csr",
|
||||
startingCSRs: []runtime.Object{newApprovedCSR()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertAction(t, actions, "get")
|
||||
assertActions(t, actions, "get")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync an invalid csr",
|
||||
startingCSRs: []runtime.Object{newInvalidCSR()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertAction(t, actions, "get")
|
||||
assertActions(t, actions, "get")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deny an auto approving csr",
|
||||
startingCSRs: []runtime.Object{newRenewalCSR()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertAction(t, actions, "get", "create")
|
||||
assertActions(t, actions, "get", "create")
|
||||
assertSubjectAccessReviewCreated(t, actions[1].(clienttesting.CreateActionImpl).Object)
|
||||
},
|
||||
},
|
||||
@@ -78,8 +78,8 @@ func TestSync(t *testing.T) {
|
||||
startingCSRs: []runtime.Object{newRenewalCSR()},
|
||||
autoApprovingAllowed: true,
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertAction(t, actions, "get", "create", "update")
|
||||
assertCondition(t, actions[2].(clienttesting.UpdateActionImpl).Object, certificatesv1beta1.CertificateApproved, "AutoApproved")
|
||||
assertActions(t, actions, "get", "create", "update")
|
||||
assertCondition(t, actions[2].(clienttesting.UpdateActionImpl).Object, certificatesv1beta1.CertificateApproved, "AutoApprovedByHubCSRApprovingController")
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -249,7 +249,7 @@ func newApprovedCSR() *certificatesv1beta1.CertificateSigningRequest {
|
||||
return csr
|
||||
}
|
||||
|
||||
func assertAction(t *testing.T, actualActions []clienttesting.Action, expectedActions ...string) {
|
||||
func assertActions(t *testing.T, actualActions []clienttesting.Action, expectedActions ...string) {
|
||||
if len(actualActions) != len(expectedActions) {
|
||||
t.Errorf("expected %d call but got: %#v", len(expectedActions), actualActions)
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
|
||||
}
|
||||
|
||||
clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 10*time.Minute)
|
||||
csrInformers := kubeinformers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
|
||||
kubeInfomers := kubeinformers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
|
||||
|
||||
spokeClusterController := spokecluster.NewSpokeClusterController(
|
||||
kubeClient,
|
||||
@@ -38,12 +38,12 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
|
||||
|
||||
csrController := csr.NewCSRApprovingController(
|
||||
kubeClient,
|
||||
csrInformers.Certificates().V1beta1().CertificateSigningRequests().Informer(),
|
||||
kubeInfomers.Certificates().V1beta1().CertificateSigningRequests().Informer(),
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
go clusterInformers.Start(ctx.Done())
|
||||
go csrInformers.Start(ctx.Done())
|
||||
go kubeInfomers.Start(ctx.Done())
|
||||
|
||||
go spokeClusterController.Run(ctx, 1)
|
||||
go csrController.Run(ctx, 1)
|
||||
|
||||
@@ -65,7 +65,7 @@ type ClientCertForHubController struct {
|
||||
csrName string
|
||||
// keyData is the private key data used to created a csr
|
||||
// csrName and keyData store the internal state of the controller. They are set after controller creates a new csr
|
||||
// and cleared once the csr is approved and processed by controller. There are 4 combination of thier values:
|
||||
// and cleared once the csr is approved and processed by controller. There are 4 combination of their values:
|
||||
// 1. csrName empty, keyData empty: means we aren't trying to create a new client cert, our current one is valid
|
||||
// 2. csrName set, keyData empty: there was bug
|
||||
// 3. csrName set, keyData set: we are waiting for a new cert to be signed.
|
||||
@@ -74,10 +74,14 @@ type ClientCertForHubController struct {
|
||||
}
|
||||
|
||||
// NewClientCertForHubController return a ClientCertForHubController
|
||||
func NewClientCertForHubController(clusterName, agentName, hubKubeconfigSecretNamespace,
|
||||
kubeconfigSecretName string, hubClientConfig *restclient.Config, spokeCoreClient corev1client.CoreV1Interface,
|
||||
hubCSRClient csrclient.CertificateSigningRequestInterface, hubCSRInformer certificatesinformers.CertificateSigningRequestInformer,
|
||||
spokeSecretInformer corev1informers.SecretInformer, recorder events.Recorder, controllerNameOverride string) (factory.Controller, error) {
|
||||
func NewClientCertForHubController(
|
||||
clusterName, agentName, hubKubeconfigSecretNamespace, kubeconfigSecretName string,
|
||||
hubClientConfig *restclient.Config,
|
||||
spokeCoreClient corev1client.CoreV1Interface,
|
||||
hubCSRClient csrclient.CertificateSigningRequestInterface,
|
||||
hubCSRInformer certificatesinformers.CertificateSigningRequestInformer,
|
||||
spokeSecretInformer corev1informers.SecretInformer,
|
||||
recorder events.Recorder, controllerName string) factory.Controller {
|
||||
c := &ClientCertForHubController{
|
||||
clusterName: clusterName,
|
||||
agentName: agentName,
|
||||
@@ -90,16 +94,11 @@ func NewClientCertForHubController(clusterName, agentName, hubKubeconfigSecretNa
|
||||
spokeCoreClient: spokeCoreClient,
|
||||
}
|
||||
|
||||
controllerName := "ClientCertForHubController"
|
||||
if controllerNameOverride != "" {
|
||||
controllerName = controllerNameOverride
|
||||
}
|
||||
|
||||
return factory.New().
|
||||
WithInformers(hubCSRInformer.Informer(), spokeSecretInformer.Informer()).
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(5*time.Minute).
|
||||
ToController(controllerName, recorder), nil
|
||||
ToController(controllerName, recorder)
|
||||
}
|
||||
|
||||
func (c *ClientCertForHubController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
|
||||
@@ -158,7 +158,7 @@ func TestSyncWithoutHubKubeconfigSecret(t *testing.T) {
|
||||
t.Errorf("expected agent name %q but got: %s", agentName, string(secret.Data[AgentNameFile]))
|
||||
}
|
||||
|
||||
// checkc if there is new csr created
|
||||
// check if there is new csr created
|
||||
csrs, err := fakeKubeClient.CertificatesV1beta1().CertificateSigningRequests().List(context.Background(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
||||
@@ -15,13 +15,11 @@ import (
|
||||
"github.com/open-cluster-management/registration/pkg/spoke/spokecluster"
|
||||
"github.com/openshift/library-go/pkg/controller/controllercmd"
|
||||
"github.com/spf13/pflag"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilrand "k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/klog"
|
||||
@@ -36,13 +34,13 @@ const (
|
||||
|
||||
// SpokeAgentOptions holds configuration for spoke cluster agent
|
||||
type SpokeAgentOptions struct {
|
||||
ComponentNamespace string
|
||||
ClusterName string
|
||||
AgentName string
|
||||
BootstrapKubeconfig string
|
||||
HubKubeconfigSecret string
|
||||
HubKubeconfigDir string
|
||||
SpokeServerUrl string
|
||||
ComponentNamespace string
|
||||
ClusterName string
|
||||
AgentName string
|
||||
BootstrapKubeconfig string
|
||||
HubKubeconfigSecret string
|
||||
HubKubeconfigDir string
|
||||
SpokeExternalServerUrl string
|
||||
}
|
||||
|
||||
// NewSpokeAgentOptions returns a SpokeAgentOptions
|
||||
@@ -110,26 +108,52 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load bootstrap kubeconfig from file %q: %w", o.BootstrapKubeconfig, err)
|
||||
}
|
||||
bootstrapClient, err := kubernetes.NewForConfig(bootstrapClientConfig)
|
||||
bootstrapKubeClient, err := kubernetes.NewForConfig(bootstrapClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapClient, 10*time.Minute)
|
||||
bootstrapClusterClient, err := clusterv1client.NewForConfig(bootstrapClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapKubeClient, 10*time.Minute)
|
||||
|
||||
// create a ClientCertForHubController for spoke agent bootstrap
|
||||
clientCertForHubController, err := hubclientcert.NewClientCertForHubController(
|
||||
clientCertForHubController := hubclientcert.NewClientCertForHubController(
|
||||
o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret,
|
||||
restclient.AnonymousClientConfig(bootstrapClientConfig),
|
||||
spokeKubeClient.CoreV1(),
|
||||
bootstrapClient.CertificatesV1beta1().CertificateSigningRequests(),
|
||||
bootstrapKubeClient.CertificatesV1beta1().CertificateSigningRequests(),
|
||||
bootstrapInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
|
||||
spokeKubeInformerFactory.Core().V1().Secrets(),
|
||||
controllerContext.EventRecorder,
|
||||
"BoostrapClientCertForHubController",
|
||||
"BootstrapClientCertForHubController",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
// create a SpokeClusterCreatingControlle to create a spoke cluster on hub cluster
|
||||
hasSpokeCluster := false
|
||||
|
||||
caBundle := controllerContext.KubeConfig.CAData
|
||||
if caBundle == nil && controllerContext.KubeConfig.CAFile != "" {
|
||||
data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err)
|
||||
}
|
||||
caBundle = data
|
||||
}
|
||||
|
||||
// TODO there is a corner case, if the hub kubeconfig is ready, but the spoke cluster did not create, and the agent is
|
||||
// restarted, the agent will not be able to create spoke cluster again due to the bootstrap kubeconfig has been switched
|
||||
// to hub kubeconfig, in this case, the hub cluster admin need to create the spoke cluster on hub cluster manually
|
||||
spokeClusterCreatingController := spokecluster.NewSpokeClusterCreatingController(
|
||||
o.ClusterName, o.SpokeExternalServerUrl,
|
||||
caBundle,
|
||||
bootstrapClusterClient,
|
||||
func(isCreated bool) { hasSpokeCluster = isCreated },
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
var bootstrapCtx context.Context
|
||||
bootstrapCtx, stopBootstrap = context.WithCancel(ctx)
|
||||
|
||||
@@ -137,12 +161,16 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
go spokeKubeInformerFactory.Start(bootstrapCtx.Done())
|
||||
|
||||
go clientCertForHubController.Run(bootstrapCtx, 1)
|
||||
go spokeClusterCreatingController.Run(bootstrapCtx, 1)
|
||||
|
||||
// wait for the spoke cluster is created
|
||||
wait.PollImmediateInfinite(1*time.Second, func() (bool, error) { return hasSpokeCluster, nil })
|
||||
}
|
||||
|
||||
// wait for the client config for hub is ready
|
||||
klog.Info("Waiting for client config for hub to be ready")
|
||||
err = wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig)
|
||||
if err != nil {
|
||||
if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
|
||||
// TODO we need run the bootstrap CSR forever too to re-establish the client-cert if we ever lose it.
|
||||
if stopBootstrap != nil {
|
||||
stopBootstrap()
|
||||
}
|
||||
@@ -176,7 +204,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
|
||||
|
||||
// create another ClientCertForHubController for client certificate rotation
|
||||
clientCertForHubController, err := hubclientcert.NewClientCertForHubController(
|
||||
clientCertForHubController := hubclientcert.NewClientCertForHubController(
|
||||
o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret,
|
||||
restclient.AnonymousClientConfig(hubClientConfig),
|
||||
spokeKubeClient.CoreV1(),
|
||||
@@ -184,30 +212,12 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
hubKubeInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
|
||||
spokeKubeInformerFactory.Core().V1().Secrets(),
|
||||
controllerContext.EventRecorder,
|
||||
"",
|
||||
"ClientCertForHubController",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create SpokeClusterController to reconcile instances of SpokeCluster on the spoke cluster
|
||||
caBundle := controllerContext.KubeConfig.CAData
|
||||
if caBundle == nil && controllerContext.KubeConfig.CAFile != "" {
|
||||
data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err)
|
||||
}
|
||||
caBundle = data
|
||||
}
|
||||
|
||||
spokeServerURL, err := o.getSpokeServerURL(spokeKubeClient.CoreV1())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
spokeClusterController := spokecluster.NewSpokeClusterController(
|
||||
o.ClusterName, spokeServerURL,
|
||||
caBundle,
|
||||
o.ClusterName,
|
||||
hubClusterClient,
|
||||
hubClusterInformerFactory.Cluster().V1().SpokeClusters(),
|
||||
spokeKubeClient.Discovery(),
|
||||
@@ -236,7 +246,7 @@ func (o *SpokeAgentOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
"The name of secret in component namespace storing kubeconfig for hub.")
|
||||
fs.StringVar(&o.HubKubeconfigDir, "hub-kubeconfig-dir", o.HubKubeconfigDir,
|
||||
"The mount path of hub-kubeconfig-secret in the container.")
|
||||
fs.StringVar(&o.SpokeServerUrl, "spoke-server-url", o.SpokeServerUrl,
|
||||
fs.StringVar(&o.SpokeExternalServerUrl, "spoke-external-server-url", o.SpokeExternalServerUrl,
|
||||
"A reachable URL of spoke cluster api server for hub cluster.")
|
||||
}
|
||||
|
||||
@@ -254,6 +264,10 @@ func (o *SpokeAgentOptions) Validate() error {
|
||||
return errors.New("agent name is empty")
|
||||
}
|
||||
|
||||
if o.SpokeExternalServerUrl == "" {
|
||||
return errors.New("spoke cluster external api server url is empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -347,32 +361,3 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) {
|
||||
|
||||
return clusterName, agentName
|
||||
}
|
||||
|
||||
// getSpokeServerURL returns the api server url of spoke cluster
|
||||
func (o *SpokeAgentOptions) getSpokeServerURL(spokeCoreClient corev1client.CoreV1Interface) (string, error) {
|
||||
// use user specified spoke server url if exists
|
||||
if o.SpokeServerUrl != "" {
|
||||
return o.SpokeServerUrl, nil
|
||||
}
|
||||
|
||||
// otherwise try to get it from the default/kubernetes endpoints
|
||||
klog.Infof("Try to get spoke server URL from default/kubernetes endpoints")
|
||||
kubeEndpoints, err := spokeCoreClient.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: %w", err)
|
||||
}
|
||||
|
||||
if len(kubeEndpoints.Subsets) == 0 {
|
||||
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no subsets")
|
||||
}
|
||||
|
||||
if len(kubeEndpoints.Subsets[0].Addresses) == 0 {
|
||||
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no addresses")
|
||||
}
|
||||
|
||||
if len(kubeEndpoints.Subsets[0].Ports) == 0 {
|
||||
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no ports")
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s:%d", kubeEndpoints.Subsets[0].Addresses[0].IP, kubeEndpoints.Subsets[0].Ports[0].Port), nil
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/openshift/library-go/pkg/controller/factory"
|
||||
"github.com/openshift/library-go/pkg/operator/events"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -27,8 +26,6 @@ import (
|
||||
// spokeClusterController reconciles instances of SpokeCluster on the spoke cluster.
|
||||
type spokeClusterController struct {
|
||||
clusterName string
|
||||
spokeServerURL string
|
||||
spokeCABundle []byte
|
||||
hubClusterClient clientset.Interface
|
||||
hubClusterLister clusterv1listers.SpokeClusterLister
|
||||
spokeDiscoveryClient discovery.DiscoveryInterface
|
||||
@@ -37,8 +34,7 @@ type spokeClusterController struct {
|
||||
|
||||
// NewSpokeClusterController creates a new spoke cluster controller on the spoke cluster.
|
||||
func NewSpokeClusterController(
|
||||
clusterName, spokeServerURL string,
|
||||
spokeCABundle []byte,
|
||||
clusterName string,
|
||||
hubClusterClient clientset.Interface,
|
||||
hubSpokeClusterInformer clusterv1informer.SpokeClusterInformer,
|
||||
spokeDiscoveryClient discovery.DiscoveryInterface,
|
||||
@@ -46,25 +42,25 @@ func NewSpokeClusterController(
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &spokeClusterController{
|
||||
clusterName: clusterName,
|
||||
spokeServerURL: spokeServerURL,
|
||||
spokeCABundle: spokeCABundle,
|
||||
hubClusterClient: hubClusterClient,
|
||||
hubClusterLister: hubSpokeClusterInformer.Lister(),
|
||||
spokeDiscoveryClient: spokeDiscoveryClient,
|
||||
spokeNodeLister: spokeNodeInformer.Lister(),
|
||||
}
|
||||
|
||||
return factory.New().
|
||||
// TODO need to have conditional node capacity recalculation based on number of nodes here and decoupling
|
||||
// the node capacity calculation to another controller.
|
||||
WithInformers(hubSpokeClusterInformer.Informer(), spokeNodeInformer.Informer()).
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(5*time.Minute).
|
||||
ToController("SpokeClusterController", recorder)
|
||||
}
|
||||
|
||||
// sync maintains the spoke-side status of a SpokeCluster, it maintains the SpokeClusterJoined condition according to
|
||||
// the value of the SpokeClusterHubAccepted condition and ensures resource and version are up to date.
|
||||
func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
spokeCluster, err := c.hubClusterLister.Get(c.clusterName)
|
||||
if errors.IsNotFound(err) {
|
||||
return c.createSpokeCluster(ctx, syncCtx)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get spoke cluster with name %q from hub: %w", c.clusterName, err)
|
||||
}
|
||||
@@ -72,7 +68,7 @@ func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncCo
|
||||
// current spoke cluster is not accepted, do nothing.
|
||||
acceptedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionHubAccepted)
|
||||
if !helpers.IsConditionTrue(acceptedCondition) {
|
||||
klog.V(4).Infof("Spoke cluster %q is not accepted by hub yet", c.clusterName)
|
||||
syncCtx.Recorder().Eventf("SpokeClusterIsNotAccepted", "Spoke cluster %q is not accepted by hub yet", c.clusterName)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -119,27 +115,6 @@ func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncCo
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *spokeClusterController) createSpokeCluster(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
spokeCluster := &clusterv1.SpokeCluster{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.clusterName,
|
||||
},
|
||||
Spec: clusterv1.SpokeClusterSpec{
|
||||
SpokeClientConfig: clusterv1.ClientConfig{
|
||||
URL: c.spokeServerURL,
|
||||
CABundle: c.spokeCABundle,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if _, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{}); err != nil {
|
||||
return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err)
|
||||
}
|
||||
|
||||
syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *spokeClusterController) getSpokeVersion() (*clusterv1.SpokeVersion, error) {
|
||||
serverVersion, err := c.spokeDiscoveryClient.ServerVersion()
|
||||
if err != nil {
|
||||
@@ -167,15 +142,15 @@ func (c *spokeClusterController) getClusterResources() (capacity, allocatable cl
|
||||
|
||||
return clusterv1.ResourceList{
|
||||
clusterv1.ResourceCPU: cpuCapacity,
|
||||
clusterv1.ResourceMemory: formatQuatityToMi(memoryCapacity),
|
||||
clusterv1.ResourceMemory: formatQuantityToMi(memoryCapacity),
|
||||
},
|
||||
clusterv1.ResourceList{
|
||||
clusterv1.ResourceCPU: cpuAllocatable,
|
||||
clusterv1.ResourceMemory: formatQuatityToMi(memoryAllocatable),
|
||||
clusterv1.ResourceMemory: formatQuantityToMi(memoryAllocatable),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func formatQuatityToMi(q resource.Quantity) resource.Quantity {
|
||||
func formatQuantityToMi(q resource.Quantity) resource.Quantity {
|
||||
raw, _ := q.AsInt64()
|
||||
raw /= (1024 * 1024)
|
||||
rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw))
|
||||
|
||||
@@ -37,12 +37,11 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
name: "sync no spoke cluster",
|
||||
startingObjects: []runtime.Object{},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 1 {
|
||||
t.Errorf("expected 1 call but got: %#v", actions)
|
||||
if len(actions) != 0 {
|
||||
t.Errorf("expected 0 call but got: %#v", actions)
|
||||
}
|
||||
assertAction(t, actions[0], "create")
|
||||
assertSpokeCluster(t, actions[0].(clienttesting.CreateActionImpl).Object, testSpokeClusterName)
|
||||
},
|
||||
expectedErr: "unable to get spoke cluster with name \"testspokecluster\" from hub: spokecluster.cluster.open-cluster-management.io \"testspokecluster\" not found",
|
||||
},
|
||||
{
|
||||
name: "sync an unaccepted spoke cluster",
|
||||
@@ -58,10 +57,7 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
startingObjects: []runtime.Object{newAcceptedSpokeCluster()},
|
||||
nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 2 {
|
||||
t.Errorf("expected 2 call but got: %#v", actions)
|
||||
}
|
||||
assertAction(t, actions[1], "update")
|
||||
assertActions(t, actions, "get", "update")
|
||||
actual := actions[1].(clienttesting.UpdateActionImpl).Object
|
||||
assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue)
|
||||
assertStatusVersion(t, actual, kubeversion.Get())
|
||||
@@ -73,10 +69,7 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
startingObjects: []runtime.Object{newJoinedSpokeCluster(newResourceList(32, 64), newResourceList(16, 32))},
|
||||
nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 1 {
|
||||
t.Errorf("expected 1 call but got: %#v", actions)
|
||||
}
|
||||
assertAction(t, actions[0], "get")
|
||||
assertActions(t, actions, "get")
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -87,10 +80,7 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
newNode("testnode2", newResourceList(32, 64), newResourceList(16, 32)),
|
||||
},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 2 {
|
||||
t.Errorf("expected 1 call but got: %#v", actions)
|
||||
}
|
||||
assertAction(t, actions[1], "update")
|
||||
assertActions(t, actions, "get", "update")
|
||||
actual := actions[1].(clienttesting.UpdateActionImpl).Object
|
||||
assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue)
|
||||
assertStatusVersion(t, actual, kubeversion.Get())
|
||||
@@ -117,8 +107,6 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
|
||||
ctrl := spokeClusterController{
|
||||
clusterName: testSpokeClusterName,
|
||||
spokeServerURL: "https://127.0.0.1:32768",
|
||||
spokeCABundle: []byte("testspokeclusterca"),
|
||||
hubClusterClient: clusterClient,
|
||||
hubClusterLister: clusterInformerFactory.Cluster().V1().SpokeClusters().Lister(),
|
||||
spokeDiscoveryClient: kubeClient.Discovery(),
|
||||
@@ -134,7 +122,7 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
t.Errorf("expected %q error, got %q", c.expectedErr, syncErr.Error())
|
||||
return
|
||||
}
|
||||
if syncErr != nil {
|
||||
if len(c.expectedErr) == 0 && syncErr != nil {
|
||||
t.Errorf("unexpected err: %v", syncErr)
|
||||
}
|
||||
|
||||
@@ -143,9 +131,14 @@ func TestSyncSpokeCluster(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func assertAction(t *testing.T, actual clienttesting.Action, expected string) {
|
||||
if actual.GetVerb() != expected {
|
||||
t.Errorf("expected %s action but got: %#v", expected, actual)
|
||||
func assertActions(t *testing.T, actualActions []clienttesting.Action, expectedActions ...string) {
|
||||
if len(actualActions) != len(expectedActions) {
|
||||
t.Errorf("expected %d call but got: %#v", len(expectedActions), actualActions)
|
||||
}
|
||||
for i, expected := range expectedActions {
|
||||
if actualActions[i].GetVerb() != expected {
|
||||
t.Errorf("expected %s action but got: %#v", expected, actualActions[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
77
pkg/spoke/spokecluster/creating_controller.go
Normal file
77
pkg/spoke/spokecluster/creating_controller.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package spokecluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
|
||||
clusterv1 "github.com/open-cluster-management/api/cluster/v1"
|
||||
"github.com/openshift/library-go/pkg/controller/factory"
|
||||
"github.com/openshift/library-go/pkg/operator/events"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// spokeClusterCreatingController creates a spoke cluster on hub cluster during the spoke agent bootstrap phase
|
||||
type spokeClusterCreatingController struct {
|
||||
isSpokeClusterExists bool
|
||||
clusterName string
|
||||
spokeExternalServerUrl string
|
||||
spokeCABundle []byte
|
||||
hasSpokeClusterFunc func(bool)
|
||||
hubClusterClient clientset.Interface
|
||||
}
|
||||
|
||||
// NewSpokeClusterCreatingController creates a new spoke cluster creating controller on the spoke cluster.
|
||||
func NewSpokeClusterCreatingController(
|
||||
clusterName, spokeExternalServerUrl string,
|
||||
spokeCABundle []byte,
|
||||
hubClusterClient clientset.Interface,
|
||||
hasSpokeClusterFunc func(bool),
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &spokeClusterCreatingController{
|
||||
clusterName: clusterName,
|
||||
spokeExternalServerUrl: spokeExternalServerUrl,
|
||||
spokeCABundle: spokeCABundle,
|
||||
hasSpokeClusterFunc: hasSpokeClusterFunc,
|
||||
hubClusterClient: hubClusterClient,
|
||||
}
|
||||
return factory.New().
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(5*time.Minute).
|
||||
ToController("SpokeClusterCreatingController", recorder)
|
||||
}
|
||||
|
||||
func (c *spokeClusterCreatingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
if c.isSpokeClusterExists {
|
||||
return nil
|
||||
}
|
||||
|
||||
spokeCluster := &clusterv1.SpokeCluster{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.clusterName,
|
||||
},
|
||||
Spec: clusterv1.SpokeClusterSpec{
|
||||
SpokeClientConfig: clusterv1.ClientConfig{
|
||||
URL: c.spokeExternalServerUrl,
|
||||
CABundle: c.spokeCABundle,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{})
|
||||
if errors.IsAlreadyExists(err) {
|
||||
c.isSpokeClusterExists = true
|
||||
c.hasSpokeClusterFunc(c.isSpokeClusterExists)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err)
|
||||
}
|
||||
|
||||
c.isSpokeClusterExists = true
|
||||
c.hasSpokeClusterFunc(c.isSpokeClusterExists)
|
||||
syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName)
|
||||
return nil
|
||||
}
|
||||
92
pkg/spoke/spokecluster/creating_controller_test.go
Normal file
92
pkg/spoke/spokecluster/creating_controller_test.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package spokecluster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
clusterfake "github.com/open-cluster-management/api/client/cluster/clientset/versioned/fake"
|
||||
clusterv1 "github.com/open-cluster-management/api/cluster/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
)
|
||||
|
||||
const testSpokeExternalServerUrl = "https://192.168.3.77:32769"
|
||||
|
||||
func TestCreateSpokeCluster(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
startingObjects []runtime.Object
|
||||
validateActions func(t *testing.T, actions []clienttesting.Action)
|
||||
hasSpokeCluster bool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "create a new cluster",
|
||||
startingObjects: []runtime.Object{},
|
||||
hasSpokeCluster: true,
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertActions(t, actions, "create")
|
||||
actual := actions[0].(clienttesting.CreateActionImpl).Object
|
||||
assertSpokeExternalServerUrl(t, actual, testSpokeExternalServerUrl)
|
||||
assertSpokeCABundle(t, actual, []byte("testcabundle"))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create an existed cluster",
|
||||
startingObjects: []runtime.Object{newSpokeCluster([]clusterv1.StatusCondition{})},
|
||||
hasSpokeCluster: true,
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
assertActions(t, actions, "create")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
hasSpokeCluster := false
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
|
||||
ctrl := spokeClusterCreatingController{
|
||||
clusterName: testSpokeClusterName,
|
||||
spokeExternalServerUrl: testSpokeExternalServerUrl,
|
||||
spokeCABundle: []byte("testcabundle"),
|
||||
hasSpokeClusterFunc: func(isCreated bool) { hasSpokeCluster = isCreated },
|
||||
hubClusterClient: clusterClient,
|
||||
}
|
||||
|
||||
syncErr := ctrl.sync(context.TODO(), newFakeSyncContext(t))
|
||||
if len(c.expectedErr) > 0 && syncErr == nil {
|
||||
t.Errorf("expected %q error", c.expectedErr)
|
||||
return
|
||||
}
|
||||
if len(c.expectedErr) > 0 && syncErr != nil && syncErr.Error() != c.expectedErr {
|
||||
t.Errorf("expected %q error, got %q", c.expectedErr, syncErr.Error())
|
||||
return
|
||||
}
|
||||
if len(c.expectedErr) == 0 && syncErr != nil {
|
||||
t.Errorf("unexpected err: %v", syncErr)
|
||||
}
|
||||
|
||||
if hasSpokeCluster != c.hasSpokeCluster {
|
||||
t.Errorf("expected %t error, but failed", c.hasSpokeCluster)
|
||||
return
|
||||
}
|
||||
|
||||
c.validateActions(t, clusterClient.Actions())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func assertSpokeExternalServerUrl(t *testing.T, actual runtime.Object, expected string) {
|
||||
spokeCluster := actual.(*clusterv1.SpokeCluster)
|
||||
if spokeCluster.Spec.SpokeClientConfig.URL != expected {
|
||||
t.Errorf("expected %q error, but got %q", expected, spokeCluster.Spec.SpokeClientConfig.URL)
|
||||
}
|
||||
}
|
||||
|
||||
func assertSpokeCABundle(t *testing.T, actual runtime.Object, expected []byte) {
|
||||
spokeCluster := actual.(*clusterv1.SpokeCluster)
|
||||
if !bytes.Equal(spokeCluster.Spec.SpokeClientConfig.CABundle, expected) {
|
||||
t.Errorf("expected %q error, but got %q", expected, spokeCluster.Spec.SpokeClientConfig.CABundle)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user