From 8fd640694e9c4760c1f7a4f93e6109415648e97a Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 27 Jan 2026 18:26:32 +0800 Subject: [PATCH] enable grpc e2e (#1354) Signed-off-by: Wei Liu --- .github/workflows/e2e.yml | 38 ++++ .gitignore | 1 + go.mod | 2 +- go.sum | 4 +- pkg/registration/register/csr/csr.go | 2 +- pkg/server/services/addon/addon.go | 105 +++++++-- pkg/server/services/addon/addon_test.go | 77 ++----- pkg/server/services/cluster/cluster.go | 58 +++-- pkg/server/services/cluster/cluster_test.go | 71 +----- pkg/server/services/csr/csr.go | 96 ++++++-- pkg/server/services/csr/csr_test.go | 83 ++----- pkg/server/services/event/event.go | 6 +- pkg/server/services/event/event_test.go | 9 +- pkg/server/services/lease/lease.go | 97 ++++++-- pkg/server/services/lease/lease_test.go | 92 +++----- .../services/tokenrequest/tokenreqeust.go | 58 ++--- .../tokenrequest/tokenrequest_test.go | 215 ++---------------- pkg/server/services/work/work.go | 140 +++++++----- pkg/server/services/work/work_test.go | 130 +++-------- test/e2e-test.mk | 62 ++++- test/e2e/addon_lease_test.go | 2 +- test/e2e/e2e_suite_test.go | 6 +- test/e2e/klusterlet_hosted_test.go | 2 +- test/e2e/klusterlet_test.go | 16 +- test/framework/klusterlet.go | 17 +- vendor/modules.txt | 2 +- .../sdk-go/pkg/certrotation/signer.go | 11 +- .../sdk-go/pkg/certrotation/target.go | 10 + .../clients/addon/v1alpha1/client.go | 6 + .../clients/addon/v1beta1/client.go | 6 + .../pkg/cloudevents/clients/lease/client.go | 6 + .../cloudevents/generic/clients/baseclient.go | 21 +- .../generic/options/v2/grpc/transport.go | 24 +- .../cloudevents/server/grpc/authz/kube/sar.go | 44 +++- .../pkg/cloudevents/server/grpc/broker.go | 204 +++++++---------- .../pkg/cloudevents/server/interface.go | 11 +- .../sdk-go/pkg/cloudevents/server/store.go | 6 +- .../sdk-go/pkg/helpers/resourceapply.go | 9 +- .../sdk-go/pkg/server/grpc/server.go | 12 +- 39 files changed, 844 insertions(+), 917 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index b8a30aee8..5c17a741f 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -137,3 +137,41 @@ jobs: IMAGE_TAG=e2e KLUSTERLET_DEPLOY_MODE=Singleton make test-e2e env: KUBECONFIG: /home/runner/.kube/config + e2e-grpc: + runs-on: ubuntu-latest + steps: + - name: add permisson to docker.sock + run: sudo chown runner:docker /var/run/docker.sock + if: ${{ env.ACT }} # this step only runs locally when using the https://github.com/nektos/act to debug the e2e + - name: Checkout + uses: actions/checkout@v6.0.2 + - name: Setup Go + uses: actions/setup-go@v6 + with: + go-version: ${{ env.GO_VERSION }} + - name: Setup kind + uses: engineerd/setup-kind@v0.6.2 + with: + version: v0.22.0 + skipClusterCreation: ${{ env.USE_EXISTING_CLUSTER }} + - name: Set KUBECONFIG + run: | + mkdir -p /home/runner/.kube + kind get kubeconfig > /home/runner/.kube/config + if: ${{ env.USE_EXISTING_CLUSTER }} + - name: install imagebuilder + run: go install github.com/openshift/imagebuilder/cmd/imagebuilder@v1.2.3 + - name: Build images + run: IMAGE_TAG=e2e make images + - name: Load images + run: | + kind load docker-image --name=kind quay.io/open-cluster-management/registration-operator:e2e + kind load docker-image --name=kind quay.io/open-cluster-management/registration:e2e + kind load docker-image --name=kind quay.io/open-cluster-management/work:e2e + kind load docker-image --name=kind quay.io/open-cluster-management/placement:e2e + kind load docker-image --name=kind quay.io/open-cluster-management/addon-manager:e2e + - name: Test E2E + run: | + IMAGE_TAG=e2e REGISTRATION_DRIVER=grpc make test-e2e + env: + KUBECONFIG: /home/runner/.kube/config diff --git a/.gitignore b/.gitignore index c29bb7a83..b3568e8c8 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ _output/ .kubeconfig .hub-kubeconfig +.grpc-config .external-hub-kubeconfig .external-managed-kubeconfig diff --git a/go.mod b/go.mod index bfd5215a7..54fb15f13 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9 open-cluster-management.io/api v1.1.1-0.20260126032025-b449b3b4e4b9 - open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d + open-cluster-management.io/sdk-go v1.1.1-0.20260127092137-c07e0fafa331 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a sigs.k8s.io/controller-runtime v0.22.4 diff --git a/go.sum b/go.sum index 1245d41d8..bcbac797c 100644 --- a/go.sum +++ b/go.sum @@ -587,8 +587,8 @@ open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9 open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9/go.mod h1:St9LTEuZ5ADLY9cVXSp+rVE/ZbPJ+hzNQ7/YcsiQVd8= open-cluster-management.io/api v1.1.1-0.20260126032025-b449b3b4e4b9 h1:H20ld7TL3HLhvTMz8DHCZkcQTqnnHuJtWrBICbwsCUw= open-cluster-management.io/api v1.1.1-0.20260126032025-b449b3b4e4b9/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4= -open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d h1:V+2LZY0aPOStdRxnFvW+yL4y5UqC97R9x4lTQdjLVfg= -open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0= +open-cluster-management.io/sdk-go v1.1.1-0.20260127092137-c07e0fafa331 h1:7EcnxNPXQgQxOi2Hv2nYGsTxxFYt3nwGHee5eJlOuz8= +open-cluster-management.io/sdk-go v1.1.1-0.20260127092137-c07e0fafa331/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM= diff --git a/pkg/registration/register/csr/csr.go b/pkg/registration/register/csr/csr.go index ed041f76d..ee0b5151a 100644 --- a/pkg/registration/register/csr/csr.go +++ b/pkg/registration/register/csr/csr.go @@ -187,7 +187,7 @@ func (c *CSRDriver) Process( shouldHalt := c.haltCSRCreation() if shouldHalt { recorder.Eventf(ctx, "ClientCertificateCreationHalted", - "Stop creating csr since there are too many csr created already on hub", controllerName) + "Stop creating CSR for %s since there are too many CSRs created already on the hub.", controllerName) return nil, &metav1.Condition{ Type: "ClusterCertificateRotated", Status: metav1.ConditionFalse, diff --git a/pkg/server/services/addon/addon.go b/pkg/server/services/addon/addon.go index 8b0910862..0c1b9506c 100644 --- a/pkg/server/services/addon/addon.go +++ b/pkg/server/services/addon/addon.go @@ -3,6 +3,7 @@ package addon import ( "context" "fmt" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,6 +12,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" addonclientset "open-cluster-management.io/api/client/addon/clientset/versioned" addoninformerv1alpha1 "open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1" addonlisterv1alpha1 "open-cluster-management.io/api/client/addon/listers/addon/v1alpha1" @@ -37,21 +39,7 @@ func NewAddonService(addonClient addonclientset.Interface, addonInformer addonin } } -func (s *AddonService) Get(_ context.Context, resourceID string) (*cloudevents.Event, error) { - namespace, name, err := cache.SplitMetaNamespaceKey(resourceID) - if err != nil { - return nil, err - } - - addon, err := s.addonLister.ManagedClusterAddOns(namespace).Get(name) - if err != nil { - return nil, err - } - - return s.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: addonce.ManagedClusterAddOnEventDataType}, addon) -} - -func (s *AddonService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (s *AddonService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { addons, err := s.addonLister.ManagedClusterAddOns(listOpts.ClusterName).List(labels.Everything()) if err != nil { return nil, err @@ -98,26 +86,93 @@ func (s *AddonService) RegisterHandler(ctx context.Context, handler server.Event } } +// TODO handle type check error and event handler error func (s *AddonService) EventHandlerFuncs(ctx context.Context, handler server.EventHandler) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get key for addon") + addon, ok := obj.(*addonv1alpha1.ManagedClusterAddOn) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "addon add") return } - if err := handler.OnCreate(ctx, addonce.ManagedClusterAddOnEventDataType, key); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to create addon", "key", key) + + createEventTypes := types.CloudEventsType{ + CloudEventsDataType: addonce.ManagedClusterAddOnEventDataType, + SubResource: types.SubResourceSpec, + Action: types.CreateRequestAction, + } + evt, err := s.codec.Encode(services.CloudEventsSourceKube, createEventTypes, addon) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode addon", + "namespace", addon.Namespace, "name", addon.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to create addon", + "namespace", addon.Namespace, "name", addon.Name) } }, UpdateFunc: func(oldObj, newObj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(newObj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get key for addon") + addon, ok := newObj.(*addonv1alpha1.ManagedClusterAddOn) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", newObj), "addon add") return } - if err := handler.OnUpdate(ctx, addonce.ManagedClusterAddOnEventDataType, key); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to update addon", "key", key) + + updateEventTypes := types.CloudEventsType{ + CloudEventsDataType: addonce.ManagedClusterAddOnEventDataType, + SubResource: types.SubResourceSpec, + Action: types.UpdateRequestAction, + } + evt, err := s.codec.Encode(services.CloudEventsSourceKube, updateEventTypes, addon) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode addon", + "namespace", addon.Namespace, "name", addon.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to update addon", + "namespace", addon.Namespace, "name", addon.Name) + } + }, + DeleteFunc: func(obj interface{}) { + addon, ok := obj.(*addonv1alpha1.ManagedClusterAddOn) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "addon delete") + return + } + + addon, ok = tombstone.Obj.(*addonv1alpha1.ManagedClusterAddOn) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "addon delete") + return + } + } + + addon = addon.DeepCopy() + if addon.DeletionTimestamp.IsZero() { + addon.DeletionTimestamp = &metav1.Time{Time: time.Now()} + } + + deleteEventTypes := types.CloudEventsType{ + CloudEventsDataType: addonce.ManagedClusterAddOnEventDataType, + SubResource: types.SubResourceSpec, + Action: types.DeleteRequestAction, + } + evt, err := s.codec.Encode(services.CloudEventsSourceKube, deleteEventTypes, addon) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode addon", + "namespace", addon.Namespace, "name", addon.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to delete addon", + "namespace", addon.Namespace, "name", addon.Name) } }, } diff --git a/pkg/server/services/addon/addon_test.go b/pkg/server/services/addon/addon_test.go index 20e8c7ac3..f012ed4fb 100644 --- a/pkg/server/services/addon/addon_test.go +++ b/pkg/server/services/addon/addon_test.go @@ -20,54 +20,6 @@ import ( testingcommon "open-cluster-management.io/ocm/pkg/common/testing" ) -func TestGet(t *testing.T) { - cases := []struct { - name string - addons []runtime.Object - resourceID string - expectedError bool - }{ - { - name: "addon not found", - addons: []runtime.Object{}, - resourceID: "test-namespace/test-addon", - expectedError: true, - }, - { - name: "get addon", - resourceID: "test-namespace/test-addon", - addons: []runtime.Object{&addonv1alpha1.ManagedClusterAddOn{ - ObjectMeta: metav1.ObjectMeta{Name: "test-addon", Namespace: "test-namespace"}, - }}, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - addonClient := addonfake.NewSimpleClientset(c.addons...) - addonInformers := addoninformers.NewSharedInformerFactory(addonClient, 10*time.Minute) - addonInformer := addonInformers.Addon().V1alpha1().ManagedClusterAddOns() - for _, obj := range c.addons { - if err := addonInformer.Informer().GetStore().Add(obj); err != nil { - t.Fatal(err) - } - } - - service := NewAddonService(addonClient, addonInformer) - _, err := service.Get(context.Background(), c.resourceID) - if c.expectedError { - if err == nil { - t.Errorf("expected error, got nil") - } - return - } - if err != nil { - t.Errorf("unexpected error: %v", err) - } - }) - } -} - func TestList(t *testing.T) { cases := []struct { name string @@ -108,7 +60,7 @@ func TestList(t *testing.T) { } service := NewAddonService(addonClient, addonInformer) - evts, err := service.List(types.ListOptions{ClusterName: c.clusterName}) + evts, err := service.List(context.Background(), types.ListOptions{ClusterName: c.clusterName}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -229,22 +181,23 @@ type addOnHandler struct { onUpdateCalled bool } -func (m *addOnHandler) OnCreate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != addonce.ManagedClusterAddOnEventDataType { - return fmt.Errorf("expected %v, got %v", addonce.ManagedClusterAddOnEventDataType, t) +func (m *addOnHandler) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err } - m.onCreateCalled = true - return nil -} -func (m *addOnHandler) OnUpdate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != addonce.ManagedClusterAddOnEventDataType { - return fmt.Errorf("expected %v, got %v", addonce.ManagedClusterAddOnEventDataType, t) + if eventType.CloudEventsDataType != addonce.ManagedClusterAddOnEventDataType { + return fmt.Errorf("expected %v, got %v", addonce.ManagedClusterAddOnEventDataType, eventType.CloudEventsDataType) + } + + // Determine action type + switch eventType.Action { + case types.CreateRequestAction: + m.onCreateCalled = true + case types.UpdateRequestAction: + m.onUpdateCalled = true } - m.onUpdateCalled = true - return nil -} -func (m *addOnHandler) OnDelete(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { return nil } diff --git a/pkg/server/services/cluster/cluster.go b/pkg/server/services/cluster/cluster.go index 827f90738..76d6c810f 100644 --- a/pkg/server/services/cluster/cluster.go +++ b/pkg/server/services/cluster/cluster.go @@ -6,7 +6,6 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" @@ -15,6 +14,7 @@ import ( clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1" + clusterv1 "open-cluster-management.io/api/cluster/v1" clusterce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/cluster" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/server" @@ -38,16 +38,7 @@ func NewClusterService(clusterClient clusterclient.Interface, clusterInformer cl } } -func (c *ClusterService) Get(_ context.Context, resourceID string) (*cloudevents.Event, error) { - cluster, err := c.clusterLister.Get(resourceID) - if err != nil { - return nil, err - } - - return c.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: clusterce.ManagedClusterEventDataType}, cluster) -} - -func (c *ClusterService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (c *ClusterService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { var evts []*cloudevents.Event cluster, err := c.clusterLister.Get(listOpts.ClusterName) if errors.IsNotFound(err) { @@ -104,26 +95,51 @@ func (c *ClusterService) RegisterHandler(ctx context.Context, handler server.Eve } } +// TODO handle type check error and event handler error func (c *ClusterService) EventHandlerFuncs(ctx context.Context, handler server.EventHandler) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - accessor, err := meta.Accessor(obj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for cluster") + cluster, ok := obj.(*clusterv1.ManagedCluster) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "cluster add") return } - if err := handler.OnCreate(ctx, clusterce.ManagedClusterEventDataType, accessor.GetName()); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to create cluster", "clusterName", accessor.GetName()) + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: clusterce.ManagedClusterEventDataType, + SubResource: types.SubResourceSpec, + Action: types.CreateRequestAction, + } + evt, err := c.codec.Encode(services.CloudEventsSourceKube, eventTypes, cluster) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode cluster", "clusterName", cluster.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to create cluster", "clusterName", cluster.Name) } }, UpdateFunc: func(oldObj, newObj interface{}) { - accessor, err := meta.Accessor(newObj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for cluster") + cluster, ok := newObj.(*clusterv1.ManagedCluster) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", newObj), "cluster update") return } - if err := handler.OnUpdate(ctx, clusterce.ManagedClusterEventDataType, accessor.GetName()); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to update cluster", "clusterName", accessor.GetName()) + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: clusterce.ManagedClusterEventDataType, + SubResource: types.SubResourceSpec, + Action: types.UpdateRequestAction, + } + evt, err := c.codec.Encode(services.CloudEventsSourceKube, eventTypes, cluster) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode cluster", "clusterName", cluster.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to update cluster", "clusterName", cluster.Name) } }, } diff --git a/pkg/server/services/cluster/cluster_test.go b/pkg/server/services/cluster/cluster_test.go index 315d98935..6e0564295 100644 --- a/pkg/server/services/cluster/cluster_test.go +++ b/pkg/server/services/cluster/cluster_test.go @@ -20,54 +20,6 @@ import ( testingcommon "open-cluster-management.io/ocm/pkg/common/testing" ) -func TestGet(t *testing.T) { - cases := []struct { - name string - clusters []runtime.Object - resourceID string - expectedError bool - }{ - { - name: "cluster not found", - clusters: []runtime.Object{}, - resourceID: "test-cluster", - expectedError: true, - }, - { - name: "cluster found", - resourceID: "test-cluster", - clusters: []runtime.Object{&clusterv1.ManagedCluster{ - ObjectMeta: metav1.ObjectMeta{Name: "test-cluster"}, - }}, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.clusters...) - clusterInformers := clusterinformers.NewSharedInformerFactory(clusterClient, 10*time.Minute) - clusterInformer := clusterInformers.Cluster().V1().ManagedClusters() - for _, obj := range c.clusters { - if err := clusterInformer.Informer().GetStore().Add(obj); err != nil { - t.Fatal(err) - } - } - - service := NewClusterService(clusterClient, clusterInformer) - _, err := service.Get(context.Background(), c.resourceID) - if c.expectedError { - if err == nil { - t.Errorf("expected error, got nil") - } - return - } - if err != nil { - t.Errorf("unexpected error: %v", err) - } - }) - } -} - func TestList(t *testing.T) { cases := []struct { name string @@ -108,7 +60,7 @@ func TestList(t *testing.T) { } service := NewClusterService(clusterClient, clusterInformer) - evts, err := service.List(types.ListOptions{ClusterName: c.clusterName}) + evts, err := service.List(context.Background(), types.ListOptions{ClusterName: c.clusterName}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -274,22 +226,17 @@ type clusterHandler struct { onUpdateCalled bool } -func (m *clusterHandler) OnCreate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != clusterce.ManagedClusterEventDataType { - return fmt.Errorf("expected %v, got %v", clusterce.ManagedClusterEventDataType, t) +func (m *clusterHandler) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err } - m.onCreateCalled = true - return nil -} -func (m *clusterHandler) OnUpdate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != clusterce.ManagedClusterEventDataType { - return fmt.Errorf("expected %v, got %v", clusterce.ManagedClusterEventDataType, t) + if eventType.CloudEventsDataType != clusterce.ManagedClusterEventDataType { + return fmt.Errorf("expected %v, got %v", clusterce.ManagedClusterEventDataType, eventType.CloudEventsDataType) } + + m.onCreateCalled = true m.onUpdateCalled = true return nil } - -func (m *clusterHandler) OnDelete(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - return nil -} diff --git a/pkg/server/services/csr/csr.go b/pkg/server/services/csr/csr.go index 46737c4f4..b6361f6e9 100644 --- a/pkg/server/services/csr/csr.go +++ b/pkg/server/services/csr/csr.go @@ -3,9 +3,10 @@ package csr import ( "context" "fmt" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" - "k8s.io/apimachinery/pkg/api/meta" + certificatesv1 "k8s.io/api/certificates/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" @@ -42,16 +43,7 @@ func NewCSRService(csrClient kubernetes.Interface, csrInformer certificatesv1inf } } -func (c *CSRService) Get(_ context.Context, resourceID string) (*cloudevents.Event, error) { - csr, err := c.csrLister.Get(resourceID) - if err != nil { - return nil, err - } - - return c.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: csrce.CSREventDataType}, csr) -} - -func (c *CSRService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (c *CSRService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { var evts []*cloudevents.Event requirement, err := labels.NewRequirement(clusterv1.ClusterNameLabelKey, selection.Equals, []string{listOpts.ClusterName}) if err != nil { @@ -106,27 +98,89 @@ func (c *CSRService) RegisterHandler(ctx context.Context, handler server.EventHa } } +// TODO handle type check error and event handler error func (c *CSRService) EventHandlerFuncs(ctx context.Context, handler server.EventHandler) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - accessor, err := meta.Accessor(obj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for csr") + csr, ok := obj.(*certificatesv1.CertificateSigningRequest) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "csr add") return } - if err := handler.OnCreate(ctx, csrce.CSREventDataType, accessor.GetName()); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to create csr", "csrName", accessor.GetName()) + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: csrce.CSREventDataType, + SubResource: types.SubResourceSpec, + Action: types.CreateRequestAction, + } + evt, err := c.codec.Encode(services.CloudEventsSourceKube, eventTypes, csr) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode csr", "name", csr.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to create csr", "name", csr.Name) } }, UpdateFunc: func(oldObj, newObj interface{}) { - accessor, err := meta.Accessor(newObj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for csr") + csr, ok := newObj.(*certificatesv1.CertificateSigningRequest) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", newObj), "csr update") return } - if err := handler.OnUpdate(ctx, csrce.CSREventDataType, accessor.GetName()); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to update csr", "csrName", accessor.GetName()) + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: csrce.CSREventDataType, + SubResource: types.SubResourceSpec, + Action: types.UpdateRequestAction, } + evt, err := c.codec.Encode(services.CloudEventsSourceKube, eventTypes, csr) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode csr", "name", csr.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to update csr", "name", csr.Name) + } + }, + DeleteFunc: func(obj interface{}) { + csr, ok := obj.(*certificatesv1.CertificateSigningRequest) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "csr delete") + return + } + + csr, ok = tombstone.Obj.(*certificatesv1.CertificateSigningRequest) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "csr delete") + return + } + } + + csr = csr.DeepCopy() + if csr.DeletionTimestamp.IsZero() { + csr.DeletionTimestamp = &metav1.Time{Time: time.Now()} + } + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: csrce.CSREventDataType, + SubResource: types.SubResourceSpec, + Action: types.DeleteRequestAction, + } + evt, err := c.codec.Encode(services.CloudEventsSourceKube, eventTypes, csr) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode csr", "name", csr.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to delete csr", "name", csr.Name) + } + }, } } diff --git a/pkg/server/services/csr/csr_test.go b/pkg/server/services/csr/csr_test.go index 8b7db1105..8afe23655 100644 --- a/pkg/server/services/csr/csr_test.go +++ b/pkg/server/services/csr/csr_test.go @@ -20,59 +20,6 @@ import ( testingcommon "open-cluster-management.io/ocm/pkg/common/testing" ) -func TestGet(t *testing.T) { - cases := []struct { - name string - csrs []runtime.Object - resourceID string - expectedError bool - }{ - { - name: "csr not found", - csrs: []runtime.Object{}, - resourceID: "test-csr", - expectedError: true, - }, - { - name: "get csr", - resourceID: "test-csr", - csrs: []runtime.Object{&certificatesv1.CertificateSigningRequest{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-csr", - Labels: map[string]string{ - "open-cluster-management.io/cluster-name": "test-cluster", - }, - }, - }}, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - csrClient := kubefake.NewSimpleClientset(c.csrs...) - csrInformers := informers.NewSharedInformerFactory(csrClient, 10*time.Minute) - csrInformer := csrInformers.Certificates().V1().CertificateSigningRequests() - for _, obj := range c.csrs { - if err := csrInformer.Informer().GetStore().Add(obj); err != nil { - t.Fatal(err) - } - } - - service := NewCSRService(csrClient, csrInformer) - _, err := service.Get(context.Background(), c.resourceID) - if c.expectedError { - if err == nil { - t.Errorf("expected error, got nil") - } - return - } - if err != nil { - t.Errorf("unexpected error: %v", err) - } - }) - } -} - func TestList(t *testing.T) { cases := []struct { name string @@ -120,7 +67,7 @@ func TestList(t *testing.T) { } service := NewCSRService(csrClient, csrInformer) - evts, err := service.List(types.ListOptions{ClusterName: c.clusterName}) + evts, err := service.List(context.Background(), types.ListOptions{ClusterName: c.clusterName}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -216,7 +163,12 @@ func TestEventHandlerFuncs(t *testing.T) { eventHandlerFuncs := service.EventHandlerFuncs(context.Background(), handler) csr := &certificatesv1.CertificateSigningRequest{ - ObjectMeta: metav1.ObjectMeta{Name: "test-csr"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-csr", + Labels: map[string]string{ + "open-cluster-management.io/cluster-name": "test-cluster", + }, + }, } eventHandlerFuncs.AddFunc(csr) if !handler.onCreateCalled { @@ -234,22 +186,17 @@ type csrOnHandler struct { onUpdateCalled bool } -func (m *csrOnHandler) OnCreate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != csrce.CSREventDataType { - return fmt.Errorf("expected %v, got %v", csrce.CSREventDataType, t) +func (m *csrOnHandler) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err } - m.onCreateCalled = true - return nil -} -func (m *csrOnHandler) OnUpdate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != csrce.CSREventDataType { - return fmt.Errorf("expected %v, got %v", csrce.CSREventDataType, t) + if eventType.CloudEventsDataType != csrce.CSREventDataType { + return fmt.Errorf("expected %v, got %v", csrce.CSREventDataType, eventType.CloudEventsDataType) } + + m.onCreateCalled = true m.onUpdateCalled = true return nil } - -func (m *csrOnHandler) OnDelete(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - return nil -} diff --git a/pkg/server/services/event/event.go b/pkg/server/services/event/event.go index 745a6b85a..9880d4d61 100644 --- a/pkg/server/services/event/event.go +++ b/pkg/server/services/event/event.go @@ -28,11 +28,7 @@ func NewEventService(client kubernetes.Interface) server.Service { } } -func (e *EventService) Get(ctx context.Context, resourceID string) (*cloudevents.Event, error) { - return nil, errors.NewMethodNotSupported(eventv1.Resource("events"), "get") -} - -func (e *EventService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (e *EventService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { return nil, errors.NewMethodNotSupported(eventv1.Resource("events"), "list") } diff --git a/pkg/server/services/event/event_test.go b/pkg/server/services/event/event_test.go index 0f4a9c794..62c618bb2 100644 --- a/pkg/server/services/event/event_test.go +++ b/pkg/server/services/event/event_test.go @@ -17,16 +17,9 @@ import ( testingcommon "open-cluster-management.io/ocm/pkg/common/testing" ) -func TestGet(t *testing.T) { - service := NewEventService(kubefake.NewSimpleClientset()) - if _, err := service.Get(context.Background(), "test-event"); err == nil { - t.Errorf("expected error, but failed") - } -} - func TestList(t *testing.T) { service := NewEventService(kubefake.NewSimpleClientset()) - if _, err := service.List(types.ListOptions{}); err == nil { + if _, err := service.List(context.Background(), types.ListOptions{}); err == nil { t.Errorf("expected error, but failed") } } diff --git a/pkg/server/services/lease/lease.go b/pkg/server/services/lease/lease.go index a07fac72b..27f97f132 100644 --- a/pkg/server/services/lease/lease.go +++ b/pkg/server/services/lease/lease.go @@ -3,8 +3,10 @@ package lease import ( "context" "fmt" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" + coordinationv1 "k8s.io/api/coordination/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -38,19 +40,7 @@ func NewLeaseService(client kubernetes.Interface, informer leasev1.LeaseInformer } } -func (l *LeaseService) Get(ctx context.Context, resourceID string) (*cloudevents.Event, error) { - namespace, name, err := cache.SplitMetaNamespaceKey(resourceID) - if err != nil { - return nil, err - } - lease, err := l.lister.Leases(namespace).Get(name) - if err != nil { - return nil, err - } - return l.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: leasece.LeaseEventDataType}, lease) -} - -func (l *LeaseService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (l *LeaseService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { leases, err := l.lister.Leases(listOpts.ClusterName).List(labels.SelectorFromSet(labels.Set{ clusterv1.ClusterNameLabelKey: listOpts.ClusterName, })) @@ -97,26 +87,87 @@ func (l *LeaseService) RegisterHandler(ctx context.Context, handler server.Event } } +// TODO handle type check error and event handler error func (l *LeaseService) EventHandlerFuncs(ctx context.Context, handler server.EventHandler) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get key for lease") + lease, ok := obj.(*coordinationv1.Lease) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "lease add") return } - if err := handler.OnCreate(ctx, leasece.LeaseEventDataType, key); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to create lease", "key", key) + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: leasece.LeaseEventDataType, + SubResource: types.SubResourceSpec, + Action: types.CreateRequestAction, + } + evt, err := l.codec.Encode(services.CloudEventsSourceKube, eventTypes, lease) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode lease", "namespace", lease.Namespace, "name", lease.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to create lease", "namespace", lease.Namespace, "name", lease.Name) } }, UpdateFunc: func(oldObj, newObj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(newObj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get key for lease") + lease, ok := newObj.(*coordinationv1.Lease) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", newObj), "lease update") return } - if err := handler.OnUpdate(ctx, leasece.LeaseEventDataType, key); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to update lease", "key", key) + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: leasece.LeaseEventDataType, + SubResource: types.SubResourceSpec, + Action: types.UpdateRequestAction, + } + evt, err := l.codec.Encode(services.CloudEventsSourceKube, eventTypes, lease) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode lease", "namespace", lease.Namespace, "name", lease.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to update lease", "namespace", lease.Namespace, "name", lease.Name) + } + }, + DeleteFunc: func(obj interface{}) { + lease, ok := obj.(*coordinationv1.Lease) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "lease delete") + return + } + + lease, ok = tombstone.Obj.(*coordinationv1.Lease) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "lease delete") + return + } + } + + lease = lease.DeepCopy() + if lease.DeletionTimestamp.IsZero() { + lease.DeletionTimestamp = &metav1.Time{Time: time.Now()} + } + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: leasece.LeaseEventDataType, + SubResource: types.SubResourceSpec, + Action: types.DeleteRequestAction, + } + evt, err := l.codec.Encode(services.CloudEventsSourceKube, eventTypes, lease) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode lease", "namespace", lease.Namespace, "name", lease.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to delete lease", "namespace", lease.Namespace, "name", lease.Name) } }, } diff --git a/pkg/server/services/lease/lease_test.go b/pkg/server/services/lease/lease_test.go index 9fc9df379..12ade9500 100644 --- a/pkg/server/services/lease/lease_test.go +++ b/pkg/server/services/lease/lease_test.go @@ -20,54 +20,6 @@ import ( testingcommon "open-cluster-management.io/ocm/pkg/common/testing" ) -func TestGet(t *testing.T) { - cases := []struct { - name string - leases []runtime.Object - resourceID string - expectedError bool - }{ - { - name: "lease not found", - leases: []runtime.Object{}, - resourceID: "test-cluster", - expectedError: true, - }, - { - name: "lease found", - resourceID: "test-lease-namespace/test-lease", - leases: []runtime.Object{&coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{Name: "test-lease", Namespace: "test-lease-namespace"}, - }}, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - kubeClient := kubefake.NewSimpleClientset(c.leases...) - kubeInformers := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute) - kubeInformer := kubeInformers.Coordination().V1().Leases() - for _, obj := range c.leases { - if err := kubeInformer.Informer().GetStore().Add(obj); err != nil { - t.Fatal(err) - } - } - - service := NewLeaseService(kubeClient, kubeInformer) - _, err := service.Get(context.Background(), c.resourceID) - if c.expectedError { - if err == nil { - t.Errorf("expected error, got nil") - } - return - } - if err != nil { - t.Errorf("unexpected error: %v", err) - } - }) - } -} - func TestList(t *testing.T) { cases := []struct { name string @@ -114,7 +66,7 @@ func TestList(t *testing.T) { } service := NewLeaseService(kubeClient, kubeInformer) - evts, err := service.List(types.ListOptions{ClusterName: c.clusterName}) + evts, err := service.List(context.Background(), types.ListOptions{ClusterName: c.clusterName}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -217,7 +169,13 @@ func TestEventHandlerFuncs(t *testing.T) { eventHandlerFuncs := service.EventHandlerFuncs(context.Background(), handler) lease := &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{Name: "test-lease", Namespace: "test-lease-namespace"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-lease", + Namespace: "test-lease-namespace", + Labels: map[string]string{ + "open-cluster-management.io/cluster-name": "test-cluster", + }, + }, } eventHandlerFuncs.AddFunc(lease) if !handler.onCreateCalled { @@ -228,29 +186,37 @@ func TestEventHandlerFuncs(t *testing.T) { if !handler.onUpdateCalled { t.Errorf("onUpdate not called") } + + eventHandlerFuncs.DeleteFunc(lease) + if !handler.onDeleteCalled { + t.Errorf("onDelete not called") + } } type leaseHandler struct { onCreateCalled bool onUpdateCalled bool + onDeleteCalled bool } -func (m *leaseHandler) OnCreate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != leasece.LeaseEventDataType { - return fmt.Errorf("expected %v, got %v", leasece.LeaseEventDataType, t) +func (m *leaseHandler) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err } - m.onCreateCalled = true - return nil -} -func (m *leaseHandler) OnUpdate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != leasece.LeaseEventDataType { - return fmt.Errorf("expected %v, got %v", leasece.LeaseEventDataType, t) + if eventType.CloudEventsDataType != leasece.LeaseEventDataType { + return fmt.Errorf("expected %v, got %v", leasece.LeaseEventDataType, eventType.CloudEventsDataType) + } + + switch eventType.Action { + case types.CreateRequestAction: + m.onCreateCalled = true + case types.UpdateRequestAction: + m.onUpdateCalled = true + case types.DeleteRequestAction: + m.onDeleteCalled = true } - m.onUpdateCalled = true - return nil -} -func (m *leaseHandler) OnDelete(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { return nil } diff --git a/pkg/server/services/tokenrequest/tokenreqeust.go b/pkg/server/services/tokenrequest/tokenreqeust.go index 052956ed2..dcf815314 100644 --- a/pkg/server/services/tokenrequest/tokenreqeust.go +++ b/pkg/server/services/tokenrequest/tokenreqeust.go @@ -3,14 +3,10 @@ package tokenrequest import ( "context" "fmt" - "time" cloudevents "github.com/cloudevents/sdk-go/v2" - authenticationv1 "k8s.io/api/authentication/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" sace "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount" @@ -20,54 +16,21 @@ import ( "open-cluster-management.io/ocm/pkg/server/services" ) -var ( - // TokenCacheTTL is the time-to-live for cached token responses - // Tokens are cached temporarily until the agent retrieves them - TokenCacheTTL = 30 * time.Second -) - type TokenRequestService struct { client kubernetes.Interface codec *sace.TokenRequestCodec handler server.EventHandler - store cache.Store } -// NewTokenRequestService creates a new TokenRequestService with a TTL-based token cache +// NewTokenRequestService creates a new TokenRequestService func NewTokenRequestService(client kubernetes.Interface) server.Service { return &TokenRequestService{ client: client, codec: sace.NewTokenRequestCodec(), - store: cache.NewTTLStore(func(obj interface{}) (string, error) { - tokenRequest, ok := obj.(*authenticationv1.TokenRequest) - if !ok { - return "", fmt.Errorf("object is not a TokenRequest") - } - return string(tokenRequest.UID), nil - }, TokenCacheTTL), } } -func (t *TokenRequestService) Get(ctx context.Context, resourceID string) (*cloudevents.Event, error) { - // Get the token request from store by resourceID - obj, exists, err := t.store.GetByKey(resourceID) - if err != nil { - return nil, fmt.Errorf("failed to get token request from store: %v", err) - } - if !exists { - return nil, errors.NewNotFound(authenticationv1.Resource("tokenrequests"), resourceID) - } - - tokenRequest, ok := obj.(*authenticationv1.TokenRequest) - if !ok { - return nil, fmt.Errorf("stored object is not a TokenRequest") - } - - // Token will be automatically removed from cache when TTL expires - return t.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: sace.TokenRequestDataType}, tokenRequest) -} - -func (t *TokenRequestService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (t *TokenRequestService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { // resync is not needed, so list is not required return nil, nil } @@ -96,19 +59,24 @@ func (t *TokenRequestService) HandleStatusUpdate(ctx context.Context, evt *cloud // Create a token for the service account tokenResponse, err := t.client.CoreV1().ServiceAccounts(tokenRequest.Namespace).CreateToken(ctx, tokenRequest.Name, tokenRequest, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("failed to create token for service account %s/%s: %v", tokenRequest.Namespace, tokenRequest.Name, err) + return err } // set request id back tokenResponse.UID = requestID - // Cache the token response in the store for later retrieval - if err := t.store.Add(tokenResponse); err != nil { - return fmt.Errorf("failed to cache token response: %v", err) + // Notify the handler that the token is ready for retrieval + eventTypes := types.CloudEventsType{ + CloudEventsDataType: sace.TokenRequestDataType, + SubResource: types.SubResourceSpec, + Action: types.CreateRequestAction, + } + evt, err := t.codec.Encode(services.CloudEventsSourceKube, eventTypes, tokenResponse) + if err != nil { + return fmt.Errorf("failed to encode token response: %v", err) } - // Notify the handler that the token is ready for retrieval - if err := t.handler.OnCreate(ctx, eventType.CloudEventsDataType, string(tokenRequest.UID)); err != nil { + if err := t.handler.HandleEvent(ctx, evt); err != nil { return fmt.Errorf("failed to notify handler: %v", err) } diff --git a/pkg/server/services/tokenrequest/tokenrequest_test.go b/pkg/server/services/tokenrequest/tokenrequest_test.go index a2fe0379c..6f3bd7438 100644 --- a/pkg/server/services/tokenrequest/tokenrequest_test.go +++ b/pkg/server/services/tokenrequest/tokenrequest_test.go @@ -3,13 +3,12 @@ package tokenrequest import ( "context" "fmt" + "strings" "testing" - "time" cloudevents "github.com/cloudevents/sdk-go/v2" authenticationv1 "k8s.io/api/authentication/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" kubefake "k8s.io/client-go/kubernetes/fake" @@ -37,93 +36,13 @@ func TestNewTokenRequestService(t *testing.T) { if tokenService.codec == nil { t.Errorf("codec should not be nil") } - - if tokenService.store == nil { - t.Errorf("store should not be nil") - } -} - -func TestGet(t *testing.T) { - cases := []struct { - name string - resourceID string - setupStore func(*TokenRequestService) - expectedError bool - errorCheck func(error) bool - }{ - { - name: "token not found", - resourceID: "non-existent-token", - setupStore: func(s *TokenRequestService) { - // Empty store - }, - expectedError: true, - errorCheck: func(err error) bool { - return errors.IsNotFound(err) - }, - }, - { - name: "token found", - resourceID: "test-token-uid", - setupStore: func(s *TokenRequestService) { - tokenRequest := &authenticationv1.TokenRequest{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sa", - Namespace: "test-namespace", - }, - Spec: authenticationv1.TokenRequestSpec{ - Audiences: []string{"test-audience"}, - }, - Status: authenticationv1.TokenRequestStatus{ - Token: "test-token-value", - ExpirationTimestamp: metav1.NewTime(time.Now().Add(1 * time.Hour)), - }, - } - tokenRequest.UID = "test-token-uid" - s.store.Add(tokenRequest) - }, - expectedError: false, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - kubeClient := kubefake.NewSimpleClientset() - service := NewTokenRequestService(kubeClient).(*TokenRequestService) - - if c.setupStore != nil { - c.setupStore(service) - } - - evt, err := service.Get(context.Background(), c.resourceID) - if c.expectedError { - if err == nil { - t.Errorf("expected error, got nil") - return - } - if c.errorCheck != nil && !c.errorCheck(err) { - t.Errorf("error check failed for error: %v", err) - } - return - } - - if err != nil { - t.Errorf("unexpected error: %v", err) - return - } - - if evt == nil { - t.Errorf("expected event, got nil") - } - }) - } } func TestList(t *testing.T) { kubeClient := kubefake.NewSimpleClientset() service := NewTokenRequestService(kubeClient) - evts, err := service.List(types.ListOptions{}) + evts, err := service.List(context.Background(), types.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -139,7 +58,6 @@ func TestHandleStatusUpdate(t *testing.T) { serviceAccounts []runtime.Object tokenRequestEvt *cloudevents.Event validateActions func(t *testing.T, actions []clienttesting.Action) - validateCache func(t *testing.T, service *TokenRequestService) validateHandler func(t *testing.T, handler *mockEventHandler) reactorError error expectedError bool @@ -211,29 +129,10 @@ func TestHandleStatusUpdate(t *testing.T) { t.Errorf("expected subresource %s, got %s", "token", actions[0].GetSubresource()) } }, - validateCache: func(t *testing.T, service *TokenRequestService) { - obj, exists, err := service.store.GetByKey("test-request-uid") - if err != nil { - t.Errorf("unexpected error getting from cache: %v", err) - } - if !exists { - t.Errorf("expected token to be cached") - } - tokenResponse, ok := obj.(*authenticationv1.TokenRequest) - if !ok { - t.Errorf("expected TokenRequest, got %T", obj) - } - if tokenResponse.UID != "test-request-uid" { - t.Errorf("expected UID %s, got %s", "test-request-uid", tokenResponse.UID) - } - }, validateHandler: func(t *testing.T, handler *mockEventHandler) { if !handler.onCreateCalled { t.Errorf("expected OnCreate to be called") } - if handler.lastResourceID != "test-request-uid" { - t.Errorf("expected resourceID %s, got %s", "test-request-uid", handler.lastResourceID) - } if handler.lastDataType != sace.TokenRequestDataType { t.Errorf("expected data type %s, got %s", sace.TokenRequestDataType, handler.lastDataType) } @@ -262,7 +161,7 @@ func TestHandleStatusUpdate(t *testing.T) { }(), reactorError: fmt.Errorf("simulated error"), expectedError: true, - expectedErrorText: "failed to create token for service account", + expectedErrorText: "simulated error", }, } @@ -289,8 +188,8 @@ func TestHandleStatusUpdate(t *testing.T) { t.Errorf("expected error, got nil") return } - if c.expectedErrorText != "" && err.Error()[:len(c.expectedErrorText)] != c.expectedErrorText { - t.Errorf("expected error to contain %q, got %q", c.expectedErrorText, err.Error()) + if c.expectedErrorText != "" && !strings.HasPrefix(err.Error(), c.expectedErrorText) { + t.Errorf("expected error to start with %q, got %q", c.expectedErrorText, err.Error()) } return } @@ -304,10 +203,6 @@ func TestHandleStatusUpdate(t *testing.T) { c.validateActions(t, kubeClient.Actions()) } - if c.validateCache != nil { - c.validateCache(t, service) - } - if c.validateHandler != nil { c.validateHandler(t, mockHandler) } @@ -327,73 +222,6 @@ func TestRegisterHandler(t *testing.T) { } } -func TestTokenCacheTTL(t *testing.T) { - // Save original TTL and restore after test - originalTTL := TokenCacheTTL - defer func() { - TokenCacheTTL = originalTTL - }() - - // Use a shorter TTL for faster test - TokenCacheTTL = 2 * time.Second - - kubeClient := kubefake.NewSimpleClientset(&corev1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sa", - Namespace: "test-namespace", - }, - }) - - service := NewTokenRequestService(kubeClient).(*TokenRequestService) - mockHandler := &mockEventHandler{} - service.RegisterHandler(context.Background(), mockHandler) - - // Create a token request event - evt := types.NewEventBuilder("test", types.CloudEventsType{ - CloudEventsDataType: sace.TokenRequestDataType, - SubResource: types.SubResourceSpec, - Action: types.CreateRequestAction, - }).NewEvent() - tokenRequest := &authenticationv1.TokenRequest{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sa", - Namespace: "test-namespace", - }, - Spec: authenticationv1.TokenRequestSpec{ - Audiences: []string{"test-audience"}, - }, - } - tokenRequest.UID = "test-ttl-uid" - evt.SetData(cloudevents.ApplicationJSON, tokenRequest) - - // Handle the event to cache the token - err := service.HandleStatusUpdate(context.Background(), &evt) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Verify token is in cache - _, exists, err := service.store.GetByKey("test-ttl-uid") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if !exists { - t.Errorf("expected token to be in cache") - } - - // Wait for TTL to expire - time.Sleep(TokenCacheTTL + 1*time.Second) - - // Verify token is removed from cache - _, exists, err = service.store.GetByKey("test-ttl-uid") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if exists { - t.Errorf("expected token to be removed from cache after TTL") - } -} - // mockEventHandler is a mock implementation of server.EventHandler for testing type mockEventHandler struct { onCreateCalled bool @@ -403,23 +231,24 @@ type mockEventHandler struct { lastResourceID string } -func (m *mockEventHandler) OnCreate(ctx context.Context, dataType types.CloudEventsDataType, resourceID string) error { - m.onCreateCalled = true - m.lastDataType = dataType - m.lastResourceID = resourceID - return nil -} +func (m *mockEventHandler) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err + } -func (m *mockEventHandler) OnUpdate(ctx context.Context, dataType types.CloudEventsDataType, resourceID string) error { - m.onUpdateCalled = true - m.lastDataType = dataType - m.lastResourceID = resourceID - return nil -} + m.lastDataType = eventType.CloudEventsDataType + m.lastResourceID = evt.ID() + + // Determine what kind of event it is based on the action type + switch eventType.Action { + case types.CreateRequestAction: + m.onCreateCalled = true + case types.UpdateRequestAction: + m.onUpdateCalled = true + case types.DeleteRequestAction: + m.onDeleteCalled = true + } -func (m *mockEventHandler) OnDelete(ctx context.Context, dataType types.CloudEventsDataType, resourceID string) error { - m.onDeleteCalled = true - m.lastDataType = dataType - m.lastResourceID = resourceID return nil } diff --git a/pkg/server/services/work/work.go b/pkg/server/services/work/work.go index 22638d7f4..5f9e514e4 100644 --- a/pkg/server/services/work/work.go +++ b/pkg/server/services/work/work.go @@ -3,12 +3,14 @@ package work import ( "context" "fmt" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" "github.com/google/go-cmp/cmp" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubetypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -50,21 +52,7 @@ func NewWorkService( } } -func (w *WorkService) Get(ctx context.Context, resourceID string) (*cloudevents.Event, error) { - namespace, name, err := cache.SplitMetaNamespaceKey(resourceID) - if err != nil { - return nil, err - } - work, err := w.workLister.ManifestWorks(namespace).Get(name) - if err != nil { - return nil, err - } - - work = work.DeepCopy() - return w.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: payload.ManifestBundleEventDataType}, work) -} - -func (w *WorkService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) { +func (w *WorkService) List(ctx context.Context, listOpts types.ListOptions) ([]*cloudevents.Event, error) { works, err := w.workLister.ManifestWorks(listOpts.ClusterName).List(labels.Everything()) if err != nil { return nil, err @@ -158,10 +146,11 @@ func (w *WorkService) RegisterHandler(ctx context.Context, handler server.EventH } func (w *WorkService) EventHandlerFuncs(ctx context.Context, handler server.EventHandler) *cache.ResourceEventHandlerFuncs { + // TODO handle type check error and event handler error return &cache.ResourceEventHandlerFuncs{ - AddFunc: handleOnCreateFunc(ctx, handler), - UpdateFunc: handleOnUpdateFunc(ctx, handler), - DeleteFunc: handleOnDeleteFunc(ctx, handler), + AddFunc: w.handleOnCreateFunc(ctx, handler), + UpdateFunc: w.handleOnUpdateFunc(ctx, handler), + DeleteFunc: w.handleOnDeleteFunc(ctx, handler), } } @@ -180,61 +169,110 @@ func (w *WorkService) getWorkByUID(clusterName string, uid kubetypes.UID) (*work return nil, apierrors.NewNotFound(common.ManifestWorkGR, string(uid)) } -func handleOnCreateFunc(ctx context.Context, handler server.EventHandler) func(obj interface{}) { +func (w *WorkService) handleOnCreateFunc(ctx context.Context, handler server.EventHandler) func(obj interface{}) { return func(obj interface{}) { - accessor, err := meta.Accessor(obj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for work") + work, ok := obj.(*workv1.ManifestWork) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "work create") return } - id := accessor.GetNamespace() + "/" + accessor.GetName() - if err := handler.OnCreate(ctx, payload.ManifestBundleEventDataType, id); err != nil { + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceSpec, + Action: types.CreateRequestAction, + } + evt, err := w.codec.Encode(services.CloudEventsSourceKube, eventTypes, work) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode work", + "namespace", work.Namespace, "name", work.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { utilruntime.HandleErrorWithContext(ctx, err, "failed to create work", - "manifestWork", accessor.GetName(), "manifestWorkNamespace", accessor.GetNamespace()) + "namespace", work.Namespace, "name", work.Name) } } } -func handleOnUpdateFunc(ctx context.Context, handler server.EventHandler) func(oldObj, newObj interface{}) { +func (w *WorkService) handleOnUpdateFunc(ctx context.Context, handler server.EventHandler) func(oldObj, newObj interface{}) { return func(oldObj, newObj interface{}) { - oldAccessor, err := meta.Accessor(oldObj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for work") + oldWork, ok := oldObj.(*workv1.ManifestWork) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", oldObj), "work update") return } - newAccessor, err := meta.Accessor(newObj) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for work") + newWork, ok := newObj.(*workv1.ManifestWork) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", newObj), "work update") return } // the manifestwork is not changed and is not deleting - if cmp.Equal(oldAccessor.GetLabels(), newAccessor.GetLabels()) && - cmp.Equal(oldAccessor.GetAnnotations(), newAccessor.GetAnnotations()) && - oldAccessor.GetGeneration() >= newAccessor.GetGeneration() && - newAccessor.GetDeletionTimestamp().IsZero() { + if cmp.Equal(oldWork.Labels, newWork.Labels) && + cmp.Equal(oldWork.Annotations, newWork.Annotations) && + oldWork.Generation >= newWork.Generation && + newWork.DeletionTimestamp.IsZero() { return } - id := newAccessor.GetNamespace() + "/" + newAccessor.GetName() - if err := handler.OnUpdate(ctx, payload.ManifestBundleEventDataType, id); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to update work", - "manifestWork", newAccessor.GetName(), "manifestWorkNamespace", newAccessor.GetNamespace()) + eventTypes := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceSpec, + Action: types.UpdateRequestAction, } - } -} - -func handleOnDeleteFunc(ctx context.Context, handler server.EventHandler) func(obj interface{}) { - return func(obj interface{}) { - accessor, err := meta.Accessor(obj) + evt, err := w.codec.Encode(services.CloudEventsSourceKube, eventTypes, newWork) if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to get accessor for work") + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode work", + "namespace", newWork.Namespace, "name", newWork.Name) return } - id := accessor.GetNamespace() + "/" + accessor.GetName() - if err := handler.OnDelete(ctx, payload.ManifestBundleEventDataType, id); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "failed to delete work", - "manifestWork", accessor.GetName(), "manifestWorkNamespace", accessor.GetNamespace()) + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to update work", + "namespace", newWork.Namespace, "name", newWork.Name) + } + } +} + +func (w *WorkService) handleOnDeleteFunc(ctx context.Context, handler server.EventHandler) func(obj interface{}) { + return func(obj interface{}) { + work, ok := obj.(*workv1.ManifestWork) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "work delete") + return + } + + work, ok = tombstone.Obj.(*workv1.ManifestWork) + if !ok { + utilruntime.HandleErrorWithContext(ctx, fmt.Errorf("unknown type: %T", obj), "work delete") + return + } + } + + work = work.DeepCopy() + if work.DeletionTimestamp.IsZero() { + work.DeletionTimestamp = &metav1.Time{Time: time.Now()} + } + + eventTypes := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceSpec, + Action: types.DeleteRequestAction, + } + evt, err := w.codec.Encode(services.CloudEventsSourceKube, eventTypes, work) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to encode work", + "namespace", work.Namespace, "name", work.Name) + return + } + + if err := handler.HandleEvent(ctx, evt); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to delete work", + "namespace", work.Namespace, "name", work.Name) } } } diff --git a/pkg/server/services/work/work_test.go b/pkg/server/services/work/work_test.go index e11d2a286..40d4da413 100644 --- a/pkg/server/services/work/work_test.go +++ b/pkg/server/services/work/work_test.go @@ -16,63 +16,12 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" testingcommon "open-cluster-management.io/ocm/pkg/common/testing" ) -func TestGet(t *testing.T) { - cases := []struct { - name string - works []runtime.Object - resourceID string - expectedError bool - }{ - { - name: "work not found", - works: []runtime.Object{}, - resourceID: "test-namespace/test-work", - expectedError: true, - }, - { - name: "get work", - resourceID: "test-namespace/test-work", - works: []runtime.Object{&workv1.ManifestWork{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-work", - Namespace: "test-namespace", - ResourceVersion: "1", - }, - }}, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - workClient := workfake.NewSimpleClientset(c.works...) - workInformers := workinformers.NewSharedInformerFactory(workClient, 10*time.Minute) - workInformer := workInformers.Work().V1().ManifestWorks() - for _, obj := range c.works { - if err := workInformer.Informer().GetStore().Add(obj); err != nil { - t.Fatal(err) - } - } - - service := NewWorkService(workClient, workInformer) - _, err := service.Get(context.Background(), c.resourceID) - if c.expectedError { - if err == nil { - t.Errorf("expected error, got nil") - } - return - } - if err != nil { - t.Errorf("unexpected error: %v", err) - } - }) - } -} - func TestList(t *testing.T) { cases := []struct { name string @@ -121,7 +70,7 @@ func TestList(t *testing.T) { } service := NewWorkService(workClient, workInformer) - evts, err := service.List(types.ListOptions{ClusterName: c.clusterName}) + evts, err := service.List(context.Background(), types.ListOptions{ClusterName: c.clusterName}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -433,10 +382,11 @@ func TestHandleOnCreateFunc(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { handler := &workHandler{} - createFunc := handleOnCreateFunc(context.Background(), handler) + service := &WorkService{codec: codec.NewManifestBundleCodec()} + createFunc := service.handleOnCreateFunc(context.Background(), handler) createFunc(c.obj) - if handler.onCreateCallCount != c.expectedCallCount { - t.Errorf("expected %d onCreate calls, got %d", c.expectedCallCount, handler.onCreateCallCount) + if handler.handleEventCallCount != c.expectedCallCount { + t.Errorf("expected %d HandleEvent calls, got %d", c.expectedCallCount, handler.handleEventCallCount) } }) } @@ -625,10 +575,11 @@ func TestHandleOnUpdateFunc(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { handler := &workHandler{} - updateFunc := handleOnUpdateFunc(context.Background(), handler) + service := &WorkService{codec: codec.NewManifestBundleCodec()} + updateFunc := service.handleOnUpdateFunc(context.Background(), handler) updateFunc(c.oldObj, c.newObj) - if handler.onUpdateCallCount != c.expectedCallCount { - t.Errorf("%s: expected %d OnUpdate calls, got %d", c.description, c.expectedCallCount, handler.onUpdateCallCount) + if handler.handleEventCallCount != c.expectedCallCount { + t.Errorf("%s: expected %d HandleEvent calls, got %d", c.description, c.expectedCallCount, handler.handleEventCallCount) } }) } @@ -659,56 +610,43 @@ func TestHandleOnDeleteFunc(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { handler := &workHandler{} - deleteFunc := handleOnDeleteFunc(context.Background(), handler) + service := &WorkService{codec: codec.NewManifestBundleCodec()} + deleteFunc := service.handleOnDeleteFunc(context.Background(), handler) deleteFunc(c.obj) - if handler.onDeleteCallCount != c.expectedCallCount { - t.Errorf("expected %d onDelete calls, got %d", c.expectedCallCount, handler.onDeleteCallCount) + if handler.handleEventCallCount != c.expectedCallCount { + t.Errorf("expected %d HandleEvent calls, got %d", c.expectedCallCount, handler.handleEventCallCount) } }) } } type workHandler struct { - onCreateCalled bool - onUpdateCalled bool - onDeleteCalled bool - onCreateCallCount int - onUpdateCallCount int - onDeleteCallCount int + onCreateCalled bool + onUpdateCalled bool + onDeleteCalled bool + handleEventCallCount int } -func (m *workHandler) OnCreate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != payload.ManifestBundleEventDataType { - return fmt.Errorf("expected %v, got %v", payload.ManifestBundleEventDataType, t) +func (m *workHandler) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err } - if resourceID != "test-namespace/test-work" { - return fmt.Errorf("expected %v, got %v", "test-namespace/test-work", resourceID) - } - m.onCreateCalled = true - m.onCreateCallCount++ - return nil -} -func (m *workHandler) OnUpdate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != payload.ManifestBundleEventDataType { - return fmt.Errorf("expected %v, got %v", payload.ManifestBundleEventDataType, t) + if eventType.CloudEventsDataType != payload.ManifestBundleEventDataType { + return fmt.Errorf("expected %v, got %v", payload.ManifestBundleEventDataType, eventType.CloudEventsDataType) } - if resourceID != "test-namespace/test-work" { - return fmt.Errorf("expected %v, got %v", "test-namespace/test-work", resourceID) - } - m.onUpdateCalled = true - m.onUpdateCallCount++ - return nil -} -func (m *workHandler) OnDelete(ctx context.Context, t types.CloudEventsDataType, resourceID string) error { - if t != payload.ManifestBundleEventDataType { - return fmt.Errorf("expected %v, got %v", payload.ManifestBundleEventDataType, t) + // Track which type of event was called based on the action type + switch eventType.Action { + case types.CreateRequestAction: + m.onCreateCalled = true + case types.UpdateRequestAction: + m.onUpdateCalled = true + case types.DeleteRequestAction: + m.onDeleteCalled = true } - if resourceID != "test-namespace/test-work" { - return fmt.Errorf("expected %v, got %v", "test-namespace/test-work", resourceID) - } - m.onDeleteCalled = true - m.onDeleteCallCount++ + + m.handleEventCallCount++ return nil } diff --git a/test/e2e-test.mk b/test/e2e-test.mk index 89a96ca98..00e69279c 100644 --- a/test/e2e-test.mk +++ b/test/e2e-test.mk @@ -6,7 +6,9 @@ include $(addprefix ./vendor/github.com/openshift/build-machinery-go/make/, \ KUBECTL?=kubectl KUBECONFIG?=./.kubeconfig HUB_KUBECONFIG?=./.hub-kubeconfig +GRPC_CONFIG?=./.grpc-config KLUSTERLET_DEPLOY_MODE?=Default +REGISTRATION_DRIVER?=csr MANAGED_CLUSTER_NAME?=cluster1 KLUSTERLET_NAME?=klusterlet @@ -21,6 +23,22 @@ hub-kubeconfig: deploy-hub: deploy-hub-helm hub-kubeconfig cluster-ip deploy-hub-helm: ensure-helm +ifeq ($(REGISTRATION_DRIVER),grpc) + $(HELM) install cluster-manager deploy/cluster-manager/chart/cluster-manager --namespace=open-cluster-management \ + --create-namespace \ + --set images.overrides.operatorImage=$(OPERATOR_IMAGE_NAME) \ + --set images.overrides.registrationImage=$(REGISTRATION_IMAGE) \ + --set images.overrides.workImage=$(WORK_IMAGE) \ + --set images.overrides.placementImage=$(PLACEMENT_IMAGE) \ + --set images.overrides.addOnManagerImage=$(ADDON_MANAGER_IMAGE) \ + --set replicaCount=1 \ + --set createBootstrapSA=true \ + --set clusterManager.registrationConfiguration.registrationDrivers[0].authType=csr \ + --set clusterManager.registrationConfiguration.registrationDrivers[1].authType=grpc \ + --set clusterManager.serverConfiguration.endpointsExposure[0].protocol=grpc \ + --set clusterManager.serverConfiguration.endpointsExposure[0].grpc.type=hostname \ + --set clusterManager.serverConfiguration.endpointsExposure[0].grpc.hostname.host=cluster-manager-grpc-server.open-cluster-management-hub.svc +else $(HELM) install cluster-manager deploy/cluster-manager/chart/cluster-manager --namespace=open-cluster-management \ --create-namespace \ --set images.overrides.operatorImage=$(OPERATOR_IMAGE_NAME) \ @@ -29,6 +47,7 @@ deploy-hub-helm: ensure-helm --set images.overrides.placementImage=$(PLACEMENT_IMAGE) \ --set images.overrides.addOnManagerImage=$(ADDON_MANAGER_IMAGE) \ --set replicaCount=1 +endif deploy-hub-operator: ensure-kustomize cp deploy/cluster-manager/config/kustomization.yaml deploy/cluster-manager/config/kustomization.yaml.tmp @@ -47,7 +66,13 @@ test-e2e: deploy-hub deploy-spoke-operator-helm run-e2e run-e2e: go test -c ./test/e2e - ./e2e.test -test.v -ginkgo.v -nil-executor-validating=true -registration-image=$(REGISTRATION_IMAGE) -work-image=$(WORK_IMAGE) -singleton-image=$(OPERATOR_IMAGE_NAME) -klusterlet-deploy-mode=$(KLUSTERLET_DEPLOY_MODE) -expected-image-tag=$(IMAGE_TAG) + ./e2e.test -test.v -ginkgo.v -nil-executor-validating=true \ + -registration-image=$(REGISTRATION_IMAGE) \ + -work-image=$(WORK_IMAGE) \ + -singleton-image=$(OPERATOR_IMAGE_NAME) \ + -expected-image-tag=$(IMAGE_TAG) \ + -klusterlet-deploy-mode=$(KLUSTERLET_DEPLOY_MODE) \ + -registration-driver=$(REGISTRATION_DRIVER) clean-hub: clean-hub-cr clean-hub-operator @@ -66,7 +91,41 @@ bootstrap-secret: $(KUBECTL) get ns open-cluster-management-agent; if [ $$? -ne 0 ] ; then $(KUBECTL) create ns open-cluster-management-agent; fi $(KUSTOMIZE) build deploy/klusterlet/config/samples/bootstrap | $(KUBECTL) apply -f - +grpc-config: + @set -e; \ + retry=0; \ + while ! $(KUBECTL) get deploy cluster-manager-grpc-server -n open-cluster-management-hub >/dev/null 2>&1; do \ + if [ $$retry -ge 150 ]; then \ + exit 1; \ + fi; \ + sleep 2; \ + retry=$$(($$retry + 1)); \ + done; \ + $(KUBECTL) wait --for=condition=available --timeout=300s \ + deployment/cluster-manager-grpc-server \ + -n open-cluster-management-hub; \ + CA_DATA=$$($(KUBECTL) get configmap ca-bundle-configmap \ + -n open-cluster-management-hub \ + -o jsonpath='{.data.ca-bundle\.crt}' | base64 | tr -d '\n'); \ + TOKEN=$$($(KUBECTL) create token agent-registration-bootstrap \ + -n open-cluster-management \ + --duration=24h); \ + echo "caData: $$CA_DATA" > $(GRPC_CONFIG); \ + echo "token: $$TOKEN" >> $(GRPC_CONFIG); \ + echo "url: cluster-manager-grpc-server.open-cluster-management-hub.svc:8090" >> $(GRPC_CONFIG); \ + deploy-spoke-operator-helm: ensure-helm +ifeq ($(REGISTRATION_DRIVER),grpc) +deploy-spoke-operator-helm: grpc-config + $(HELM) install klusterlet deploy/klusterlet/chart/klusterlet --namespace=open-cluster-management \ + --create-namespace \ + --set-file bootstrapHubKubeConfig=$(HUB_KUBECONFIG) \ + --set-file grpcConfig=$(GRPC_CONFIG) \ + --set klusterlet.create=false \ + --set images.overrides.operatorImage=$(OPERATOR_IMAGE_NAME) \ + --set images.overrides.registrationImage=$(REGISTRATION_IMAGE) \ + --set images.overrides.workImage=$(WORK_IMAGE) +else $(HELM) install klusterlet deploy/klusterlet/chart/klusterlet --namespace=open-cluster-management \ --create-namespace \ --set-file bootstrapHubKubeConfig=$(HUB_KUBECONFIG) \ @@ -74,6 +133,7 @@ deploy-spoke-operator-helm: ensure-helm --set images.overrides.operatorImage=$(OPERATOR_IMAGE_NAME) \ --set images.overrides.registrationImage=$(REGISTRATION_IMAGE) \ --set images.overrides.workImage=$(WORK_IMAGE) +endif deploy-spoke-operator: ensure-kustomize cp deploy/klusterlet/config/kustomization.yaml deploy/klusterlet/config/kustomization.yaml.tmp diff --git a/test/e2e/addon_lease_test.go b/test/e2e/addon_lease_test.go index 33e526d02..2c41ac147 100644 --- a/test/e2e/addon_lease_test.go +++ b/test/e2e/addon_lease_test.go @@ -231,7 +231,7 @@ var _ = ginkgo.Describe("Addon Health Check", ginkgo.Label("addon-lease"), func( agentNamespace := fmt.Sprintf("open-cluster-management-agent-%s", rand.String(6)) framework.CreateAndApproveKlusterlet( hub, spoke, - klusterletName, clusterName, agentNamespace, operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images) + klusterletName, clusterName, agentNamespace, operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images, registrationDriver) // create an addon on created managed cluster addOnName = fmt.Sprintf("addon-%s", rand.String(6)) ginkgo.By(fmt.Sprintf("Creating managed cluster addon %q", addOnName)) diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index dc49573f9..27aacb267 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -45,6 +45,9 @@ var ( // hub hash hubHash string + // registration driver + registrationDriver string + // bootstrap-hub-kubeconfig // It's a secret named 'bootstrap-hub-kubeconfig' under the namespace 'open-cluster-management-agent', // the content of the secret is a kubeconfig file. @@ -66,6 +69,7 @@ func init() { flag.StringVar(&workImage, "work-image", "", "The image of the work") flag.StringVar(&singletonImage, "singleton-image", "", "The image of the klusterlet agent") flag.StringVar(&expectedImageTag, "expected-image-tag", "", "The expected image tag for all OCM components (e.g., 'e2e')") + flag.StringVar(®istrationDriver, "registration-driver", "csr", "The registration driver (csr or grpc)") } var hub *framework.Hub @@ -181,7 +185,7 @@ var _ = BeforeSuite(func() { framework.CreateAndApproveKlusterlet( hub, spoke, universalKlusterletName, universalClusterName, universalAgentNamespace, operatorapiv1.InstallMode(klusterletDeployMode), - bootstrapHubKubeConfigSecret, images, + bootstrapHubKubeConfigSecret, images, registrationDriver, ) By("Create a universal ClusterSet and bind it with the universal managedcluster") diff --git a/test/e2e/klusterlet_hosted_test.go b/test/e2e/klusterlet_hosted_test.go index cb29faf99..d8b11c04d 100644 --- a/test/e2e/klusterlet_hosted_test.go +++ b/test/e2e/klusterlet_hosted_test.go @@ -71,7 +71,7 @@ var _ = Describe("Delete hosted klusterlet CR", Label("klusterlet-hosted"), func It("Delete klusterlet CR in Hosted mode when the managed cluster was destroyed", func() { By(fmt.Sprintf("create klusterlet %v with managed cluster name %v", klusterletName, clusterName)) klusterlet, err := spoke.CreateKlusterlet(klusterletName, clusterName, klusterletNamespace, - operatorapiv1.InstallModeHosted, bootstrapHubKubeConfigSecret, images) + operatorapiv1.InstallModeHosted, bootstrapHubKubeConfigSecret, images, registrationDriver) Expect(err).ToNot(HaveOccurred()) By(fmt.Sprintf("waiting for the managed cluster %v to be created", clusterName)) diff --git a/test/e2e/klusterlet_test.go b/test/e2e/klusterlet_test.go index 65de71167..4da9954d7 100644 --- a/test/e2e/klusterlet_test.go +++ b/test/e2e/klusterlet_test.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/klog/v2" operatorapiv1 "open-cluster-management.io/api/operator/v1" @@ -18,6 +19,13 @@ import ( ) var _ = Describe("Create klusterlet CR", Label("klusterlet"), func() { + // Skip entire suite if using grpc driver + if registrationDriver == "grpc" { + // TODO enable this test after https://github.com/open-cluster-management-io/ocm/issues/1246 is resolved + klog.Infof("Skip the klusterlet test when registrationDriver is grpc") + return + } + var klusterletName string var clusterName string var klusterletNamespace string @@ -38,7 +46,7 @@ var _ = Describe("Create klusterlet CR", Label("klusterlet"), func() { By(fmt.Sprintf("create klusterlet %v with managed cluster name %v", klusterletName, clusterName)) // Set install mode empty _, err := spoke.CreateKlusterlet(klusterletName, clusterName, klusterletNamespace, - "", bootstrapHubKubeConfigSecret, images) + "", bootstrapHubKubeConfigSecret, images, registrationDriver) Expect(err).ToNot(HaveOccurred()) By(fmt.Sprintf("waiting for the managed cluster %v to be created", clusterName)) @@ -80,7 +88,7 @@ var _ = Describe("Create klusterlet CR", Label("klusterlet"), func() { It("Create klusterlet CR with managed cluster name", func() { By(fmt.Sprintf("create klusterlet %v with managed cluster name %v", klusterletName, clusterName)) _, err := spoke.CreateKlusterlet(klusterletName, clusterName, klusterletNamespace, - operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images) + operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images, registrationDriver) Expect(err).ToNot(HaveOccurred()) By(fmt.Sprintf("waiting for the managed cluster %v to be created", clusterName)) @@ -125,7 +133,7 @@ var _ = Describe("Create klusterlet CR", Label("klusterlet"), func() { var err error By(fmt.Sprintf("create klusterlet %v without managed cluster name", klusterletName)) _, err = spoke.CreateKlusterlet(klusterletName, clusterName, klusterletNamespace, - operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images) + operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images, registrationDriver) Expect(err).ToNot(HaveOccurred()) By("waiting for the managed cluster to be created") @@ -178,7 +186,7 @@ var _ = Describe("Create klusterlet CR", Label("klusterlet"), func() { It("Update klusterlet CR namespace", func() { By(fmt.Sprintf("create klusterlet %v with managed cluster name %v", klusterletName, clusterName)) _, err := spoke.CreateKlusterlet(klusterletName, clusterName, klusterletNamespace, - operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images) + operatorapiv1.InstallMode(klusterletDeployMode), bootstrapHubKubeConfigSecret, images, registrationDriver) Expect(err).ToNot(HaveOccurred()) By(fmt.Sprintf("waiting for the managed cluster %v to be created", clusterName)) diff --git a/test/framework/klusterlet.go b/test/framework/klusterlet.go index 70142c73b..6b303ab0e 100644 --- a/test/framework/klusterlet.go +++ b/test/framework/klusterlet.go @@ -24,6 +24,7 @@ func CreateAndApproveKlusterlet( mode operatorapiv1.InstallMode, bootstrapHubKubeConfigSecret *corev1.Secret, images Images, + registrationDriver string, ) { // on the spoke side _, err := spoke.CreateKlusterlet( @@ -33,6 +34,7 @@ func CreateAndApproveKlusterlet( mode, bootstrapHubKubeConfigSecret, images, + registrationDriver, ) Expect(err).ToNot(HaveOccurred()) @@ -59,7 +61,8 @@ func (spoke *Spoke) CreateKlusterlet( name, clusterName, klusterletNamespace string, mode operatorapiv1.InstallMode, bootstrapHubKubeConfigSecret *corev1.Secret, - images Images) (*operatorapiv1.Klusterlet, error) { + images Images, + registrationDriver string) (*operatorapiv1.Klusterlet, error) { if name == "" { return nil, fmt.Errorf("the name should not be null") } @@ -91,8 +94,18 @@ func (spoke *Spoke) CreateKlusterlet( }, } + // Add registration configuration for gRPC driver + if registrationDriver == "grpc" { + klusterlet.Spec.RegistrationConfiguration = &operatorapiv1.RegistrationConfiguration{ + RegistrationDriver: operatorapiv1.RegistrationDriver{ + AuthType: "grpc", + }, + } + } + agentNamespace := helpers.AgentNamespace(klusterlet) - klog.Infof("klusterlet: %s/%s, \t mode: %v, \t agent namespace: %s", klusterlet.Name, klusterlet.Namespace, mode, agentNamespace) + klog.Infof("klusterlet: %s/%s, \t mode: %v, \t agent namespace: %s, \t registration driver: %s", + klusterlet.Name, klusterlet.Namespace, mode, agentNamespace, registrationDriver) // create agentNamespace namespace := &corev1.Namespace{ diff --git a/vendor/modules.txt b/vendor/modules.txt index 05812f19b..065ff016b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1961,7 +1961,7 @@ open-cluster-management.io/api/operator/v1 open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d +# open-cluster-management.io/sdk-go v1.1.1-0.20260127092137-c07e0fafa331 ## explicit; go 1.25.0 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/signer.go b/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/signer.go index 25952fa8d..93a8937c0 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/signer.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/signer.go @@ -4,9 +4,10 @@ import ( "bytes" "context" "fmt" - "open-cluster-management.io/sdk-go/pkg/helpers" "time" + "open-cluster-management.io/sdk-go/pkg/helpers" + "github.com/openshift/library-go/pkg/crypto" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,6 +26,9 @@ type SigningRotation struct { Validity time.Duration Lister corev1listers.SecretLister Client corev1client.SecretsGetter + // OwnerReference is an optional owner reference to set on the secret for garbage collection. + // When set, the secret will be automatically deleted when the owner resource is deleted. + OwnerReference *metav1.OwnerReference } func (c SigningRotation) EnsureSigningCertKeyPair() (*crypto.CA, error) { @@ -39,6 +43,11 @@ func (c SigningRotation) EnsureSigningCertKeyPair() (*crypto.CA, error) { } signingCertKeyPairSecret.Type = corev1.SecretTypeTLS + // Set owner reference if configured (ApplySecret handles add-only logic) + if c.OwnerReference != nil { + signingCertKeyPairSecret.OwnerReferences = []metav1.OwnerReference{*c.OwnerReference} + } + if reason := needNewSigningCertKeyPair(signingCertKeyPairSecret); len(reason) > 0 { if err := setSigningCertKeyPairSecret(signingCertKeyPairSecret, c.SignerNamePrefix, c.Validity); err != nil { return nil, err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/target.go b/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/target.go index d6adefcc9..358f83c69 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/target.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/certrotation/target.go @@ -34,6 +34,9 @@ type TargetRotation struct { HostNames []string Lister corev1listers.SecretLister Client corev1client.SecretsGetter + // OwnerReference is an optional owner reference to set on the secret for garbage collection. + // When set, the secret will be automatically deleted when the owner resource is deleted. + OwnerReference *metav1.OwnerReference } func (c TargetRotation) EnsureTargetCertKeyPair(signingCertKeyPair *crypto.CA, caBundleCerts []*x509.Certificate, @@ -42,6 +45,7 @@ func (c TargetRotation) EnsureTargetCertKeyPair(signingCertKeyPair *crypto.CA, c if err != nil && !apierrors.IsNotFound(err) { return err } + targetCertKeyPairSecret := originalTargetCertKeyPairSecret.DeepCopy() if apierrors.IsNotFound(err) { // create an empty one @@ -49,6 +53,11 @@ func (c TargetRotation) EnsureTargetCertKeyPair(signingCertKeyPair *crypto.CA, c } targetCertKeyPairSecret.Type = corev1.SecretTypeTLS + // Set owner reference if configured (ApplySecret handles add-only logic) + if c.OwnerReference != nil { + targetCertKeyPairSecret.OwnerReferences = []metav1.OwnerReference{*c.OwnerReference} + } + reason := needNewTargetCertKeyPair(targetCertKeyPairSecret, caBundleCerts, c.HostNames) if len(reason) == 0 { return nil @@ -59,6 +68,7 @@ func (c TargetRotation) EnsureTargetCertKeyPair(signingCertKeyPair *crypto.CA, c return err } + // Apply the secret (handles both create and update) if targetCertKeyPairSecret, _, err = helpers.ApplySecret(context.TODO(), c.Client, targetCertKeyPairSecret); err != nil { return err } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1/client.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1/client.go index 8f9dd0d29..5a142bbb5 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1/client.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1/client.go @@ -145,6 +145,12 @@ func (c *ManagedClusterAddOnClient) Patch( // and reject the update if it's status update is outdated. eventType.Action = types.UpdateRequestAction if err := c.cloudEventsClient.Publish(ctx, eventType, newAddon); err != nil { + if errors.IsNotFound(err) { + // addon is not found from server, delete it from local cache + if err := c.watcherStore.Delete(last); err != nil { + return nil, errors.NewInternalError(err) + } + } return nil, cloudeventserrors.ToStatusError(common.ManagedClusterAddOnGR, name, err) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1beta1/client.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1beta1/client.go index c87d0bc31..32921e14c 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1beta1/client.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1beta1/client.go @@ -143,6 +143,12 @@ func (c *ManagedClusterAddOnClient) Patch( // and reject the update if it's status update is outdated. eventType.Action = types.UpdateRequestAction if err := c.cloudEventsClient.Publish(ctx, eventType, newAddon); err != nil { + if errors.IsNotFound(err) { + // addon is not found from server, delete it from local cache + if err := c.watcherStore.Delete(last); err != nil { + return nil, errors.NewInternalError(err) + } + } return nil, cloudeventserrors.ToStatusError(common.ManagedClusterAddOnGR, name, err) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease/client.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease/client.go index 8f4ea7c78..f171b8661 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease/client.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease/client.go @@ -37,6 +37,12 @@ func (l LeaseClient) Update(ctx context.Context, lease *coordinationv1.Lease, op } if err := l.cloudEventsClient.Publish(ctx, eventType, lease); err != nil { + if errors.IsNotFound(err) { + // lease is not found from server, delete it from local cache + if err := l.watcherStore.Delete(lease); err != nil { + return nil, errors.NewInternalError(err) + } + } return nil, cloudeventserrors.ToStatusError(coordinationv1.Resource("leases"), lease.Name, err) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go index ede195583..f422631f6 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go @@ -210,10 +210,23 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { case <-ctx.Done(): return case <-c.subscribeChan: - if err := c.transport.Subscribe(ctx); err != nil { - // Failed to send subscribe request, it should be connection failed, will retry on next reconnection - runtime.HandleErrorWithContext(ctx, err, "failed to subscribe after connection") - continue + // Retry subscribe with backoff until success or context cancellation + for { + if err := c.transport.Subscribe(ctx); err != nil { + runtime.HandleErrorWithContext(ctx, err, "failed to subscribe after connection") + + // Wait with backoff before retrying + select { + case <-ctx.Done(): + return + case <-wait.RealTimer(DelayFn()).C(): + // Continue to retry + } + continue + } + + // Subscribe succeeded, break out of retry loop + break } // Send startReceiverSignal to start/restart the receiver after successful subscription. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc/transport.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc/transport.go index 248604c6e..40b3dccd9 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc/transport.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc/transport.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -120,7 +121,28 @@ func (t *grpcTransport) Subscribe(ctx context.Context) error { values := header.Get(constants.GRPCSubscriptionIDKey) if len(values) != 1 { - return fmt.Errorf("expected exactly one subscription-id header, got %d", len(values)) + // Header() succeeded but no subscription-id was sent (header is nil or empty). + // This typically means the server rejected the subscription before sending headers + // (e.g., authorization failure). The actual error is only available via Recv(). + // Call Recv() to get the real error from the server. + recvErrCh := make(chan error, 1) + go func() { + _, err := subClient.Recv() + recvErrCh <- err + }() + select { + case recvErr := <-recvErrCh: + if recvErr != nil { + return recvErr + } + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + _ = subClient.CloseSend() + return fmt.Errorf("no subscription-id in header (%v): recv timeout", header) + } + // If Recv() didn't return an error, this is a server-side configuration issue + return fmt.Errorf("no subscription-id in header (%v)", header) } t.subID = values[0] t.subClient = subClient diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authz/kube/sar.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authz/kube/sar.go index 777bc7f4d..0587e6a34 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authz/kube/sar.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authz/kube/sar.go @@ -5,8 +5,10 @@ import ( "fmt" "sync" + "github.com/cloudevents/sdk-go/v2/binding" "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1" "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount" + grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" "google.golang.org/grpc" "k8s.io/klog/v2" @@ -83,7 +85,21 @@ func (s *SARAuthorizer) AuthorizeRequest(ctx context.Context, req any) (authz.De return authz.DecisionDeny, fmt.Errorf("missing ce-clustername in event attributes, %v", pReq.Event.Attributes) } - decision, err := s.authorize(ctx, clusterAttr.GetCeString(), *eventsType) + if pReq.Event == nil { + return authz.DecisionDeny, fmt.Errorf("missing event in request") + } + + var partial metav1.PartialObjectMetadata + + evt, err := binding.ToEvent(ctx, grpcprotocol.NewMessage(pReq.Event)) + if err != nil { + return authz.DecisionDeny, fmt.Errorf("failed to convert protobuf to cloudevent: %v", err) + } + if err := evt.DataAs(&partial); err != nil { + return authz.DecisionDeny, err + } + + decision, err := s.authorize(ctx, clusterAttr.GetCeString(), *eventsType, partial.ObjectMeta) return decision, err } @@ -113,7 +129,9 @@ func (s *SARAuthorizer) AuthorizeStream(ctx context.Context, ss grpc.ServerStrea Action: types.WatchRequestAction, } - decision, err := s.authorize(ss.Context(), req.ClusterName, eventsType) + // for now, we subscribe to all resources of a specified type + // TODO enhance the SubscriptionRequest to support specifying a resource name + decision, err := s.authorize(ss.Context(), req.ClusterName, eventsType, metav1.ObjectMeta{}) if err != nil { return decision, nil, err } @@ -121,13 +139,13 @@ func (s *SARAuthorizer) AuthorizeStream(ctx context.Context, ss grpc.ServerStrea return decision, &wrappedAuthorizedStream{ServerStream: ss, authorizedReq: &req}, nil } -func (s *SARAuthorizer) authorize(ctx context.Context, cluster string, eventsType types.CloudEventsType) (authz.Decision, error) { +func (s *SARAuthorizer) authorize(ctx context.Context, cluster string, eventsType types.CloudEventsType, metaObj metav1.ObjectMeta) (authz.Decision, error) { user, groups, err := userInfo(ctx) if err != nil { return authz.DecisionDeny, err } - sar, err := toSubjectAccessReview(cluster, user, groups, eventsType) + sar, err := toSubjectAccessReview(cluster, user, groups, eventsType, metaObj) if err != nil { return authz.DecisionDeny, err } @@ -170,7 +188,7 @@ func userInfo(ctx context.Context) (user string, groups []string, err error) { return user, groups, nil } -func toSubjectAccessReview(clusterName string, user string, groups []string, eventsType types.CloudEventsType) (*authv1.SubjectAccessReview, error) { +func toSubjectAccessReview(clusterName string, user string, groups []string, eventsType types.CloudEventsType, metaObj metav1.ObjectMeta) (*authv1.SubjectAccessReview, error) { verb, err := toVerb(eventsType.Action) if err != nil { return nil, err @@ -208,9 +226,19 @@ func toSubjectAccessReview(clusterName string, user string, groups []string, eve sar.Spec.ResourceAttributes.Group = "" sar.Spec.ResourceAttributes.Resource = "serviceaccounts" sar.Spec.ResourceAttributes.Subresource = "token" - // the verb "create" is required for both token request pub and sub. - sar.Spec.ResourceAttributes.Verb = "create" - return sar, nil + + switch verb { + case "create": + sar.Spec.ResourceAttributes.Name = metaObj.Name + return sar, nil + case "watch": + // for sub request, we use verb subscribe + // TODO enhance the SubscriptionRequest to support specifying a resource name + sar.Spec.ResourceAttributes.Verb = "subscribe" + return sar, nil + } + + return nil, fmt.Errorf("unsupported action %s", verb) case payload.ManifestBundleEventDataType: sar.Spec.ResourceAttributes.Group = workv1.SchemeGroupVersion.Group sar.Spec.ResourceAttributes.Resource = "manifestworks" diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go index 7ba35b4ae..61623ce77 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go @@ -11,8 +11,6 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat" "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/metrics" - "k8s.io/apimachinery/pkg/api/errors" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" @@ -124,34 +122,28 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest) return &emptypb.Empty{}, nil } -// register registers a subscriber and return client id and error channel. -func (bkr *GRPCBroker) register(ctx context.Context, +// registerSubscriber registers a subscriber with a pre-generated ID. +// The subscription header must already be sent before calling this function. +func (bkr *GRPCBroker) registerSubscriber(ctx context.Context, + id string, dataType types.CloudEventsDataType, subReq *pbv1.SubscriptionRequest, - subServer pbv1.CloudEventService_SubscribeServer, - handler resourceHandler) (string, error) { + handler resourceHandler) error { logger := klog.FromContext(ctx) bkr.mu.Lock() defer bkr.mu.Unlock() - id := uuid.NewString() + logger.Info("registering subscriber", "id", id, "clusterName", subReq.ClusterName, "dataType", dataType) + bkr.subscribers[id] = &subscriber{ clusterName: subReq.ClusterName, dataType: dataType, handler: handler, } - // Signal subscriber is registered - if err := subServer.SendHeader(metadata.Pairs(constants.GRPCSubscriptionIDKey, id)); err != nil { - logger.Error(err, "failed to send subscription header, unregister subscriber", "subID", id) - delete(bkr.subscribers, id) - return "", err - } - logger.V(4).Info("register a subscriber", "id", id, "clusterName", subReq.ClusterName, "dataType", dataType) metrics.IncGRPCCESubscribersMetric(subReq.ClusterName, dataType.String()) - - return id, nil + return nil } // unregister a subscriber by id @@ -181,10 +173,17 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv return fmt.Errorf("invalid subscription request: invalid data type %v", err) } + // Generate subscription ID and send header IMMEDIATELY, before any other operations + // This ensures the client receives the header as soon as possible after the stream is established + subID := uuid.NewString() + if err := subServer.SendHeader(metadata.Pairs(constants.GRPCSubscriptionIDKey, subID)); err != nil { + return fmt.Errorf("failed to send subscription header for subID %s: %w", subID, err) + } + subCtx, cancel := context.WithCancel(subServer.Context()) defer cancel() - logger := klog.FromContext(subCtx).WithValues("clusterName", subReq.ClusterName) + logger := klog.FromContext(subCtx).WithValues("clusterName", subReq.ClusterName, "subID", subID) // TODO make the channel size configurable eventCh := make(chan *pbv1.CloudEvent, 100) @@ -195,6 +194,35 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv } sendErrCh := make(chan error, 1) + // Register the subscriber with the ID we already created and sent in the header + err = bkr.registerSubscriber(klog.NewContext(subCtx, logger), subID, *dataType, subReq, func(handlerCtx context.Context, subID string, evt *cloudevents.Event) error { + // convert the cloudevents.Event to pbv1.CloudEvent + // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf + pbEvt := &pbv1.CloudEvent{} + if err := grpcprotocol.WritePBMessage(handlerCtx, binding.ToMessage(evt), pbEvt); err != nil { + return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", evt.ID(), err) + } + + // send the cloudevent to the subscriber + klog.FromContext(handlerCtx).V(4).Info("sending the event to spec subscribers", + "subID", subID, "eventType", evt.Type(), "extensions", evt.Extensions()) + select { + case eventCh <- pbEvt: + case <-subCtx.Done(): + // The context of the stream has been canceled or completed. + // This could happen if: + // - The client closed the connection or canceled the stream. + // - The server closed the stream, potentially due to a shutdown. + // No error is returned here because the stream closure is expected. + return nil + } + + return nil + }) + if err != nil { + return err + } + // send events // The grpc send is not concurrency safe and non-blocking, see: https://github.com/grpc/grpc-go/blob/v1.75.1/stream.go#L1571 // Return the error without wrapping, as it includes the gRPC error code and message for further handling. @@ -238,34 +266,6 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv } }() - subID, err := bkr.register(klog.NewContext(subCtx, logger), *dataType, subReq, subServer, func(handlerCtx context.Context, subID string, evt *cloudevents.Event) error { - // convert the cloudevents.Event to pbv1.CloudEvent - // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf - pbEvt := &pbv1.CloudEvent{} - if err := grpcprotocol.WritePBMessage(handlerCtx, binding.ToMessage(evt), pbEvt); err != nil { - return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", evt.ID(), err) - } - - // send the cloudevent to the subscriber - logger.V(4).Info("sending the event to spec subscribers", - "subID", subID, "eventType", evt.Type(), "extensions", evt.Extensions()) - select { - case eventCh <- pbEvt: - case <-subCtx.Done(): - // The context of the stream has been canceled or completed. - // This could happen if: - // - The client closed the connection or canceled the stream. - // - The server closed the stream, potentially due to a shutdown. - // No error is returned here because the stream closure is expected. - return nil - } - - return nil - }) - if err != nil { - return err - } - if heartbeater != nil { go heartbeater.Start(subCtx) } @@ -315,30 +315,36 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy return fmt.Errorf("failed to find service for event type %s", eventDataType) } - objs, err := service.List(types.ListOptions{ClusterName: clusterName, CloudEventsDataType: eventDataType}) + evts, err := service.List(ctx, types.ListOptions{ClusterName: clusterName, CloudEventsDataType: eventDataType}) if err != nil { return err } - if len(objs) == 0 { + if len(evts) == 0 { log.V(4).Info("no objs from the lister, do nothing") return nil } - for _, obj := range objs { + for _, evt := range evts { // respond with the deleting resource regardless of the resource version - objLogger := log.WithValues("eventType", obj.Type(), "extensions", obj.Extensions()) - if _, ok := obj.Extensions()[types.ExtensionDeletionTimestamp]; ok { + objLogger := log.WithValues("eventType", evt.Type(), "extensions", evt.Extensions()) + if _, ok := evt.Extensions()[types.ExtensionDeletionTimestamp]; ok { objLogger.V(4).Info("respond spec resync request") - err = bkr.handleRes(ctx, obj, eventDataType, "delete_request") + deleteEventTypes := types.CloudEventsType{ + CloudEventsDataType: eventDataType, + SubResource: types.SubResourceSpec, + Action: types.DeleteRequestAction, + } + evt.SetType(deleteEventTypes.String()) + err = bkr.HandleEvent(ctx, evt) if err != nil { objLogger.Error(err, "failed to handle resync spec request") } continue } - lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions) - currentResourceVersion, err := cloudeventstypes.ToInteger(obj.Extensions()[types.ExtensionResourceVersion]) + lastResourceVersion := findResourceVersion(evt.ID(), resourceVersions.Versions) + currentResourceVersion, err := cloudeventstypes.ToInteger(evt.Extensions()[types.ExtensionResourceVersion]) if err != nil { objLogger.V(4).Info("ignore the event since it has a invalid resourceVersion", "error", err) continue @@ -348,7 +354,13 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy // the newer work to agent if currentResourceVersion == 0 || int64(currentResourceVersion) > lastResourceVersion { objLogger.V(4).Info("respond spec resync request") - err := bkr.handleRes(ctx, obj, eventDataType, "update_request") + updateEventTypes := types.CloudEventsType{ + CloudEventsDataType: eventDataType, + SubResource: types.SubResourceSpec, + Action: types.UpdateRequestAction, + } + evt.SetType(updateEventTypes.String()) + err := bkr.HandleEvent(ctx, evt) if err != nil { objLogger.Error(err, "failed to handle resync spec request") } @@ -357,7 +369,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy // the resources do not exist on the source, but exist on the agent, delete them for _, rv := range resourceVersions.Versions { - _, exists := getObj(rv.ResourceID, objs) + _, exists := getObj(rv.ResourceID, evts) if exists { continue } @@ -365,6 +377,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy deleteEventTypes := types.CloudEventsType{ CloudEventsDataType: eventDataType, SubResource: types.SubResourceSpec, + Action: types.DeleteRequestAction, } obj := types.NewEventBuilder("source", deleteEventTypes). WithResourceID(rv.ResourceID). @@ -375,7 +388,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy // send a delete event for the current resource log.V(4).Info("respond spec resync request") - err := bkr.handleRes(ctx, &obj, eventDataType, "delete_request") + err := bkr.HandleEvent(ctx, &obj) if err != nil { log.Error(err, "failed to handle delete request") } @@ -384,22 +397,20 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy return nil } -// handleRes publish the resource to the correct subscriber. -func (bkr *GRPCBroker) handleRes( - ctx context.Context, - evt *cloudevents.Event, - t types.CloudEventsDataType, - action types.EventAction) error { +// HandleEvent publish the event to the correct subscriber. +func (bkr *GRPCBroker) HandleEvent(ctx context.Context, evt *cloudevents.Event) error { + if evt == nil { + return fmt.Errorf("event is nil") + } bkr.mu.RLock() defer bkr.mu.RUnlock() - eventType := types.CloudEventsType{ - CloudEventsDataType: t, - SubResource: types.SubResourceSpec, - Action: action, + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + return err } - evt.SetType(eventType.String()) + evtDataType := eventType.CloudEventsDataType clusterNameValue, err := evt.Context.GetExtension(types.ExtensionClusterName) if err != nil { @@ -412,7 +423,7 @@ func (bkr *GRPCBroker) handleRes( // the resource consumer name and its data type is in the subscriber list, ensuring // the event will be only processed when the consumer is subscribed to the current // broker. - if subscriber.clusterName == clusterName && subscriber.dataType == t { + if subscriber.clusterName == clusterName && subscriber.dataType == evtDataType { if err := subscriber.handler(ctx, subID, evt); err != nil { return err } @@ -421,63 +432,6 @@ func (bkr *GRPCBroker) handleRes( return nil } -// OnCreate is called by the controller when a resource is created on the maestro server. -func (bkr *GRPCBroker) OnCreate(ctx context.Context, t types.CloudEventsDataType, id string) error { - service, ok := bkr.services[t] - if !ok { - return fmt.Errorf("failed to find service for event type %s", t) - } - - resource, err := service.Get(ctx, id) - // if the resource is not found, it indicates the resource has been processed. - if errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - - return bkr.handleRes(ctx, resource, t, "create_request") -} - -// OnUpdate is called by the controller when a resource is updated on the maestro server. -func (bkr *GRPCBroker) OnUpdate(ctx context.Context, t types.CloudEventsDataType, id string) error { - service, ok := bkr.services[t] - if !ok { - return fmt.Errorf("failed to find service for event type %s", t) - } - - resource, err := service.Get(ctx, id) - // if the resource is not found, it indicates the resource has been processed. - if errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - - return bkr.handleRes(ctx, resource, t, "update_request") -} - -// OnDelete is called by the controller when a resource is deleted from the maestro server. -func (bkr *GRPCBroker) OnDelete(ctx context.Context, t types.CloudEventsDataType, id string) error { - service, ok := bkr.services[t] - if !ok { - return fmt.Errorf("failed to find service for event type %s", t) - } - - resource, err := service.Get(ctx, id) - // if the resource is not found, it indicates the resource has been processed. - if errors.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - - return bkr.handleRes(ctx, resource, t, "delete_request") -} - // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool { bkr.mu.RLock() diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/interface.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/interface.go index d80470633..159f379ff 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/interface.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/interface.go @@ -3,6 +3,7 @@ package server import ( "context" + cloudevents "github.com/cloudevents/sdk-go/v2" "k8s.io/apimachinery/pkg/util/sets" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -22,14 +23,8 @@ type AgentEventServer interface { } type EventHandler interface { - // OnCreate is the callback when resource is created in the service. - OnCreate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error - - // OnUpdate is the callback when resource is updated in the service. - OnUpdate(ctx context.Context, t types.CloudEventsDataType, resourceID string) error - - // OnDelete is the callback when resource is deleted from the service. - OnDelete(ctx context.Context, t types.CloudEventsDataType, resourceID string) error + // HandleEvent publish the event to the correct subscriber. + HandleEvent(ctx context.Context, evt *cloudevents.Event) error } // TODO SourceEventServer to handle the grpc conversation between consumers and grpcserver. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/store.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/store.go index e356e3bab..4c928c53c 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/store.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/store.go @@ -2,6 +2,7 @@ package server import ( "context" + cloudevents "github.com/cloudevents/sdk-go/v2" cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) @@ -11,11 +12,8 @@ import ( // TODO need a method to check if an event has been processed already. type Service interface { - // Get the cloudEvent based on resourceID from the service - Get(ctx context.Context, resourceID string) (*cloudevents.Event, error) - // List the cloudEvent from the service - List(listOpts cetypes.ListOptions) ([]*cloudevents.Event, error) + List(ctx context.Context, listOpts cetypes.ListOptions) ([]*cloudevents.Event, error) // HandleStatusUpdate processes the resource status update from the agent. HandleStatusUpdate(ctx context.Context, evt *cloudevents.Event) error diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/helpers/resourceapply.go b/vendor/open-cluster-management.io/sdk-go/pkg/helpers/resourceapply.go index f03ee38e8..b52d7d07e 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/helpers/resourceapply.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/helpers/resourceapply.go @@ -65,10 +65,8 @@ func ApplyConfigMap(ctx context.Context, client coreclientv1.ConfigMapsGetter, r return actual, true, err } -// ApplySecret merges objectmeta, requires data. ref from openshift/library-go +// ApplySecret merges objectmeta, requires data, and updates OwnerReferences. ref from openshift/library-go func ApplySecret(ctx context.Context, client coreclientv1.SecretsGetter, requiredInput *corev1.Secret) (*corev1.Secret, bool, error) { - // copy the stringData to data. Error on a data content conflict inside required. This is usually a bug. - existing, err := client.Secrets(requiredInput.Namespace).Get(ctx, requiredInput.Name, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return nil, false, err @@ -118,6 +116,11 @@ func ApplySecret(ctx context.Context, client coreclientv1.SecretsGetter, require existingCopy.Type = required.Type + // Add OwnerReferences only if the existing secret has none (add-only strategy) + if len(existingCopy.OwnerReferences) == 0 && len(required.OwnerReferences) > 0 { + existingCopy.OwnerReferences = required.OwnerReferences + } + // Server defaults some values and we need to do it as well or it will never equal. if existingCopy.Type == "" { existingCopy.Type = corev1.SecretTypeOpaque diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/server/grpc/server.go b/vendor/open-cluster-management.io/sdk-go/pkg/server/grpc/server.go index c729a02cb..376551871 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/server/grpc/server.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/server/grpc/server.go @@ -10,8 +10,10 @@ import ( grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/errors" k8smetrics "k8s.io/component-base/metrics" "k8s.io/klog/v2" @@ -221,7 +223,7 @@ func newAuthzUnaryInterceptor(authorizers ...authz.UnaryAuthorizer) grpc.UnarySe case authz.DecisionAllow: return handler(ctx, req) case authz.DecisionDeny: - return nil, fmt.Errorf("access denied: %v", err) + return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("access denied: %v", err)) case authz.DecisionNoOpinion: if err != nil { errs = append(errs, err) @@ -234,7 +236,7 @@ func newAuthzUnaryInterceptor(authorizers ...authz.UnaryAuthorizer) grpc.UnarySe return nil, errors.NewAggregate(errs) } - return nil, fmt.Errorf("no authorizer found for %s", info.FullMethod) + return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("no authorizer found for %s", info.FullMethod)) } } @@ -301,7 +303,7 @@ func newAuthzStreamInterceptor(authorizers []authz.StreamAuthorizer) grpc.Stream case authz.DecisionAllow: return handler(srv, authorizedStream) case authz.DecisionDeny: - return fmt.Errorf("access denied: %v", err) + return status.Error(codes.PermissionDenied, fmt.Sprintf("access denied: %v", err)) case authz.DecisionNoOpinion: if err != nil { errs = append(errs, err) @@ -311,9 +313,9 @@ func newAuthzStreamInterceptor(authorizers []authz.StreamAuthorizer) grpc.Stream } if len(errs) > 0 { - return errors.NewAggregate(errs) + return status.Error(codes.Internal, errors.NewAggregate(errs).Error()) } - return fmt.Errorf("no authorizer found for %s", info.FullMethod) + return status.Error(codes.Unauthenticated, fmt.Sprintf("no authorizer found for %s", info.FullMethod)) } }