Files
open-cluster-management/pkg/hub/manager.go
2020-11-17 15:38:00 +08:00

106 lines
3.7 KiB
Go

package hub
import (
"context"
"time"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"k8s.io/client-go/kubernetes"
clusterv1client "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
clusterv1informers "github.com/open-cluster-management/api/client/cluster/informers/externalversions"
workv1client "github.com/open-cluster-management/api/client/work/clientset/versioned"
workv1informers "github.com/open-cluster-management/api/client/work/informers/externalversions"
"github.com/open-cluster-management/registration/pkg/hub/csr"
"github.com/open-cluster-management/registration/pkg/hub/lease"
"github.com/open-cluster-management/registration/pkg/hub/managedcluster"
"github.com/open-cluster-management/registration/pkg/hub/managedclusterset"
"github.com/open-cluster-management/registration/pkg/hub/rbacfinalizerdeletion"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/rest"
)
// RunControllerManager starts the controllers on hub to manage spoke cluster registration.
func RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// If qps in kubconfig is not set, increase the qps and burst to enhance the ability of kube client to handle
// requests in concurrent
// TODO: Use ClientConnectionOverrides flags to change qps/burst when library-go exposes them in the future
kubeConfig := rest.CopyConfig(controllerContext.KubeConfig)
if kubeConfig.QPS == 0.0 {
kubeConfig.QPS = 100.0
kubeConfig.Burst = 200
}
kubeClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return err
}
clusterClient, err := clusterv1client.NewForConfig(kubeConfig)
if err != nil {
return err
}
workClient, err := workv1client.NewForConfig(kubeConfig)
if err != nil {
return err
}
clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 10*time.Minute)
workInformers := workv1informers.NewSharedInformerFactory(workClient, 10*time.Minute)
kubeInfomers := kubeinformers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
managedClusterController := managedcluster.NewManagedClusterController(
kubeClient,
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters().Informer(),
controllerContext.EventRecorder,
)
csrController := csr.NewCSRApprovingController(
kubeClient,
kubeInfomers.Certificates().V1beta1().CertificateSigningRequests().Informer(),
controllerContext.EventRecorder,
)
leaseController := lease.NewClusterLeaseController(
kubeClient,
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
kubeInfomers.Coordination().V1().Leases(),
5*time.Minute, //TODO: this interval time should be allowed to change from outside
controllerContext.EventRecorder,
)
rbacFinalizerController := rbacfinalizerdeletion.NewFinalizeController(
kubeInfomers.Rbac().V1().Roles(),
kubeInfomers.Rbac().V1().RoleBindings(),
kubeInfomers.Core().V1().Namespaces().Lister(),
clusterInformers.Cluster().V1().ManagedClusters().Lister(),
workInformers.Work().V1().ManifestWorks().Lister(),
kubeClient.RbacV1(),
controllerContext.EventRecorder,
)
managedClusterSetController := managedclusterset.NewManagedClusterSetController(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
clusterInformers.Cluster().V1alpha1().ManagedClusterSets(),
controllerContext.EventRecorder,
)
go clusterInformers.Start(ctx.Done())
go workInformers.Start(ctx.Done())
go kubeInfomers.Start(ctx.Done())
go managedClusterController.Run(ctx, 1)
go csrController.Run(ctx, 1)
go leaseController.Run(ctx, 1)
go rbacFinalizerController.Run(ctx, 1)
go managedClusterSetController.Run(ctx, 1)
<-ctx.Done()
return nil
}