upgrade sdk-go (#498)

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2024-06-06 15:36:33 +08:00
committed by GitHub
parent 84ec2b2159
commit 61a74bb348
26 changed files with 1155 additions and 514 deletions

View File

@@ -80,10 +80,16 @@ func NewManifestWorkReplicaSetController(
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
) factory.Controller {
controller := newController(
workClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
workClient,
workApplier,
manifestWorkReplicaSetInformer,
manifestWorkInformer,
placementInformer,
placeDecisionInformer,
)
err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
@@ -114,12 +120,14 @@ func NewManifestWorkReplicaSetController(
WithSync(controller.sync).ToController("ManifestWorkReplicaSetController", recorder)
}
func newController(workClient workclientset.Interface,
func newController(
workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) *ManifestWorkReplicaSetController {
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
) *ManifestWorkReplicaSetController {
return &ManifestWorkReplicaSetController{
workClient: workClient,
manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),

View File

@@ -16,8 +16,6 @@ import (
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
clustersdkv1alpha1 "open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
@@ -264,11 +262,17 @@ func getCondition(conditionType string, reason string, message string, status me
}
}
func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
func CreateManifestWork(
mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
clusterNS string,
placementRefName string,
) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}
// TODO consider how to trace the manifestworks spec changes for cloudevents work client
return &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
@@ -277,12 +281,9 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName,
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
Spec: mwrSet.Spec.ManifestWorkTemplate,
}, nil
}
func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {

View File

@@ -6,15 +6,18 @@ import (
"github.com/openshift/library-go/pkg/controller/controllercmd"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
)
@@ -48,17 +51,6 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
return err
}
// To support sending ManifestWorks to different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
WithKubeConfig(controllerContext.KubeConfig).
LoadConfig()
if err != nil {
return err
}
// we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
// This could reduce a lot of memory consumptions
workInformOption := workinformers.WithTweakListOptions(
@@ -75,35 +67,82 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
},
)
clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithInformerConfig(30*time.Minute, workInformOption).
WithCodecs(codec.NewManifestBundleCodec()).
NewSourceClientHolder(ctx)
if err != nil {
return err
var workClient workclientset.Interface
var watcherStore *store.InformerWatcherStore
if c.workOptions.WorkDriver == "kube" {
config := controllerContext.KubeConfig
if c.workOptions.WorkDriverConfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", c.workOptions.WorkDriverConfig)
if err != nil {
return err
}
}
workClient, err = workclientset.NewForConfig(config)
if err != nil {
return err
}
} else {
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
watcherStore = store.NewInformerWatcherStore(ctx)
_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
LoadConfig()
if err != nil {
return err
}
clientHolder, err := work.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(ctx)
if err != nil {
return err
}
workClient = clientHolder.WorkInterface()
}
return RunControllerManagerWithInformers(ctx, controllerContext, replicaSetsClient, clientHolder, clusterInformerFactory)
factory := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute, workInformOption)
informer := factory.Work().V1().ManifestWorks()
// For cloudevents work client, we use the informer store as the client store
if watcherStore != nil {
watcherStore.SetStore(informer.Informer().GetStore())
}
return RunControllerManagerWithInformers(
ctx,
controllerContext,
replicaSetsClient,
workClient,
informer,
clusterInformerFactory,
)
}
func RunControllerManagerWithInformers(
ctx context.Context,
controllerContext *controllercmd.ControllerContext,
replicaSetClient workclientset.Interface,
hubWorkClientHolder *cloudeventswork.ClientHolder,
workClient workclientset.Interface,
workInformer workv1informer.ManifestWorkInformer,
clusterInformers clusterinformers.SharedInformerFactory,
) error {
replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
hubWorkInformer := hubWorkClientHolder.ManifestWorkInformer()
manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
controllerContext.EventRecorder,
replicaSetClient,
workapplier.NewWorkApplierWithTypedClient(hubWorkClientHolder.WorkInterface(), hubWorkInformer.Lister()),
workapplier.NewWorkApplierWithTypedClient(workClient, workInformer.Lister()),
replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
hubWorkInformer,
workInformer,
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
)
@@ -112,7 +151,7 @@ func RunControllerManagerWithInformers(
go replicaSetInformerFactory.Start(ctx.Done())
go manifestWorkReplicaSetController.Run(ctx, 5)
go hubWorkInformer.Informer().Run(ctx.Done())
go workInformer.Informer().Run(ctx.Done())
<-ctx.Done()
return nil

View File

@@ -4,8 +4,6 @@ import (
"time"
"github.com/spf13/pflag"
cloudeventsconstants "open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
)
const (
@@ -30,7 +28,7 @@ func NewWorkloadAgentOptions() *WorkloadAgentOptions {
MaxJSONRawLength: 1024,
StatusSyncInterval: 10 * time.Second,
AppliedManifestWorkEvictionGracePeriod: 60 * time.Minute,
WorkloadSourceDriver: cloudeventsconstants.ConfigTypeKube,
WorkloadSourceDriver: "kube",
WorkloadSourceConfig: "/spoke/hub-kubeconfig/kubeconfig",
}
}

View File

@@ -10,10 +10,13 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
ocmfeature "open-cluster-management.io/api/feature"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
@@ -91,22 +94,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
return err
}
// To support consuming ManifestWorks from different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return err
}
clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolder(ctx)
hubHost, hubWorkClient, hubWorkInformer, err := o.newHubWorkClientAndInformer(ctx, restMapper)
if err != nil {
return err
}
@@ -117,9 +105,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
agentID = hubHash
}
hubWorkClient := clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName)
hubWorkInformer := clientHolder.ManifestWorkInformer()
// create controllers
validator := auth.NewFactory(
spokeRestConfig,
@@ -223,3 +208,50 @@ func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Code
}
return codecs
}
func (o *WorkAgentConfig) newHubWorkClientAndInformer(
ctx context.Context,
restMapper meta.RESTMapper,
) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
if o.workOptions.WorkloadSourceDriver == "kube" {
config, err := clientcmd.BuildConfigFromFlags("", o.workOptions.WorkloadSourceConfig)
if err != nil {
return "", nil, nil, err
}
kubeWorkClientSet, err := workclientset.NewForConfig(config)
if err != nil {
return "", nil, nil, err
}
factory := workinformers.NewSharedInformerFactoryWithOptions(
kubeWorkClientSet,
5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName),
)
informer := factory.Work().V1().ManifestWorks()
return config.Host, kubeWorkClientSet.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil
}
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return "", nil, nil, err
}
clientHolder, informer, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolderWithInformer(ctx)
if err != nil {
return "", nil, nil, err
}
return hubHost, clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName), informer, err
}