Mirror of https://github.com/open-cluster-management-io/ocm.git, synced 2026-02-14 18:09:57 +00:00
add token request service (#1339)
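In short: this commit adds a token request service to the OCM gRPC server. A managed-cluster agent publishes a Kubernetes TokenRequest as a CloudEvent; the hub-side service calls the ServiceAccounts CreateToken API in the cluster's namespace, caches the response in a TTL store keyed by the request UID, and notifies the event handler so the agent can fetch the issued token. The vendored sdk-go update adds the agent-side ServiceAccountClient and codec, replaces ReconnectedChan with SubscribedChan, and extends the SubjectAccessReview authorizer to cover token requests.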
Some checks failed
Post / images (amd64, placement) (push) Failing after 46s
Post / images (amd64, registration) (push) Failing after 42s
Post / images (amd64, registration-operator) (push) Failing after 39s
Post / images (amd64, work) (push) Failing after 39s
Post / images (arm64, addon-manager) (push) Failing after 41s
Post / images (arm64, placement) (push) Failing after 40s
Post / images (arm64, registration) (push) Failing after 41s
Post / images (arm64, registration-operator) (push) Failing after 40s
Post / images (arm64, work) (push) Failing after 42s
Post / images (amd64, addon-manager) (push) Failing after 7m44s
Post / image manifest (addon-manager) (push) Has been skipped
Post / image manifest (placement) (push) Has been skipped
Post / image manifest (registration) (push) Has been skipped
Post / image manifest (registration-operator) (push) Has been skipped
Post / image manifest (work) (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Post / coverage (push) Failing after 10m19s
Scorecard supply-chain security / Scorecard analysis (push) Failing after 23s
Close stale issues and PRs / stale (push) Successful in 44s
Signed-off-by: Wei Liu <liuweixa@redhat.com>
go.mod (2 changed lines)

@@ -41,7 +41,7 @@ require (
 	k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
 	open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9
 	open-cluster-management.io/api v1.1.1-0.20260116065909-8307845802e0
-	open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b
+	open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d
 	sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03
 	sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a
 	sigs.k8s.io/controller-runtime v0.22.4
go.sum (4 changed lines)

@@ -587,8 +587,8 @@ open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9
 open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9/go.mod h1:St9LTEuZ5ADLY9cVXSp+rVE/ZbPJ+hzNQ7/YcsiQVd8=
 open-cluster-management.io/api v1.1.1-0.20260116065909-8307845802e0 h1:FLYkctX92dosLXm8+SQhfXm3h9K4iiKAKUwJiK88bF4=
 open-cluster-management.io/api v1.1.1-0.20260116065909-8307845802e0/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4=
-open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b h1:r5U3cDh6kuBmzKnAUqeoYPwwVU/VS9udvpcDEkxh6g4=
-open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
+open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d h1:V+2LZY0aPOStdRxnFvW+yL4y5UqC97R9x4lTQdjLVfg=
+open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U=
 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=
@@ -12,6 +12,7 @@ import (
 	csrce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/csr"
 	eventce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/event"
 	leasece "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease"
+	sace "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
 	pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
 	cloudeventsgrpc "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"
@@ -25,6 +26,7 @@ import (
 	"open-cluster-management.io/ocm/pkg/server/services/csr"
 	"open-cluster-management.io/ocm/pkg/server/services/event"
 	"open-cluster-management.io/ocm/pkg/server/services/lease"
+	"open-cluster-management.io/ocm/pkg/server/services/tokenrequest"
 	"open-cluster-management.io/ocm/pkg/server/services/work"
 )
@@ -69,6 +71,7 @@ func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controll
 		lease.NewLeaseService(clients.KubeClient, clients.KubeInformers.Coordination().V1().Leases()))
 	grpcEventServer.RegisterService(ctx, payload.ManifestBundleEventDataType,
 		work.NewWorkService(clients.WorkClient, clients.WorkInformers.Work().V1().ManifestWorks()))
+	grpcEventServer.RegisterService(ctx, sace.TokenRequestDataType, tokenrequest.NewTokenRequestService(clients.KubeClient))

 	// start clients
 	go clients.Run(ctx)
pkg/server/services/tokenrequest/tokenreqeust.go (new file, 123 lines)

@@ -0,0 +1,123 @@
package tokenrequest

import (
	"context"
	"fmt"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	authenticationv1 "k8s.io/api/authentication/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"

	sace "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/server"

	"open-cluster-management.io/ocm/pkg/server/services"
)

var (
	// TokenCacheTTL is the time-to-live for cached token responses.
	// Tokens are cached temporarily until the agent retrieves them.
	TokenCacheTTL = 30 * time.Second
)

type TokenRequestService struct {
	client  kubernetes.Interface
	codec   *sace.TokenRequestCodec
	handler server.EventHandler
	store   cache.Store
}

// NewTokenRequestService creates a new TokenRequestService with a TTL-based token cache.
func NewTokenRequestService(client kubernetes.Interface) server.Service {
	return &TokenRequestService{
		client: client,
		codec:  sace.NewTokenRequestCodec(),
		store: cache.NewTTLStore(func(obj interface{}) (string, error) {
			tokenRequest, ok := obj.(*authenticationv1.TokenRequest)
			if !ok {
				return "", fmt.Errorf("object is not a TokenRequest")
			}
			return string(tokenRequest.UID), nil
		}, TokenCacheTTL),
	}
}

func (t *TokenRequestService) Get(ctx context.Context, resourceID string) (*cloudevents.Event, error) {
	// Get the token request from the store by resourceID
	obj, exists, err := t.store.GetByKey(resourceID)
	if err != nil {
		return nil, fmt.Errorf("failed to get token request from store: %v", err)
	}
	if !exists {
		return nil, errors.NewNotFound(authenticationv1.Resource("tokenrequests"), resourceID)
	}

	tokenRequest, ok := obj.(*authenticationv1.TokenRequest)
	if !ok {
		return nil, fmt.Errorf("stored object is not a TokenRequest")
	}

	// The token will be automatically removed from the cache when its TTL expires
	return t.codec.Encode(services.CloudEventsSourceKube, types.CloudEventsType{CloudEventsDataType: sace.TokenRequestDataType}, tokenRequest)
}

func (t *TokenRequestService) List(listOpts types.ListOptions) ([]*cloudevents.Event, error) {
	// resync is not needed, so list is not required
	return nil, nil
}

func (t *TokenRequestService) HandleStatusUpdate(ctx context.Context, evt *cloudevents.Event) error {
	logger := klog.FromContext(ctx)

	eventType, err := types.ParseCloudEventsType(evt.Type())
	if err != nil {
		return fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
	}

	tokenRequest, err := t.codec.Decode(evt)
	if err != nil {
		return err
	}

	requestID := tokenRequest.UID

	logger.V(4).Info("handle token request", "namespace", tokenRequest.Namespace,
		"serviceAccount", tokenRequest.Name, "requestID", requestID,
		"subResource", eventType.SubResource, "actionType", eventType.Action)

	switch eventType.Action {
	case types.CreateRequestAction:
		// Create a token for the service account
		tokenResponse, err := t.client.CoreV1().ServiceAccounts(tokenRequest.Namespace).CreateToken(ctx, tokenRequest.Name, tokenRequest, metav1.CreateOptions{})
		if err != nil {
			return fmt.Errorf("failed to create token for service account %s/%s: %v", tokenRequest.Namespace, tokenRequest.Name, err)
		}

		// set the request ID back
		tokenResponse.UID = requestID

		// Cache the token response in the store for later retrieval
		if err := t.store.Add(tokenResponse); err != nil {
			return fmt.Errorf("failed to cache token response: %v", err)
		}

		// Notify the handler that the token is ready for retrieval
		if err := t.handler.OnCreate(ctx, eventType.CloudEventsDataType, string(tokenRequest.UID)); err != nil {
			return fmt.Errorf("failed to notify handler: %v", err)
		}

		return nil
	default:
		return fmt.Errorf("unsupported action %s for tokenRequest %s/%s", eventType.Action, tokenRequest.Namespace, tokenRequest.Name)
	}
}

func (t *TokenRequestService) RegisterHandler(ctx context.Context, handler server.EventHandler) {
	t.handler = handler
}
pkg/server/services/tokenrequest/tokenrequest_test.go (new file, 425 lines)

@@ -0,0 +1,425 @@
package tokenrequest

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	authenticationv1 "k8s.io/api/authentication/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	kubefake "k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"

	sace "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"

	testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
)

func TestNewTokenRequestService(t *testing.T) {
	kubeClient := kubefake.NewSimpleClientset()
	service := NewTokenRequestService(kubeClient)

	tokenService, ok := service.(*TokenRequestService)
	if !ok {
		t.Errorf("expected TokenRequestService, got %T", service)
	}

	if tokenService.client == nil {
		t.Errorf("client should not be nil")
	}

	if tokenService.codec == nil {
		t.Errorf("codec should not be nil")
	}

	if tokenService.store == nil {
		t.Errorf("store should not be nil")
	}
}

func TestGet(t *testing.T) {
	cases := []struct {
		name          string
		resourceID    string
		setupStore    func(*TokenRequestService)
		expectedError bool
		errorCheck    func(error) bool
	}{
		{
			name:       "token not found",
			resourceID: "non-existent-token",
			setupStore: func(s *TokenRequestService) {
				// Empty store
			},
			expectedError: true,
			errorCheck: func(err error) bool {
				return errors.IsNotFound(err)
			},
		},
		{
			name:       "token found",
			resourceID: "test-token-uid",
			setupStore: func(s *TokenRequestService) {
				tokenRequest := &authenticationv1.TokenRequest{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-sa",
						Namespace: "test-namespace",
					},
					Spec: authenticationv1.TokenRequestSpec{
						Audiences: []string{"test-audience"},
					},
					Status: authenticationv1.TokenRequestStatus{
						Token:               "test-token-value",
						ExpirationTimestamp: metav1.NewTime(time.Now().Add(1 * time.Hour)),
					},
				}
				tokenRequest.UID = "test-token-uid"
				s.store.Add(tokenRequest)
			},
			expectedError: false,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			kubeClient := kubefake.NewSimpleClientset()
			service := NewTokenRequestService(kubeClient).(*TokenRequestService)

			if c.setupStore != nil {
				c.setupStore(service)
			}

			evt, err := service.Get(context.Background(), c.resourceID)
			if c.expectedError {
				if err == nil {
					t.Errorf("expected error, got nil")
					return
				}
				if c.errorCheck != nil && !c.errorCheck(err) {
					t.Errorf("error check failed for error: %v", err)
				}
				return
			}

			if err != nil {
				t.Errorf("unexpected error: %v", err)
				return
			}

			if evt == nil {
				t.Errorf("expected event, got nil")
			}
		})
	}
}

func TestList(t *testing.T) {
	kubeClient := kubefake.NewSimpleClientset()
	service := NewTokenRequestService(kubeClient)

	evts, err := service.List(types.ListOptions{})
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}

	if evts != nil {
		t.Errorf("expected nil events, got %v", evts)
	}
}

func TestHandleStatusUpdate(t *testing.T) {
	cases := []struct {
		name              string
		serviceAccounts   []runtime.Object
		tokenRequestEvt   *cloudevents.Event
		validateActions   func(t *testing.T, actions []clienttesting.Action)
		validateCache     func(t *testing.T, service *TokenRequestService)
		validateHandler   func(t *testing.T, handler *mockEventHandler)
		reactorError      error
		expectedError     bool
		expectedErrorText string
	}{
		{
			name:            "invalid event type",
			serviceAccounts: []runtime.Object{},
			tokenRequestEvt: func() *cloudevents.Event {
				evt := types.NewEventBuilder("test", types.CloudEventsType{}).NewEvent()
				return &evt
			}(),
			expectedError:     true,
			expectedErrorText: "failed to parse cloud event type",
		},
		{
			name:            "invalid action",
			serviceAccounts: []runtime.Object{},
			tokenRequestEvt: func() *cloudevents.Event {
				evt := types.NewEventBuilder("test", types.CloudEventsType{
					CloudEventsDataType: sace.TokenRequestDataType,
					SubResource:         types.SubResourceSpec,
					Action:              types.DeleteRequestAction,
				}).NewEvent()
				tokenRequest := &authenticationv1.TokenRequest{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-sa",
						Namespace: "test-namespace",
					},
				}
				evt.SetData(cloudevents.ApplicationJSON, tokenRequest)
				return &evt
			}(),
			expectedError:     true,
			expectedErrorText: "unsupported action",
		},
		{
			name: "create token request successfully",
			serviceAccounts: []runtime.Object{
				&corev1.ServiceAccount{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-sa",
						Namespace: "test-namespace",
					},
				},
			},
			tokenRequestEvt: func() *cloudevents.Event {
				evt := types.NewEventBuilder("test", types.CloudEventsType{
					CloudEventsDataType: sace.TokenRequestDataType,
					SubResource:         types.SubResourceSpec,
					Action:              types.CreateRequestAction,
				}).NewEvent()
				tokenRequest := &authenticationv1.TokenRequest{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-sa",
						Namespace: "test-namespace",
					},
					Spec: authenticationv1.TokenRequestSpec{
						Audiences: []string{"test-audience"},
					},
				}
				tokenRequest.UID = "test-request-uid"
				evt.SetData(cloudevents.ApplicationJSON, tokenRequest)
				return &evt
			}(),
			validateActions: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "create")
				if actions[0].GetSubresource() != "token" {
					t.Errorf("expected subresource %s, got %s", "token", actions[0].GetSubresource())
				}
			},
			validateCache: func(t *testing.T, service *TokenRequestService) {
				obj, exists, err := service.store.GetByKey("test-request-uid")
				if err != nil {
					t.Errorf("unexpected error getting from cache: %v", err)
				}
				if !exists {
					t.Errorf("expected token to be cached")
				}
				tokenResponse, ok := obj.(*authenticationv1.TokenRequest)
				if !ok {
					t.Errorf("expected TokenRequest, got %T", obj)
				}
				if tokenResponse.UID != "test-request-uid" {
					t.Errorf("expected UID %s, got %s", "test-request-uid", tokenResponse.UID)
				}
			},
			validateHandler: func(t *testing.T, handler *mockEventHandler) {
				if !handler.onCreateCalled {
					t.Errorf("expected OnCreate to be called")
				}
				if handler.lastResourceID != "test-request-uid" {
					t.Errorf("expected resourceID %s, got %s", "test-request-uid", handler.lastResourceID)
				}
				if handler.lastDataType != sace.TokenRequestDataType {
					t.Errorf("expected data type %s, got %s", sace.TokenRequestDataType, handler.lastDataType)
				}
			},
		},
		{
			name:            "create token request with client error",
			serviceAccounts: []runtime.Object{},
			tokenRequestEvt: func() *cloudevents.Event {
				evt := types.NewEventBuilder("test", types.CloudEventsType{
					CloudEventsDataType: sace.TokenRequestDataType,
					SubResource:         types.SubResourceSpec,
					Action:              types.CreateRequestAction,
				}).NewEvent()
				tokenRequest := &authenticationv1.TokenRequest{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-sa",
						Namespace: "test-namespace",
					},
					Spec: authenticationv1.TokenRequestSpec{
						Audiences: []string{"test-audience"},
					},
				}
				evt.SetData(cloudevents.ApplicationJSON, tokenRequest)
				return &evt
			}(),
			reactorError:      fmt.Errorf("simulated error"),
			expectedError:     true,
			expectedErrorText: "failed to create token for service account",
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			kubeClient := kubefake.NewSimpleClientset(c.serviceAccounts...)

			// Add a reactor for error simulation
			if c.reactorError != nil {
				kubeClient.PrependReactor("create", "serviceaccounts", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
					return true, nil, c.reactorError
				})
			}

			service := NewTokenRequestService(kubeClient).(*TokenRequestService)

			// Create and register a mock handler
			mockHandler := &mockEventHandler{}
			service.RegisterHandler(context.Background(), mockHandler)

			err := service.HandleStatusUpdate(context.Background(), c.tokenRequestEvt)
			if c.expectedError {
				if err == nil {
					t.Errorf("expected error, got nil")
					return
				}
				// use strings.Contains instead of a prefix slice, which could
				// panic when the actual error is shorter than the expected text
				if c.expectedErrorText != "" && !strings.Contains(err.Error(), c.expectedErrorText) {
					t.Errorf("expected error to contain %q, got %q", c.expectedErrorText, err.Error())
				}
				return
			}

			if err != nil {
				t.Errorf("unexpected error: %v", err)
				return
			}

			if c.validateActions != nil {
				c.validateActions(t, kubeClient.Actions())
			}

			if c.validateCache != nil {
				c.validateCache(t, service)
			}

			if c.validateHandler != nil {
				c.validateHandler(t, mockHandler)
			}
		})
	}
}

func TestRegisterHandler(t *testing.T) {
	kubeClient := kubefake.NewSimpleClientset()
	service := NewTokenRequestService(kubeClient).(*TokenRequestService)

	mockHandler := &mockEventHandler{}
	service.RegisterHandler(context.Background(), mockHandler)

	if service.handler == nil {
		t.Errorf("handler should not be nil after registration")
	}
}

func TestTokenCacheTTL(t *testing.T) {
	// Save the original TTL and restore it after the test
	originalTTL := TokenCacheTTL
	defer func() {
		TokenCacheTTL = originalTTL
	}()

	// Use a shorter TTL for a faster test
	TokenCacheTTL = 2 * time.Second

	kubeClient := kubefake.NewSimpleClientset(&corev1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-sa",
			Namespace: "test-namespace",
		},
	})

	service := NewTokenRequestService(kubeClient).(*TokenRequestService)
	mockHandler := &mockEventHandler{}
	service.RegisterHandler(context.Background(), mockHandler)

	// Create a token request event
	evt := types.NewEventBuilder("test", types.CloudEventsType{
		CloudEventsDataType: sace.TokenRequestDataType,
		SubResource:         types.SubResourceSpec,
		Action:              types.CreateRequestAction,
	}).NewEvent()
	tokenRequest := &authenticationv1.TokenRequest{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-sa",
			Namespace: "test-namespace",
		},
		Spec: authenticationv1.TokenRequestSpec{
			Audiences: []string{"test-audience"},
		},
	}
	tokenRequest.UID = "test-ttl-uid"
	evt.SetData(cloudevents.ApplicationJSON, tokenRequest)

	// Handle the event to cache the token
	err := service.HandleStatusUpdate(context.Background(), &evt)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Verify the token is in the cache
	_, exists, err := service.store.GetByKey("test-ttl-uid")
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}
	if !exists {
		t.Errorf("expected token to be in cache")
	}

	// Wait for the TTL to expire
	time.Sleep(TokenCacheTTL + 1*time.Second)

	// Verify the token is removed from the cache
	_, exists, err = service.store.GetByKey("test-ttl-uid")
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}
	if exists {
		t.Errorf("expected token to be removed from cache after TTL")
	}
}

// mockEventHandler is a mock implementation of server.EventHandler for testing
type mockEventHandler struct {
	onCreateCalled bool
	onUpdateCalled bool
	onDeleteCalled bool
	lastDataType   types.CloudEventsDataType
	lastResourceID string
}

func (m *mockEventHandler) OnCreate(ctx context.Context, dataType types.CloudEventsDataType, resourceID string) error {
	m.onCreateCalled = true
	m.lastDataType = dataType
	m.lastResourceID = resourceID
	return nil
}

func (m *mockEventHandler) OnUpdate(ctx context.Context, dataType types.CloudEventsDataType, resourceID string) error {
	m.onUpdateCalled = true
	m.lastDataType = dataType
	m.lastResourceID = resourceID
	return nil
}

func (m *mockEventHandler) OnDelete(ctx context.Context, dataType types.CloudEventsDataType, resourceID string) error {
	m.onDeleteCalled = true
	m.lastDataType = dataType
	m.lastResourceID = resourceID
	return nil
}
vendor/modules.txt (vendored, 3 changed lines)

@@ -1961,7 +1961,7 @@ open-cluster-management.io/api/operator/v1
 open-cluster-management.io/api/utils/work/v1/workapplier
 open-cluster-management.io/api/work/v1
 open-cluster-management.io/api/work/v1alpha1
-# open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b
+# open-cluster-management.io/sdk-go v1.1.1-0.20260120013142-6d087c9a2a3d
 ## explicit; go 1.25.0
 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1
@@ -1984,6 +1984,7 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors
 open-cluster-management.io/sdk-go/pkg/cloudevents/clients/event
 open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease
 open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount
 open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash
 open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store
 open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils
vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options/generic.go (generated, vendored, 67 changed lines)

@@ -80,9 +80,8 @@ func (o *GenericClientOptions[T]) WithSubscription(enabled bool) *GenericClientO
 	return o
 }

-// WithResyncEnabled control the client resync (Default is true), if it's true, the resync happens when
-// 1. after the client's store is initiated
-// 2. the client reconnected
+// WithResyncEnabled control the client resync (Default is true), if it's true, the resync happens after
+// the client subscribed
 func (o *GenericClientOptions[T]) WithResyncEnabled(resync bool) *GenericClientOptions[T] {
 	o.resync = resync
 	return o
@@ -131,18 +130,20 @@ func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (generic.Clou
 		return nil, err
 	}

-	if o.subscription {
-		// start to subscribe
-		cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)
+	if !o.subscription {
+		return cloudEventsClient, nil
+	}
+
+	// start to subscribe
+	cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)

 	// start a go routine to receive client reconnect signal
 	go func() {
 		for {
 			select {
 			case <-ctx.Done():
 				return
-			case <-cloudEventsClient.ReconnectedChan():
+			case <-cloudEventsClient.SubscribedChan():
 				if !o.resync {
 					logger.Info("resync is disabled, do nothing")
 					continue
@@ -150,26 +151,15 @@ func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (generic.Clou

 				// when receiving a client reconnected signal, we resync all sources for this agent
 				// TODO after supporting multiple sources, we should only resync agent known sources
-				if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
-					logger.Error(err, "failed to send resync request")
+				if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
+					if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
+						logger.Error(err, "failed to send resync request")
+					}
 				}
 			}
 		}
 	}()

-	if !o.resync {
-		return cloudEventsClient, nil
-	}
-
-	// start a go routine to resync the works after this client's store is initiated
-	go func() {
-		if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
-			if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
-				logger.Error(err, "failed to send resync request")
-			}
-		}
-	}()
-
 	return cloudEventsClient, nil
 }
@@ -204,43 +194,34 @@ func (o *GenericClientOptions[T]) SourceClient(ctx context.Context) (generic.Clo
 		return nil, err
 	}

-	if o.subscription {
-		// start to subscribe
-		cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)
+	if !o.subscription {
+		return cloudEventsClient, nil
+	}

-		// start a go routine to receive client reconnect signal
+	// start to subscribe
+	cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)
+
+	// start a go routine to receive client subscribed signal
 	go func() {
 		for {
 			select {
 			case <-ctx.Done():
 				return
-			case <-cloudEventsClient.ReconnectedChan():
+			case <-cloudEventsClient.SubscribedChan():
 				if !o.resync {
 					logger.Info("resync is disabled, do nothing")
 					continue
 				}

-				// when receiving a client reconnected signal, we resync all clusters for this source
-				if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
-					logger.Error(err, "failed to send resync request")
+				// when receiving a client subscribed signal, we resync all clusters for this source
+				if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
+					if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
+						logger.Error(err, "failed to send resync request")
+					}
 				}
 			}
 		}
 	}()

-	if !o.resync {
-		return cloudEventsClient, nil
-	}
-
-	// start a go routine to resync the works after this client's store is initiated
-	go func() {
-		if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
-			if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
-				logger.Error(err, "failed to send resync request")
-			}
-		}
-	}()
-
 	return cloudEventsClient, nil
 }
vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount/client.go (generated, vendored, new file, 158 lines)

@@ -0,0 +1,158 @@
package serviceaccount

import (
	"context"
	"time"

	"github.com/google/uuid"
	authenticationv1 "k8s.io/api/authentication/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
	applyconfigurationscorev1 "k8s.io/client-go/applyconfigurations/core/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/klog/v2"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/builder"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
	cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

var (
	// TokenRequestTimeout is the timeout for CreateToken requests
	TokenRequestTimeout = 10 * time.Second
)

type ServiceAccountClient struct {
	grpcOptions *grpc.GRPCOptions
	clusterName string
}

var _ corev1client.ServiceAccountInterface = &ServiceAccountClient{}

// NewServiceAccountClient returns a ServiceAccountInterface.
// This client only supports creating a token via gRPC in the cluster namespace on the hub.
func NewServiceAccountClient(clusterName string, opt *grpc.GRPCOptions) *ServiceAccountClient {
	return &ServiceAccountClient{
		grpcOptions: opt,
		clusterName: clusterName,
	}
}

func (sa *ServiceAccountClient) Create(ctx context.Context, serviceAccount *corev1.ServiceAccount, opts metav1.CreateOptions) (*corev1.ServiceAccount, error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "create")
}

func (sa *ServiceAccountClient) Update(ctx context.Context, serviceAccount *corev1.ServiceAccount, opts metav1.UpdateOptions) (*corev1.ServiceAccount, error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "update")
}

func (sa *ServiceAccountClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
	return errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "delete")
}

func (sa *ServiceAccountClient) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
	return errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "delete")
}

func (sa *ServiceAccountClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*corev1.ServiceAccount, error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "get")
}

func (sa *ServiceAccountClient) List(ctx context.Context, opts metav1.ListOptions) (*corev1.ServiceAccountList, error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "list")
}

func (sa *ServiceAccountClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "watch")
}

func (sa *ServiceAccountClient) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *corev1.ServiceAccount, err error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "patch")
}

func (sa *ServiceAccountClient) Apply(ctx context.Context, serviceAccount *applyconfigurationscorev1.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (result *corev1.ServiceAccount, err error) {
	return nil, errors.NewMethodNotSupported(corev1.Resource("serviceaccounts"), "apply")
}

func (sa *ServiceAccountClient) CreateToken(ctx context.Context, serviceAccountName string, tokenRequest *authenticationv1.TokenRequest, opts metav1.CreateOptions) (*authenticationv1.TokenRequest, error) {
	if tokenRequest == nil {
		return nil, errors.NewBadRequest("tokenRequest is nil")
	}

	tokenRequestCtx, cancel := context.WithTimeout(ctx, TokenRequestTimeout)
	defer cancel() // Ensure client resources are cleaned up

	responseChan := make(chan *authenticationv1.TokenRequest, 1)

	options, err := builder.BuildCloudEventsAgentOptions(
		sa.grpcOptions,
		sa.clusterName,
		sa.clusterName,
		TokenRequestDataType,
	)
	if err != nil {
		return nil, err
	}

	cloudEventsClient, err := clients.NewCloudEventAgentClient(
		tokenRequestCtx,
		options,
		nil, // resync is disabled, so lister is not required
		nil, // resync is disabled, so statusHashGetter is not required
		&TokenRequestCodec{},
	)
	if err != nil {
		return nil, err
	}

	requestID := types.UID(uuid.New().String())

	// subscribe before publish to avoid missing the response
	cloudEventsClient.Subscribe(tokenRequestCtx, func(handlerCtx context.Context, resp *authenticationv1.TokenRequest) error {
		if resp.UID != requestID {
			return nil
		}

		logger := klog.FromContext(handlerCtx)
		logger.V(4).Info("response token", "requestID", resp.UID, "serviceAccountName", resp.Name)
		responseChan <- resp
		return nil
	})

	// Wait for the subscription to complete on the server before publishing
	select {
	case <-cloudEventsClient.SubscribedChan():
		// Subscription is ready
	case <-tokenRequestCtx.Done():
		return nil, errors.NewInternalError(tokenRequestCtx.Err())
	}

	eventType := cetypes.CloudEventsType{
		CloudEventsDataType: TokenRequestDataType,
		SubResource:         cetypes.SubResourceSpec,
		Action:              cetypes.CreateRequestAction,
	}

	newTokenRequest := tokenRequest.DeepCopy()
	newTokenRequest.UID = requestID
	newTokenRequest.Name = serviceAccountName
	newTokenRequest.Namespace = sa.clusterName // the serviceaccount should be located in the cluster namespace on the hub

	logger := klog.FromContext(tokenRequestCtx)
	logger.V(4).Info("request token", "requestID", requestID, "serviceAccountName", serviceAccountName)
	if err := cloudEventsClient.Publish(tokenRequestCtx, eventType, newTokenRequest); err != nil {
		return nil, errors.NewInternalError(err)
	}

	// wait until the tokenRequestResponse is received or the timeout is reached
	select {
	case tokenRequestResponse := <-responseChan:
		return tokenRequestResponse, nil
	case <-tokenRequestCtx.Done():
		return nil, errors.NewInternalError(tokenRequestCtx.Err())
	}
}
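For orientation, a minimal usage sketch of the new client. This is not from the commit: grpcOptions, the cluster name "cluster1", the service account name "my-addon-sa", and the audience are illustrative placeholders.

package main

import (
	"context"

	authenticationv1 "k8s.io/api/authentication/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
)

// requestToken sketches how an agent might obtain a hub-side token.
// "cluster1" and "my-addon-sa" are placeholders, not values from this commit.
func requestToken(ctx context.Context, grpcOptions *grpc.GRPCOptions) (string, error) {
	saClient := serviceaccount.NewServiceAccountClient("cluster1", grpcOptions)
	tokenResponse, err := saClient.CreateToken(ctx, "my-addon-sa",
		&authenticationv1.TokenRequest{
			Spec: authenticationv1.TokenRequestSpec{
				// assumed audience; use whatever the hub accepts
				Audiences: []string{"https://kubernetes.default.svc"},
			},
		}, metav1.CreateOptions{})
	if err != nil {
		// every verb other than CreateToken returns a MethodNotSupported error
		return "", err
	}
	return tokenResponse.Status.Token, nil
}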
vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount/codec.go (generated, vendored, new file, 61 lines)

@@ -0,0 +1,61 @@
package serviceaccount

import (
	"fmt"

	authenticationv1 "k8s.io/api/authentication/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

var TokenRequestDataType = types.CloudEventsDataType{
	Group:    authenticationv1.GroupName,
	Version:  "v1",
	Resource: "tokenrequests",
}

// TokenRequestCodec is a codec to encode/decode an event/cloudevent for an agent.
type TokenRequestCodec struct{}

func NewTokenRequestCodec() *TokenRequestCodec {
	return &TokenRequestCodec{}
}

// EventDataType always returns the event data type `authentication.k8s.io.v1.tokenrequests`.
func (c *TokenRequestCodec) EventDataType() types.CloudEventsDataType {
	return TokenRequestDataType
}

// Encode the event to a cloudevent
func (c *TokenRequestCodec) Encode(source string, eventType types.CloudEventsType, tokenRequest *authenticationv1.TokenRequest) (*cloudevents.Event, error) {
	if tokenRequest == nil {
		return nil, fmt.Errorf("tokenRequest is nil")
	}

	if eventType.CloudEventsDataType != TokenRequestDataType {
		return nil, fmt.Errorf("unsupported cloudevents data type %v", eventType.CloudEventsDataType)
	}

	evt := types.NewEventBuilder(source, eventType).
		WithResourceID(string(tokenRequest.UID)).
		WithClusterName(tokenRequest.Namespace).
		NewEvent()

	if err := evt.SetData(cloudevents.ApplicationJSON, tokenRequest); err != nil {
		return nil, fmt.Errorf("failed to encode event to a cloudevent: %v", err)
	}

	return &evt, nil
}

// Decode a cloudevent to an event object
func (c *TokenRequestCodec) Decode(evt *cloudevents.Event) (*authenticationv1.TokenRequest, error) {
	tokenRequest := &authenticationv1.TokenRequest{}
	if err := evt.DataAs(tokenRequest); err != nil {
		return nil, fmt.Errorf("failed to unmarshal event data: %w", err)
	}

	return tokenRequest, nil
}
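A quick round-trip sketch of the codec; the source name, UID, and namespace below are illustrative placeholders, not values from this commit.

package main

import (
	authenticationv1 "k8s.io/api/authentication/v1"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

// roundTrip encodes a TokenRequest into a CloudEvent and decodes it back.
func roundTrip() (*authenticationv1.TokenRequest, error) {
	codec := serviceaccount.NewTokenRequestCodec()

	req := &authenticationv1.TokenRequest{}
	req.UID = "example-uid"    // illustrative; becomes the event's resource ID
	req.Namespace = "cluster1" // illustrative; becomes the event's cluster name

	evt, err := codec.Encode("example-source", types.CloudEventsType{
		CloudEventsDataType: serviceaccount.TokenRequestDataType,
		SubResource:         types.SubResourceSpec,
		Action:              types.CreateRequestAction,
	}, req)
	if err != nil {
		return nil, err
	}

	// Decode recovers the same UID and namespace from the event payload.
	return codec.Decode(evt)
}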
@@ -59,10 +59,10 @@ func NewCloudEventAgentClient[T generic.ResourceObject](
 	}, nil
 }

-// ReconnectedChan returns a chan which indicates the source/agent client is reconnected.
-// The source/agent client callers should consider sending a resync request when receiving this signal.
-func (c *CloudEventAgentClient[T]) ReconnectedChan() <-chan struct{} {
-	return c.resyncChan
+// SubscribedChan returns a chan which indicates the agent client is subscribed.
+// The agent client callers should consider sending a resync request when receiving this signal.
+func (c *CloudEventAgentClient[T]) SubscribedChan() <-chan struct{} {
+	return c.subscribedChan
 }

 // Resync the resources spec by sending a spec resync request from the current to the given source.

@@ -60,7 +60,7 @@ type baseClient struct {
 	transport              options.CloudEventTransport
 	cloudEventsRateLimiter flowcontrol.RateLimiter
 	receiverChan           chan int
-	resyncChan             chan struct{}
+	subscribedChan         chan struct{}
 	subscribeChan          chan struct{}
 	connected              atomic.Bool
 	subscribed             atomic.Bool
@@ -71,7 +71,7 @@ func newBaseClient(clientID string, transport options.CloudEventTransport, limit
 	clientID:               clientID,
 	transport:              transport,
 	cloudEventsRateLimiter: utils.NewRateLimiter(limit),
-	resyncChan:             make(chan struct{}, 1),
+	subscribedChan:         make(chan struct{}, 1),
 	subscribeChan:          make(chan struct{}, 1),
 	receiverChan:           make(chan int, 2), // Allow both stop and start signals to be buffered
 }
@@ -229,7 +229,7 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {

 	// Notify the client caller to resync the resources
 	select {
-	case c.resyncChan <- struct{}{}:
+	case c.subscribedChan <- struct{}{}:
 		// Signal sent successfully
 	default:
 		// Resync channel is unavailable, that's okay - don't block

@@ -59,8 +59,10 @@ func NewCloudEventSourceClient[T generic.ResourceObject](
 	}, nil
 }

-func (c *CloudEventSourceClient[T]) ReconnectedChan() <-chan struct{} {
-	return c.resyncChan
+// SubscribedChan returns a chan which indicates the source client is subscribed.
+// The source client callers should consider sending a resync request when receiving this signal.
+func (c *CloudEventSourceClient[T]) SubscribedChan() <-chan struct{} {
+	return c.subscribedChan
 }

 // Resync the resources status by sending a status resync request from the current source to a specified cluster.
vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/interface.go (generated, vendored, 4 changed lines)

@@ -72,7 +72,7 @@ type CloudEventsClient[T ResourceObject] interface {
 	// ResourceHandler to handle them.
 	Subscribe(ctx context.Context, handlers ...ResourceHandler[T])

-	// ReconnectedChan returns a chan which indicates the source/agent client is reconnected.
+	// SubscribedChan returns a chan which indicates the source/agent client is subscribed.
 	// The source/agent client callers should consider sending a resync request when receiving this signal.
-	ReconnectedChan() <-chan struct{}
+	SubscribedChan() <-chan struct{}
 }
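For callers migrating from ReconnectedChan to SubscribedChan, the consumption pattern is unchanged; a minimal sketch, assuming client is any generic.CloudEventsClient and ctx is a context.Context:

package main

import (
	"context"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
)

// watchSubscribed consumes the renamed channel; this is the same pattern
// the options/generic.go change above uses.
func watchSubscribed[T generic.ResourceObject](ctx context.Context, client generic.CloudEventsClient[T]) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-client.SubscribedChan():
				// the client is subscribed; consider sending a resync request here
			}
		}
	}()
}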
vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authz/kube/sar.go (generated, vendored, 11 changed lines)

@@ -3,9 +3,11 @@ package sar
 import (
 	"context"
 	"fmt"
-	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1"
 	"sync"

+	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon/v1alpha1"
+	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/serviceaccount"
+
 	"google.golang.org/grpc"
 	"k8s.io/klog/v2"
 	pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
@@ -202,6 +204,13 @@ func toSubjectAccessReview(clusterName string, user string, groups []string, eve
 		sar.Spec.ResourceAttributes.Group = eventsType.Group
 		sar.Spec.ResourceAttributes.Resource = eventsType.Resource
 		return sar, nil
+	case serviceaccount.TokenRequestDataType:
+		sar.Spec.ResourceAttributes.Group = ""
+		sar.Spec.ResourceAttributes.Resource = "serviceaccounts"
+		sar.Spec.ResourceAttributes.Subresource = "token"
+		// the verb "create" is required for both token request pub and sub.
+		sar.Spec.ResourceAttributes.Verb = "create"
+		return sar, nil
 	case payload.ManifestBundleEventDataType:
 		sar.Spec.ResourceAttributes.Group = workv1.SchemeGroupVersion.Group
 		sar.Spec.ResourceAttributes.Resource = "manifestworks"
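The SAR mapping above implies that a gRPC client identity needs the create verb on the token subresource of serviceaccounts in its cluster namespace on the hub. A sketch of the corresponding rule with the rbac/v1 Go types; the Role name is illustrative, not part of this commit.

package main

import (
	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// tokenRequestRole grants what the SAR check above requires:
// "create" on the "token" subresource of serviceaccounts.
func tokenRequestRole(clusterNamespace string) *rbacv1.Role {
	return &rbacv1.Role{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "token-request", // illustrative name
			Namespace: clusterNamespace,
		},
		Rules: []rbacv1.PolicyRule{{
			APIGroups: []string{""}, // core group, matching the empty Group in the SAR
			Resources: []string{"serviceaccounts/token"},
			Verbs:     []string{"create"}, // required for both token request pub and sub
		}},
	}
}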
vendor/open-cluster-management.io/sdk-go/pkg/server/grpc/server.go (generated, vendored, 7 changed lines)

@@ -14,12 +14,13 @@ import (
 	"google.golang.org/grpc/keepalive"
 	"k8s.io/apimachinery/pkg/util/errors"
 	k8smetrics "k8s.io/component-base/metrics"
-	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/textlogger"
 	"open-cluster-management.io/sdk-go/pkg/server/grpc/authn"
 	"open-cluster-management.io/sdk-go/pkg/server/grpc/authz"
 	"open-cluster-management.io/sdk-go/pkg/server/grpc/metrics"
 	"sigs.k8s.io/controller-runtime/pkg/certwatcher"

+	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 )

 type GRPCServer struct {
@@ -80,6 +81,8 @@ func (b *GRPCServer) Run(ctx context.Context) error {
 		Timeout: b.options.ServerPingTimeout,
 	}))

+	// Set textlogger with verbosity level 4 for controller-runtime logging
+	log.SetLogger(textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(4))))
 	// Serve with TLS - use certwatcher for dynamic certificate reloading
 	certWatcher, err := certwatcher.New(b.options.TLSCertFile, b.options.TLSKeyFile)
 	if err != nil {