Files
open-cluster-management/pkg/work/spoke/spokeagent.go
Jian Qiu f7cd1402e9 run work and registration as a single binary (#201)
* run registratin/work together

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Fix integration test and lint issue

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Update operator to deploy singleton mode

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Update deps

Signed-off-by: Jian Qiu <jqiu@redhat.com>

---------

Signed-off-by: Jian Qiu <jqiu@redhat.com>
2023-07-14 04:56:48 +02:00

194 lines
7.6 KiB
Go

package spoke
import (
"context"
"time"
"github.com/openshift/library-go/pkg/controller/controllercmd"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
ocmfeature "open-cluster-management.io/api/feature"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/auth"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/appliedmanifestcontroller"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/finalizercontroller"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/manifestcontroller"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/statuscontroller"
)
const (
// If a controller queue size is too large (>500), the processing speed of the controller will drop significantly
// with one worker, increasing the work numbers can improve the processing speed.
// We compared the two situations where the worker is set to 1 and 10, when the worker is 10, the resource
// utilization of the kubeapi-server and work agent do not increase significantly.
//
// TODO expose a flag to set the worker for each controller
appliedManifestWorkFinalizeControllerWorkers = 10
manifestWorkFinalizeControllerWorkers = 10
availableStatusControllerWorkers = 10
)
type WorkAgentConfig struct {
agentOptions *commonoptions.AgentOptions
workOptions *WorkloadAgentOptions
}
// NewWorkAgentConfig returns a WorkAgentConfig
func NewWorkAgentConfig(commonOpts *commonoptions.AgentOptions, opts *WorkloadAgentOptions) *WorkAgentConfig {
return &WorkAgentConfig{
agentOptions: commonOpts,
workOptions: opts,
}
}
// RunWorkloadAgent starts the controllers on agent to process work from hub.
func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// build hub client and informer
hubRestConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.agentOptions.HubKubeconfigFile)
if err != nil {
return err
}
hubhash := helper.HubHash(hubRestConfig.Host)
agentID := o.agentOptions.AgentID
if len(agentID) == 0 {
agentID = hubhash
}
hubWorkClient, err := workclientset.NewForConfig(hubRestConfig)
if err != nil {
return err
}
// Only watch the cluster namespace on hub
workInformerFactory := workinformers.NewSharedInformerFactoryWithOptions(hubWorkClient, 5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName))
// load spoke client config and create spoke clients,
// the work agent may not running in the spoke/managed cluster.
spokeRestConfig, err := o.agentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
spokeDynamicClient, err := dynamic.NewForConfig(spokeRestConfig)
if err != nil {
return err
}
spokeKubeClient, err := kubernetes.NewForConfig(spokeRestConfig)
if err != nil {
return err
}
spokeAPIExtensionClient, err := apiextensionsclient.NewForConfig(spokeRestConfig)
if err != nil {
return err
}
spokeWorkClient, err := workclientset.NewForConfig(spokeRestConfig)
if err != nil {
return err
}
spokeWorkInformerFactory := workinformers.NewSharedInformerFactory(spokeWorkClient, 5*time.Minute)
httpClient, err := rest.HTTPClientFor(spokeRestConfig)
if err != nil {
return err
}
restMapper, err := apiutil.NewDynamicRESTMapper(spokeRestConfig, httpClient)
if err != nil {
return err
}
validator := auth.NewFactory(
spokeRestConfig,
spokeKubeClient,
workInformerFactory.Work().V1().ManifestWorks(),
o.agentOptions.SpokeClusterName,
controllerContext.EventRecorder,
restMapper,
).NewExecutorValidator(ctx, features.SpokeMutableFeatureGate.Enabled(ocmfeature.ExecutorValidatingCaches))
manifestWorkController := manifestcontroller.NewManifestWorkController(
controllerContext.EventRecorder,
spokeDynamicClient,
spokeKubeClient,
spokeAPIExtensionClient,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
hubhash, agentID,
restMapper,
validator,
)
addFinalizerController := finalizercontroller.NewAddFinalizerController(
controllerContext.EventRecorder,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
)
appliedManifestWorkFinalizeController := finalizercontroller.NewAppliedManifestWorkFinalizeController(
controllerContext.EventRecorder,
spokeDynamicClient,
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
agentID,
)
manifestWorkFinalizeController := finalizercontroller.NewManifestWorkFinalizeController(
controllerContext.EventRecorder,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
hubhash,
)
unmanagedAppliedManifestWorkController := finalizercontroller.NewUnManagedAppliedWorkController(
controllerContext.EventRecorder,
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
o.workOptions.AppliedManifestWorkEvictionGracePeriod,
hubhash, agentID,
)
appliedManifestWorkController := appliedmanifestcontroller.NewAppliedManifestWorkController(
controllerContext.EventRecorder,
spokeDynamicClient,
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
hubhash,
)
availableStatusController := statuscontroller.NewAvailableStatusController(
controllerContext.EventRecorder,
spokeDynamicClient,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
o.workOptions.StatusSyncInterval,
)
go workInformerFactory.Start(ctx.Done())
go spokeWorkInformerFactory.Start(ctx.Done())
go addFinalizerController.Run(ctx, 1)
go appliedManifestWorkFinalizeController.Run(ctx, appliedManifestWorkFinalizeControllerWorkers)
go unmanagedAppliedManifestWorkController.Run(ctx, 1)
go appliedManifestWorkController.Run(ctx, 1)
go manifestWorkController.Run(ctx, 1)
go manifestWorkFinalizeController.Run(ctx, manifestWorkFinalizeControllerWorkers)
go availableStatusController.Run(ctx, availableStatusControllerWorkers)
<-ctx.Done()
return nil
}