Files
open-cluster-management/pkg/spoke/auth/cache/executor_cache_controller.go
Jian Zhu 922fbd90c3 cache the executor validation results (#165)
* cache the executor validation results

Signed-off-by: zhujian <jiazhu@redhat.com>

* move executor cache controller to auth package

Signed-off-by: zhujian <jiazhu@redhat.com>

* add a binding resource executor mapper to process delete event

Signed-off-by: zhujian <jiazhu@redhat.com>

* initialize caches before starting the cache controller

Signed-off-by: zhujian <jiazhu@redhat.com>

* read enable executor caches from flag

Signed-off-by: zhujian <jiazhu@redhat.com>

* add unit tests for cache store

Signed-off-by: zhujian <jiazhu@redhat.com>

* add unit tests for cache validator

Signed-off-by: zhujian <jiazhu@redhat.com>

* add unit tests for cache controller

Signed-off-by: zhujian <jiazhu@redhat.com>

* read enable executor caches from feature gate

Signed-off-by: zhujian <jiazhu@redhat.com>

* add integration tests for cache controller

Signed-off-by: zhujian <jiazhu@redhat.com>

* add a description doc for the cache package

Signed-off-by: zhujian <jiazhu@redhat.com>

Signed-off-by: zhujian <jiazhu@redhat.com>
2022-11-22 03:03:37 -05:00

328 lines
11 KiB
Go

package cache
import (
"context"
"fmt"
"sync"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
rbacapiv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
rbacv1 "k8s.io/client-go/informers/rbac/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/work/pkg/spoke/auth/store"
)
// ResyncInterval is the period at which the cache controller is resynced; every resync
// cleans up the executor cache items that are no longer needed.
var ResyncInterval = 10 * time.Minute
// CacheController refreshes the cached executor auth results for manifestwork workloads on the spoke cluster.
type CacheController struct {
// executorCaches caches the subject access review results of a specific resource for executors
executorCaches *store.ExecutorCaches
// sarCheckerFn runs the subject access review check for an executor (service account) against a cached
// resource dimension; its result is written back into executorCaches when an executor is synced.
sarCheckerFn SubjectAccessReviewCheckFn
// manifestWorkExecutorCachesLoader can load all valuable caches in the current state of the cluster into
// an executor cache data structure. This is used by the controller to clean up unneeded items in the
// executor caches every ResyncInterval period.
manifestWorkExecutorCachesLoader manifestWorkExecutorCachesLoader
// bindingExecutorsMapper caches the mapping relationship between binding resources (ClusterRoleBinding &
// RoleBinding) and executors. It is needed for the following case: when a binding resource is deleted,
// we may only get the binding resource key but not its object (spec), so the corresponding executors
// can not be derived from the event itself.
//
// The key of the map could be:
// - a ClusterRoleBinding, in the format of "cluster-role-binding-name"
// - OR a RoleBinding, in the format of "role-binding-namespace/role-binding-name"
// The value of the map is the list of executors, each in the format of "executor-namespace/executor-name"
bindingExecutorsMapper *safeMap
}
// NewExecutorCacheController builds the executor cache controller. The controller watches the RBAC
// resources (role, rolebinding, clusterrole, clusterrolebinding) related to the executors used by the
// manifestworks, and updates the caches of the corresponding executor when those resources change.
//
// ctx is accepted for API compatibility and is not used while constructing the controller.
func NewExecutorCacheController(
	ctx context.Context,
	recorder events.Recorder,
	crbInformer rbacv1.ClusterRoleBindingInformer,
	rbInformer rbacv1.RoleBindingInformer,
	crInformer rbacv1.ClusterRoleInformer,
	rInformer rbacv1.RoleInformer,
	manifestWorkExecutorCachesLoader manifestWorkExecutorCachesLoader,
	executorCaches *store.ExecutorCaches,
	sarCheckerFn SubjectAccessReviewCheckFn,
) factory.Controller {
	cacheController := new(CacheController)
	cacheController.executorCaches = executorCaches
	cacheController.sarCheckerFn = sarCheckerFn
	cacheController.manifestWorkExecutorCachesLoader = manifestWorkExecutorCachesLoader
	// The binding -> executors mapping starts out empty and is filled by binding upsert events.
	cacheController.bindingExecutorsMapper = newSafeMap()

	return newControllerInner(cacheController, recorder, crbInformer, rbInformer, crInformer, rInformer)
}
// newSafeMap returns an empty safeMap that is ready for use.
func newSafeMap() *safeMap {
	m := new(safeMap)
	// The lock field needs no explicit initialization: its zero value is an unlocked mutex.
	m.items = map[string][]string{}
	return m
}
// newControllerInner wires the given CacheController to the RBAC informers and builds the
// factory controller. It is kept separate from NewExecutorCacheController so that unit tests
// can pass in a CacheController with faked fields.
func newControllerInner(controller *CacheController,
	recorder events.Recorder,
	crbInformer rbacv1.ClusterRoleBindingInformer,
	rbInformer rbacv1.RoleBindingInformer,
	crInformer rbacv1.ClusterRoleInformer,
	rInformer rbacv1.RoleInformer,
) factory.Controller {
	const cacheControllerName = "ManifestWorkExecutorCache"

	crbShared := crbInformer.Informer()
	rbShared := rbInformer.Informer()

	// Register the indexes used to find bindings by the (cluster) role they reference.
	// A registration failure is reported but does not stop the controller from being built.
	if err := crbShared.AddIndexers(cache.Indexers{byClusterRole: crbIndexByClusterRole}); err != nil {
		utilruntime.HandleError(err)
	}
	rbIndexers := cache.Indexers{
		byClusterRole: rbIndexByClusterRole,
		byRole:        rbIndexByRole,
	}
	if err := rbShared.AddIndexers(rbIndexers); err != nil {
		utilruntime.HandleError(err)
	}

	syncCtx := factory.NewSyncContext(cacheControllerName, recorder)

	// Binding resources use dedicated event handlers (instead of queue-keys functions) so that
	// upsert and delete events can be enqueued differently; they are attached as bare informers below.
	rbShared.AddEventHandler(&roleBindingEventHandler{
		enqueueUpsertFunc: controller.bindingResourceUpsertEnqueueFn(syncCtx),
		enqueueDeleteFunc: controller.bindingResourceDeleteEnqueueFn(syncCtx),
	})
	crbShared.AddEventHandler(&clusterRoleBindingEventHandler{
		enqueueUpsertFunc: controller.bindingResourceUpsertEnqueueFn(syncCtx),
		enqueueDeleteFunc: controller.bindingResourceDeleteEnqueueFn(syncCtx),
	})

	roleKeysFn := controller.roleEnqueueFu(rbShared.GetIndexer())
	clusterRoleKeysFn := controller.clusterRoleEnqueueFu(rbShared.GetIndexer(), crbShared.GetIndexer())

	builder := factory.New().
		WithSyncContext(syncCtx).
		WithInformersQueueKeysFunc(roleKeysFn, rInformer.Informer()).
		WithInformersQueueKeysFunc(clusterRoleKeysFn, crInformer.Informer()).
		WithBareInformers(rbShared, crbShared).
		WithSync(controller.sync).
		ResyncEvery(ResyncInterval) // cleanup unnecessary cache every ResyncInterval

	return builder.ToController(cacheControllerName, recorder)
}
// roleEnqueueFu returns the queue-keys function used for Role events. For the given Role it
// looks up, through the byRole index of rbIndexer, the RoleBindings that reference the Role and
// returns the keys of the executors among their subjects that already have caches.
//
// The returned function always yields a non-nil slice; it is empty when the object metadata
// can not be read, when the index lookup fails, or when no interested executor is found.
func (c *CacheController) roleEnqueueFu(rbIndexer cache.Indexer) func(runtime.Object) []string {
	return func(obj runtime.Object) []string {
		ret := make([]string, 0)

		accessor, err := meta.Accessor(obj)
		if err != nil {
			// Without object metadata no role key can be built. Report the error and enqueue
			// nothing instead of calling methods on a nil accessor.
			utilruntime.HandleError(err)
			return ret
		}

		// The byRole index is keyed by "role-namespace/role-name".
		roleKey := fmt.Sprintf("%s/%s", accessor.GetNamespace(), accessor.GetName())
		items, err := rbIndexer.ByIndex(byRole, roleKey)
		if err != nil {
			klog.V(4).Infof("RoleBinding indexer get RoleBinding by %s index %s error: %v", byRole, roleKey, err)
			return ret
		}

		for _, item := range items {
			rb, ok := item.(*rbacapiv1.RoleBinding)
			if !ok {
				continue
			}
			ret = append(ret, getInterestedExecutors(rb.Subjects, c.executorCaches)...)
		}
		return ret
	}
}
// clusterRoleEnqueueFu returns the queue-keys function used for ClusterRole events. A ClusterRole
// can be referenced both by RoleBindings and by ClusterRoleBindings, so both indexers are queried
// through their byClusterRole index; the keys of the executors among the binding subjects that
// already have caches are returned.
//
// The returned function always yields a non-nil slice. A failing lookup on one indexer is logged
// and does not prevent the other indexer from being consulted.
func (c *CacheController) clusterRoleEnqueueFu(
	rbIndexer cache.Indexer, crbIndexer cache.Indexer) func(runtime.Object) []string {
	return func(obj runtime.Object) []string {
		ret := make([]string, 0)

		accessor, err := meta.Accessor(obj)
		if err != nil {
			// Without object metadata no cluster role key can be built. Report the error and
			// enqueue nothing instead of calling methods on a nil accessor.
			utilruntime.HandleError(err)
			return ret
		}
		clusterRoleKey := accessor.GetName()

		// RoleBindings referencing this ClusterRole. The byClusterRole index must be used here:
		// the byRole index is looked up with "namespace/name" role keys and would not match a
		// bare cluster role name.
		items, err := rbIndexer.ByIndex(byClusterRole, clusterRoleKey)
		if err != nil {
			klog.V(4).Infof("RoleBinding indexer get RoleBinding by %s index %s error: %v",
				byClusterRole, clusterRoleKey, err)
		} else {
			for _, item := range items {
				if rb, ok := item.(*rbacapiv1.RoleBinding); ok {
					ret = append(ret, getInterestedExecutors(rb.Subjects, c.executorCaches)...)
				}
			}
		}

		// ClusterRoleBindings referencing this ClusterRole.
		items, err = crbIndexer.ByIndex(byClusterRole, clusterRoleKey)
		if err != nil {
			klog.V(4).Infof("ClusterRoleBinding indexer get ClusterRoleBinding by %s index %s error: %v",
				byClusterRole, clusterRoleKey, err)
		} else {
			for _, item := range items {
				if crb, ok := item.(*rbacapiv1.ClusterRoleBinding); ok {
					ret = append(ret, getInterestedExecutors(crb.Subjects, c.executorCaches)...)
				}
			}
		}
		return ret
	}
}
// bindingResourceUpsertEnqueueFn returns the handler invoked for add/update events of binding
// resources. The handler enqueues every executor among the binding subjects that already has
// caches, and records the binding key -> executors mapping so that a later delete event carrying
// only the key can still be resolved to its executors.
func (c *CacheController) bindingResourceUpsertEnqueueFn(
	syncCtx factory.SyncContext) func(key string, subjects []rbacapiv1.Subject) {
	return func(key string, subjects []rbacapiv1.Subject) {
		interested := getInterestedExecutors(subjects, c.executorCaches)
		if len(interested) == 0 {
			// Nothing to refresh and nothing to remember for this binding.
			return
		}

		queue := syncCtx.Queue()
		for i := range interested {
			queue.Add(interested[i])
		}

		c.bindingExecutorsMapper.upsert(key, interested)
		klog.V(4).Infof("Binding executor mapper upsert key %s executors %s", key, interested)
	}
}
// bindingResourceDeleteEnqueueFn returns the handler invoked for delete events of binding
// resources. When the subjects of the deleted binding are known, the interested executors are
// derived from them; when subjects is nil, the executors previously recorded for the binding key
// in bindingExecutorsMapper are used instead. The mapper entry of the key is removed only if at
// least one executor was enqueued.
func (c *CacheController) bindingResourceDeleteEnqueueFn(
	syncCtx factory.SyncContext) func(key string, subjects []rbacapiv1.Subject) {
	return func(key string, subjects []rbacapiv1.Subject) {
		fromMapper := subjects == nil

		var executors []string
		if fromMapper {
			executors = c.bindingExecutorsMapper.get(key)
		} else {
			executors = getInterestedExecutors(subjects, c.executorCaches)
		}

		for _, executor := range executors {
			syncCtx.Queue().Add(executor)
			if fromMapper {
				klog.V(4).Infof("Deletion event, enqueue executor %s from binding executor mapper key %s", executor, key)
			}
		}

		if len(executors) == 0 {
			return
		}
		c.bindingExecutorsMapper.delete(key)
		klog.V(4).Infof("Binding executor mapper delete key %s", key)
	}
}
// getInterestedExecutors filters subjects down to the executors the cache cares about: subjects
// of kind "ServiceAccount" whose executor key already has dimension caches in executorCaches.
// The result is never nil; it is empty when no subject qualifies.
func getInterestedExecutors(subjects []rbacapiv1.Subject, executorCaches *store.ExecutorCaches) []string {
	interested := []string{}
	for i := range subjects {
		if subjects[i].Kind != "ServiceAccount" {
			continue
		}
		key := store.ExecutorKey(subjects[i].Namespace, subjects[i].Name)
		if !executorCaches.DimensionCachesExists(key) {
			continue
		}
		interested = append(interested, key)
	}
	return interested
}
// sync is the main reconcile loop for executors. It is triggered when RBAC resources
// (role, rolebinding, clusterrole, clusterrolebinding) for the executor changed, in which case
// the queue key is the executor key ("executor-namespace/executor-name") and every cached item
// of that executor is re-checked. When the queue key is factory.DefaultQueueKey (the periodic
// resync), the unnecessary cache items are cleaned up instead.
//
// sync always returns nil; a queue key that can not be split into namespace/name is ignored.
func (c *CacheController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
	executorKey := controllerContext.QueueKey()
	klog.V(4).Infof("Executor cache sync, executorKey: %v", executorKey)

	// Use the factory constant rather than the bare "key" literal so the intent (this is not an
	// executor key) is explicit and stays aligned with the controller factory.
	if executorKey == factory.DefaultQueueKey {
		// cleanup unnecessary cache
		klog.V(4).Infof("There are %v cache items before cleanup", c.executorCaches.Count())
		c.cleanupUnnecessaryCache()
		klog.V(4).Infof("There are %v cache items after cleanup", c.executorCaches.Count())
		return nil
	}

	saNamespace, saName, err := cache.SplitMetaNamespaceKey(executorKey)
	if err != nil {
		// ignore executor whose key is not in format: namespace/name
		return nil
	}

	c.executorCaches.IterateCacheItems(executorKey, c.iterateCacheItemsFn(ctx, executorKey, saNamespace, saName))
	return nil
}
// iterateCacheItemsFn returns the callback applied to each cached item of an executor. The
// callback re-runs the subject access review check for the service account saNamespace/saName
// against the item's dimension and stores the outcome back into the executor caches. The check
// outcome (including a failure) is recorded in the cache rather than returned: the callback
// itself always returns nil.
func (c *CacheController) iterateCacheItemsFn(ctx context.Context,
	executorKey, saNamespace, saName string) func(v store.CacheValue) error {
	return func(v store.CacheValue) error {
		dimension := v.Dimension

		executor := &workapiv1.ManifestWorkSubjectServiceAccount{
			Namespace: saNamespace,
			Name:      saName,
		}
		gvr := schema.GroupVersionResource{
			Group:    dimension.Group,
			Version:  dimension.Version,
			Resource: dimension.Resource,
		}
		ownedByWork := store.GetOwnedByWork(dimension.ExecuteAction)

		checkErr := c.sarCheckerFn(ctx, executor, gvr, dimension.Namespace, dimension.Name, ownedByWork)
		klog.V(4).Infof("Update executor cache for executorKey: %s, dimension: %+v result: %v",
			executorKey, v.Dimension, checkErr)

		updateSARCheckResultToCache(c.executorCaches, executorKey, v.Dimension, checkErr)
		return nil
	}
}
// cleanupUnnecessaryCache drops the cache items that are no longer needed.
func (c *CacheController) cleanupUnnecessaryCache() {
	// Step 1: build a fresh executor cache holding every cache that is still valuable for the
	// current state of the cluster; this tells us which items must be retained.
	stillNeeded := store.NewExecutorCache()
	c.manifestWorkExecutorCachesLoader.loadAllValuableCaches(stillNeeded)

	// Step 2: compare the existing caches against the retainable set and clear everything else.
	c.executorCaches.CleanupUnnecessaryCaches(stillNeeded)
}
// safeMap is a map from string keys to string slices whose operations are guarded by a
// read/write lock. The zero value is an empty map that is ready for use.
type safeMap struct {
	lock  sync.RWMutex
	items map[string][]string
}

// upsert stores v under k, replacing any previous value. The slice is stored as is (not copied).
func (m *safeMap) upsert(k string, v []string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if m.items == nil {
		// Lazily create the map so that a zero-value safeMap does not panic on its first write.
		m.items = make(map[string][]string)
	}
	m.items[k] = v
}

// delete removes the entry for k; it is a no-op when k is not present.
func (m *safeMap) delete(k string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	delete(m.items, k)
}

// get returns the slice stored under k, or nil when k is not present. The stored slice itself
// is returned (not a copy), so callers must not modify it.
func (m *safeMap) get(k string) []string {
	m.lock.RLock()
	defer m.lock.RUnlock()

	return m.items[k]
}

// count returns the number of keys currently stored.
func (m *safeMap) count() int {
	m.lock.RLock()
	defer m.lock.RUnlock()

	return len(m.items)
}