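// Package policy implements the controller for VirtualClusterPolicy resources,
// propagating policy settings (network policies, resource quotas, limit ranges,
// pod security labels and Cluster defaults) to the Namespaces that reference a
// policy via the "policy.k3k.io/policy-name" label.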
package policy

import (
	"context"
	"errors"
	"reflect"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
	k3kcontroller "github.com/rancher/k3k/pkg/controller"
)

const (
	PolicyNameLabelKey          = "policy.k3k.io/policy-name"
	ManagedByLabelKey           = "app.kubernetes.io/managed-by"
	VirtualPolicyControllerName = "k3k-policy-controller"
)
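
// For illustration, a Namespace opts into a policy by carrying the
// PolicyNameLabelKey label. A hypothetical manifest (names are examples only):
//
//	apiVersion: v1
//	kind: Namespace
//	metadata:
//	  name: tenant-a
//	  labels:
//	    policy.k3k.io/policy-name: example-policy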

type VirtualClusterPolicyReconciler struct {
	Client      client.Client
	Scheme      *runtime.Scheme
	ClusterCIDR string
}

// Add registers a controller for VirtualClusterPolicy resources with the given manager.
func Add(mgr manager.Manager, clusterCIDR string, maxConcurrentReconciles int) error {
	reconciler := VirtualClusterPolicyReconciler{
		Client:      mgr.GetClient(),
		Scheme:      mgr.GetScheme(),
		ClusterCIDR: clusterCIDR,
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&v1beta1.VirtualClusterPolicy{}).
		Watches(&v1.Namespace{}, namespaceEventHandler()).
		Watches(&v1.Node{}, nodeEventHandler(&reconciler)).
		Watches(&v1beta1.Cluster{}, clusterEventHandler(&reconciler)).
		Owns(&networkingv1.NetworkPolicy{}).
		Owns(&v1.ResourceQuota{}).
		Owns(&v1.LimitRange{}).
		WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
		Complete(&reconciler)
}
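
// A minimal sketch of wiring this controller into a manager, assuming the
// scheme has been populated elsewhere (hypothetical setup code, not part of
// this package):
//
//	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
//	if err != nil {
//		return err
//	}
//	// An empty clusterCIDR enables the Node watch below, which derives the
//	// Pod CIDRs for the NetworkPolicies from the Nodes themselves.
//	if err := policy.Add(mgr, "", 1); err != nil {
//		return err
//	}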

// namespaceEventHandler enqueues a reconciliation of the VirtualClusterPolicy (VCP)
// referenced by a Namespace when that Namespace changes.
func namespaceEventHandler() handler.Funcs {
	return handler.Funcs{
		// When a Namespace is created, enqueue the policy named by its
		// "policy.k3k.io/policy-name" label, if present.
		CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			ns, ok := e.Object.(*v1.Namespace)
			if !ok {
				return
			}

			if ns.Labels[PolicyNameLabelKey] != "" {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: ns.Labels[PolicyNameLabelKey]}})
			}
		},
		// When a Namespace is updated, enqueue the policies named by its old and
		// new "policy.k3k.io/policy-name" labels, if present.
		UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			oldNs, okOld := e.ObjectOld.(*v1.Namespace)
			newNs, okNew := e.ObjectNew.(*v1.Namespace)

			if !okOld || !okNew {
				return
			}

			oldVCPName := oldNs.Labels[PolicyNameLabelKey]
			newVCPName := newNs.Labels[PolicyNameLabelKey]

			// If the labels haven't changed we can skip the reconciliation
			if reflect.DeepEqual(oldNs.Labels, newNs.Labels) {
				return
			}

			// If no VCP is referenced before or after the update we can skip the reconciliation
			if oldVCPName == "" && newVCPName == "" {
				return
			}

			// The VCP has not changed, but we enqueue a reconciliation because the PSA or other labels have changed
			if oldVCPName == newVCPName {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: oldVCPName}})
				return
			}

			// Enqueue the old VCP name for cleanup
			if oldVCPName != "" {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: oldVCPName}})
			}

			// Enqueue the new VCP name
			if newVCPName != "" {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: newVCPName}})
			}
		},
		// When a Namespace is deleted all of its resources are deleted with it,
		// but we still trigger a reconciliation to eventually perform some
		// cluster-wide cleanup if necessary
		DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			ns, ok := e.Object.(*v1.Namespace)
			if !ok {
				return
			}

			if ns.Labels[PolicyNameLabelKey] != "" {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: ns.Labels[PolicyNameLabelKey]}})
			}
		},
	}
}

// nodeEventHandler enqueues a reconciliation of all VirtualClusterPolicies when a Node
// changes. This happens only if the ClusterCIDR is NOT specified, so that the Node
// PodCIDRs can be reflected in the NetworkPolicies.
func nodeEventHandler(r *VirtualClusterPolicyReconciler) handler.Funcs {
	// enqueue all the available VirtualClusterPolicies
	enqueueAllVCPs := func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
		vcpList := &v1beta1.VirtualClusterPolicyList{}
		if err := r.Client.List(ctx, vcpList); err != nil {
			return
		}

		for _, vcp := range vcpList.Items {
			q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: vcp.Name}})
		}
	}

	return handler.Funcs{
		CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			if r.ClusterCIDR != "" {
				return
			}

			enqueueAllVCPs(ctx, q)
		},
		UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			if r.ClusterCIDR != "" {
				return
			}

			oldNode, okOld := e.ObjectOld.(*v1.Node)
			newNode, okNew := e.ObjectNew.(*v1.Node)

			if !okOld || !okNew {
				return
			}

			// Check if the PodCIDR or PodCIDRs fields have changed.
			var podCIDRChanged bool
			if oldNode.Spec.PodCIDR != newNode.Spec.PodCIDR {
				podCIDRChanged = true
			}

			if !reflect.DeepEqual(oldNode.Spec.PodCIDRs, newNode.Spec.PodCIDRs) {
				podCIDRChanged = true
			}

			if podCIDRChanged {
				enqueueAllVCPs(ctx, q)
			}
		},
		DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			if r.ClusterCIDR != "" {
				return
			}

			enqueueAllVCPs(ctx, q)
		},
	}
}

// clusterEventHandler enqueues a reconciliation of the VirtualClusterPolicy associated
// with a Cluster's Namespace when that Cluster changes.
func clusterEventHandler(r *VirtualClusterPolicyReconciler) handler.Funcs {
	type clusterSubSpec struct {
		PriorityClass string
		NodeSelector  map[string]string
	}

	return handler.Funcs{
		// When a Cluster is created, enqueue the policy referenced by the
		// "policy.k3k.io/policy-name" label of its Namespace, if present.
		CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			cluster, ok := e.Object.(*v1beta1.Cluster)
			if !ok {
				return
			}

			var ns v1.Namespace
			if err := r.Client.Get(ctx, types.NamespacedName{Name: cluster.Namespace}, &ns); err != nil {
				return
			}

			if ns.Labels[PolicyNameLabelKey] != "" {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: ns.Labels[PolicyNameLabelKey]}})
			}
		},
		// When a Cluster is updated, enqueue the policy referenced by its Namespace,
		// but only if the parts of the Cluster spec influenced by the policy have changed.
		UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			oldCluster, okOld := e.ObjectOld.(*v1beta1.Cluster)
			newCluster, okNew := e.ObjectNew.(*v1beta1.Cluster)

			if !okOld || !okNew {
				return
			}

			var ns v1.Namespace
			if err := r.Client.Get(ctx, types.NamespacedName{Name: oldCluster.Namespace}, &ns); err != nil {
				return
			}

			if ns.Labels[PolicyNameLabelKey] == "" {
				return
			}

			clusterSubSpecOld := clusterSubSpec{
				PriorityClass: oldCluster.Spec.PriorityClass,
				NodeSelector:  oldCluster.Spec.NodeSelector,
			}

			clusterSubSpecNew := clusterSubSpec{
				PriorityClass: newCluster.Spec.PriorityClass,
				NodeSelector:  newCluster.Spec.NodeSelector,
			}

			if !reflect.DeepEqual(clusterSubSpecOld, clusterSubSpecNew) {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: ns.Labels[PolicyNameLabelKey]}})
			}
		},
		// When a Cluster is deleted there is nothing to do
		DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
		},
	}
}

// Reconcile fetches the VirtualClusterPolicy named in the request, applies it, and
// persists any status change before surfacing a reconciliation error, so partial
// progress is recorded even on failure.
func (c *VirtualClusterPolicyReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	log := ctrl.LoggerFrom(ctx)
	log.Info("Reconciling VirtualClusterPolicy")

	var policy v1beta1.VirtualClusterPolicy
	if err := c.Client.Get(ctx, req.NamespacedName, &policy); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}

	orig := policy.DeepCopy()

	reconcilerErr := c.reconcileVirtualClusterPolicy(ctx, &policy)

	// update the Status if needed
	if !reflect.DeepEqual(orig.Status, policy.Status) {
		log.Info("Updating VirtualClusterPolicy Status")

		if err := c.Client.Status().Update(ctx, &policy); err != nil {
			return reconcile.Result{}, err
		}
	}

	// if there was an error during the reconciliation, return
	if reconcilerErr != nil {
		return reconcile.Result{}, reconcilerErr
	}

	// update the VirtualClusterPolicy if needed
	// (compare against &policy: orig is a pointer, so comparing it to the value
	// would always report a difference and trigger a no-op update)
	if !reflect.DeepEqual(orig, &policy) {
		log.Info("Updating VirtualClusterPolicy")

		if err := c.Client.Update(ctx, &policy); err != nil {
			return reconcile.Result{}, err
		}
	}

	return reconcile.Result{}, nil
}
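
// reconcileVirtualClusterPolicy applies the policy to all matching Namespaces and
// then runs the Namespace cleanup.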
func (c *VirtualClusterPolicyReconciler) reconcileVirtualClusterPolicy(ctx context.Context, policy *v1beta1.VirtualClusterPolicy) error {
	if err := c.reconcileMatchingNamespaces(ctx, policy); err != nil {
		return err
	}

	if err := c.cleanupNamespaces(ctx); err != nil {
		return err
	}

	return nil
}

// reconcileMatchingNamespaces applies the policy to every Namespace labeled with
// the policy's name.
func (c *VirtualClusterPolicyReconciler) reconcileMatchingNamespaces(ctx context.Context, policy *v1beta1.VirtualClusterPolicy) error {
	log := ctrl.LoggerFrom(ctx)
	log.V(1).Info("Reconciling matching Namespaces")

	listOpts := client.MatchingLabels{
		PolicyNameLabelKey: policy.Name,
	}

	var namespaces v1.NamespaceList
	if err := c.Client.List(ctx, &namespaces, listOpts); err != nil {
		return err
	}

	for _, ns := range namespaces.Items {
		log = log.WithValues("namespace", ns.Name)
		ctx = ctrl.LoggerInto(ctx, log)

		log.V(1).Info("Reconciling Namespace")

		orig := ns.DeepCopy()

		if err := c.reconcileNetworkPolicy(ctx, ns.Name, policy); err != nil {
			return err
		}

		if err := c.reconcileQuota(ctx, ns.Name, policy); err != nil {
			return err
		}

		if err := c.reconcileLimit(ctx, ns.Name, policy); err != nil {
			return err
		}

		if err := c.reconcileClusters(ctx, &ns, policy); err != nil {
			return err
		}

		c.reconcileNamespacePodSecurityLabels(ctx, &ns, policy)

		if !reflect.DeepEqual(orig, &ns) {
			log.Info("Updating Namespace")

			if err := c.Client.Update(ctx, &ns); err != nil {
				return err
			}
		}
	}

	return nil
}

// reconcileQuota creates, updates, or deletes the ResourceQuota derived from the
// policy in the given namespace.
func (c *VirtualClusterPolicyReconciler) reconcileQuota(ctx context.Context, namespace string, policy *v1beta1.VirtualClusterPolicy) error {
	log := ctrl.LoggerFrom(ctx)
	log.V(1).Info("Reconciling ResourceQuota")

	if policy.Spec.Quota == nil {
		// check if the ResourceQuota object exists and delete it.
		var toDeleteResourceQuota v1.ResourceQuota

		key := types.NamespacedName{
			Name:      k3kcontroller.SafeConcatNameWithPrefix(policy.Name),
			Namespace: namespace,
		}

		if err := c.Client.Get(ctx, key, &toDeleteResourceQuota); err != nil {
			return client.IgnoreNotFound(err)
		}

		log.V(1).Info("Deleting ResourceQuota")

		return c.Client.Delete(ctx, &toDeleteResourceQuota)
	}

	// create/update the ResourceQuota
	resourceQuota := &v1.ResourceQuota{
		TypeMeta: metav1.TypeMeta{
			Kind:       "ResourceQuota",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      k3kcontroller.SafeConcatNameWithPrefix(policy.Name),
			Namespace: namespace,
			Labels: map[string]string{
				ManagedByLabelKey:  VirtualPolicyControllerName,
				PolicyNameLabelKey: policy.Name,
			},
		},
		Spec: *policy.Spec.Quota,
	}

	if err := ctrl.SetControllerReference(policy, resourceQuota, c.Scheme); err != nil {
		return err
	}

	log.V(1).Info("Creating ResourceQuota")

	err := c.Client.Create(ctx, resourceQuota)
	if apierrors.IsAlreadyExists(err) {
		log.V(1).Info("ResourceQuota already exists, updating.")

		return c.Client.Update(ctx, resourceQuota)
	}

	return err
}
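
// The create-then-update-on-AlreadyExists pattern above avoids a separate Get on
// the happy path, at the cost of an extra call when the object already exists;
// the same approach is used for the LimitRange below.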

// reconcileLimit creates, updates, or deletes the LimitRange derived from the
// policy in the given namespace.
func (c *VirtualClusterPolicyReconciler) reconcileLimit(ctx context.Context, namespace string, policy *v1beta1.VirtualClusterPolicy) error {
	log := ctrl.LoggerFrom(ctx)
	log.V(1).Info("Reconciling LimitRange")

	// delete the LimitRange if spec.limit isn't specified.
	if policy.Spec.Limit == nil {
		var toDeleteLimitRange v1.LimitRange

		key := types.NamespacedName{
			Name:      k3kcontroller.SafeConcatNameWithPrefix(policy.Name),
			Namespace: namespace,
		}

		if err := c.Client.Get(ctx, key, &toDeleteLimitRange); err != nil {
			return client.IgnoreNotFound(err)
		}

		log.V(1).Info("Deleting LimitRange")

		return c.Client.Delete(ctx, &toDeleteLimitRange)
	}

	limitRange := &v1.LimitRange{
		TypeMeta: metav1.TypeMeta{
			Kind:       "LimitRange",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      k3kcontroller.SafeConcatNameWithPrefix(policy.Name),
			Namespace: namespace,
			Labels: map[string]string{
				ManagedByLabelKey:  VirtualPolicyControllerName,
				PolicyNameLabelKey: policy.Name,
			},
		},
		Spec: *policy.Spec.Limit,
	}

	if err := ctrl.SetControllerReference(policy, limitRange, c.Scheme); err != nil {
		return err
	}

	log.V(1).Info("Creating LimitRange")

	err := c.Client.Create(ctx, limitRange)
	if apierrors.IsAlreadyExists(err) {
		log.V(1).Info("LimitRange already exists, updating.")

		return c.Client.Update(ctx, limitRange)
	}

	return err
}
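
// For illustration, a hypothetical VirtualClusterPolicy exercising the quota and
// limit paths above (top-level field names inferred from the Go spec; the actual
// YAML tags may differ):
//
//	apiVersion: k3k.io/v1beta1
//	kind: VirtualClusterPolicy
//	metadata:
//	  name: example-policy
//	spec:
//	  quota:
//	    hard:
//	      cpu: "10"
//	      memory: 20Gi
//	  limit:
//	    limits:
//	      - type: Container
//	        defaultRequest:
//	          cpu: 100m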

// reconcileClusters records the applied policy in the status of every Cluster in
// the given Namespace.
func (c *VirtualClusterPolicyReconciler) reconcileClusters(ctx context.Context, namespace *v1.Namespace, policy *v1beta1.VirtualClusterPolicy) error {
	log := ctrl.LoggerFrom(ctx)
	log.V(1).Info("Reconciling Clusters")

	var clusters v1beta1.ClusterList
	if err := c.Client.List(ctx, &clusters, client.InNamespace(namespace.Name)); err != nil {
		return err
	}

	var clusterUpdateErrs []error

	for _, cluster := range clusters.Items {
		origStatus := cluster.Status.DeepCopy()

		cluster.Status.Policy = &v1beta1.AppliedPolicy{
			Name:           policy.Name,
			PriorityClass:  &policy.Spec.DefaultPriorityClass,
			NodeSelector:   policy.Spec.DefaultNodeSelector,
			Sync:           policy.Spec.Sync,
			ServerAffinity: policy.Spec.DefaultServerAffinity,
			AgentAffinity:  policy.Spec.DefaultAgentAffinity,
		}

		if !reflect.DeepEqual(origStatus, &cluster.Status) {
			log.V(1).Info("Updating Cluster", "cluster", cluster.Name, "namespace", namespace.Name)

			// keep updating the other clusters even if an error occurred
			if err := c.Client.Status().Update(ctx, &cluster); err != nil {
				clusterUpdateErrs = append(clusterUpdateErrs, err)
			}
		}
	}

	return errors.Join(clusterUpdateErrs...)
}