Support for multi node in shared mode (#269)

* Support for multi node in shared mode

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fixing typo

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

---------

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Hussein Galal
2025-02-26 22:55:31 +02:00
committed by GitHub
parent 29438121ba
commit ec0e5a4a87
9 changed files with 204 additions and 35 deletions

1
go.mod
View File

@@ -32,6 +32,7 @@ require (
k8s.io/apiserver v0.29.11
k8s.io/client-go v0.29.11
k8s.io/component-base v0.29.11
k8s.io/component-helpers v0.29.11
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
sigs.k8s.io/controller-runtime v0.17.5
)

2
go.sum
View File

@@ -2019,6 +2019,8 @@ k8s.io/client-go v0.29.11 h1:mBX7Ub0uqpLMwWz3J/AGS/xKOZsjr349qZ1vxVoL1l8=
k8s.io/client-go v0.29.11/go.mod h1:WOEoi/eLg2YEg3/yEd7YK3CNScYkM8AEScQadxUnaTE=
k8s.io/component-base v0.29.11 h1:H3GJIyDNPrscvXGP6wx+9gApcwwmrUd0YtCGp5BcHBA=
k8s.io/component-base v0.29.11/go.mod h1:0qu1WStER4wu5o8RMRndZUWPVcPH1XBy/QQiDcD6lew=
k8s.io/component-helpers v0.29.11 h1:GdZaSLBLlCa+EzjAnpZ4fGB75rA3qqPLLZKk+CsqNyo=
k8s.io/component-helpers v0.29.11/go.mod h1:gloyih9IiE4Qy/7iLUXqAmxYSUduuIpMCiNYuHfYvD4=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kms v0.29.11 h1:pylaiDJhgfqczvcjMDPI89+VH0OVoGQhscPH1VbBzQE=

View File

@@ -63,7 +63,6 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
log := r.logger.With("Cluster", r.clusterName, "PersistentVolumeClaim", req.NamespacedName)
var (
virtPVC v1.PersistentVolumeClaim
hostPVC v1.PersistentVolumeClaim
cluster v1alpha1.Cluster
)
if err := r.hostClient.Get(ctx, types.NamespacedName{Name: r.clusterName, Namespace: r.clusterNamespace}, &cluster); err != nil {
@@ -93,24 +92,17 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, nil
}
// getting the cluster for setting the controller reference
// Add finalizer if it does not exist
if controllerutil.AddFinalizer(&virtPVC, pvcFinalizerName) {
if err := r.virtualClient.Update(ctx, &virtPVC); err != nil {
return reconcile.Result{}, err
}
}
// create or update the pvc on host
if err := r.hostClient.Get(ctx, types.NamespacedName{Name: syncedPVC.Name, Namespace: r.clusterNamespace}, &hostPVC); err != nil {
if apierrors.IsNotFound(err) {
log.Info("creating the persistent volume for the first time on the host cluster")
return reconcile.Result{}, r.hostClient.Create(ctx, syncedPVC)
}
return reconcile.Result{}, err
}
log.Info("updating pvc on the host cluster")
return reconcile.Result{}, r.hostClient.Update(ctx, syncedPVC)
// create the pvc on host
log.Info("creating the persistent volume for the first time on the host cluster")
// note that we dont need to update the PVC on the host cluster, only syncing the PVC to allow being
// handled by the host cluster.
return reconcile.Result{}, ctrlruntimeclient.IgnoreAlreadyExists(r.hostClient.Create(ctx, syncedPVC))
}

View File

@@ -0,0 +1,165 @@
package controller
import (
"context"
"github.com/rancher/k3k/k3k-kubelet/translate"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-helpers/storage/volume"
ctrl "sigs.k8s.io/controller-runtime"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
const (
podController = "pod-pvc-controller"
pseudoPVLabel = "pod.k3k.io/pseudoPV"
)
type PodReconciler struct {
virtualClient ctrlruntimeclient.Client
hostClient ctrlruntimeclient.Client
clusterName string
clusterNamespace string
Scheme *runtime.Scheme
HostScheme *runtime.Scheme
logger *log.Logger
Translator translate.ToHostTranslator
}
// AddPodPVCController adds pod controller to k3k-kubelet
func AddPodPVCController(ctx context.Context, virtMgr, hostMgr manager.Manager, clusterName, clusterNamespace string, logger *log.Logger) error {
translator := translate.ToHostTranslator{
ClusterName: clusterName,
ClusterNamespace: clusterNamespace,
}
// initialize a new Reconciler
reconciler := PodReconciler{
virtualClient: virtMgr.GetClient(),
hostClient: hostMgr.GetClient(),
Scheme: virtMgr.GetScheme(),
HostScheme: hostMgr.GetScheme(),
logger: logger.Named(podController),
Translator: translator,
clusterName: clusterName,
clusterNamespace: clusterNamespace,
}
return ctrl.NewControllerManagedBy(virtMgr).
For(&v1.Pod{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
}).
Complete(&reconciler)
}
func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx).WithValues("cluster", r.clusterName, "clusterNamespace", r.clusterNamespace)
var (
virtPod v1.Pod
cluster v1alpha1.Cluster
)
if err := r.hostClient.Get(ctx, types.NamespacedName{Name: r.clusterName, Namespace: r.clusterNamespace}, &cluster); err != nil {
return reconcile.Result{}, err
}
// handling pod
if err := r.virtualClient.Get(ctx, req.NamespacedName, &virtPod); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
}
// reconcile pods with pvcs
for _, vol := range virtPod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
log.Info("Handling pod with pvc")
if err := r.reconcilePodWithPVC(ctx, &virtPod, vol.PersistentVolumeClaim); err != nil {
return reconcile.Result{}, err
}
}
}
return reconcile.Result{}, nil
}
// reconcilePodWithPVC will make sure to create a fake PV for each PVC for any pod so that it can be scheduled on the virtual-kubelet
// and then created on the host, the PV is not synced to the host cluster.
func (r *PodReconciler) reconcilePodWithPVC(ctx context.Context, pod *v1.Pod, pvcSource *v1.PersistentVolumeClaimVolumeSource) error {
log := ctrl.LoggerFrom(ctx).WithValues("PersistentVolumeClaim", pvcSource.ClaimName)
var (
pvc v1.PersistentVolumeClaim
)
if err := r.virtualClient.Get(ctx, types.NamespacedName{Name: pvcSource.ClaimName, Namespace: pod.Namespace}, &pvc); err != nil {
return ctrlruntimeclient.IgnoreNotFound(err)
}
log.Info("Creating pseudo Persistent Volume")
pv := r.pseudoPV(&pvc)
if err := r.virtualClient.Create(ctx, pv); err != nil {
return ctrlruntimeclient.IgnoreAlreadyExists(err)
}
orig := pv.DeepCopy()
pv.Status = v1.PersistentVolumeStatus{
Phase: v1.VolumeBound,
}
if err := r.virtualClient.Status().Patch(ctx, pv, ctrlruntimeclient.MergeFrom(orig)); err != nil {
return err
}
log.Info("Patch the status of PersistentVolumeClaim to Bound")
pvcPatch := pvc.DeepCopy()
if pvcPatch.Annotations == nil {
pvcPatch.Annotations = make(map[string]string)
}
pvcPatch.Annotations[volume.AnnBoundByController] = "yes"
pvcPatch.Annotations[volume.AnnBindCompleted] = "yes"
pvcPatch.Status.Phase = v1.ClaimBound
pvcPatch.Status.AccessModes = pvcPatch.Spec.AccessModes
return r.virtualClient.Status().Update(ctx, pvcPatch)
}
func (r *PodReconciler) pseudoPV(obj *v1.PersistentVolumeClaim) *v1.PersistentVolume {
storageClass := ""
if obj.Spec.StorageClassName != nil {
storageClass = *obj.Spec.StorageClassName
}
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Labels: map[string]string{
pseudoPVLabel: "true",
},
Annotations: map[string]string{
volume.AnnBoundByController: "true",
volume.AnnDynamicallyProvisioned: "k3k-kubelet",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "PersistentVolume",
APIVersion: "v1",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
FlexVolume: &v1.FlexPersistentVolumeSource{
Driver: "pseudopv",
},
},
StorageClassName: storageClass,
VolumeMode: obj.Spec.VolumeMode,
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
AccessModes: obj.Spec.AccessModes,
Capacity: obj.Spec.Resources.Requests,
ClaimRef: &v1.ObjectReference{
APIVersion: obj.APIVersion,
UID: obj.UID,
ResourceVersion: obj.ResourceVersion,
Kind: obj.Kind,
Namespace: obj.Namespace,
Name: obj.Name,
},
},
}
}

View File

@@ -32,7 +32,6 @@ const (
type webhookHandler struct {
client ctrlruntimeclient.Client
scheme *runtime.Scheme
nodeName string
serviceName string
clusterName string
clusterNamespace string
@@ -42,7 +41,7 @@ type webhookHandler struct {
// AddPodMutatorWebhook will add a mutator webhook to the virtual cluster to
// modify the nodeName of the created pods with the name of the virtual kubelet node name
// as well as remove any status fields of the downward apis env fields
func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName, serviceName string, logger *log.Logger) error {
func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, serviceName string, logger *log.Logger) error {
handler := webhookHandler{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
@@ -50,7 +49,6 @@ func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient c
serviceName: serviceName,
clusterName: clusterName,
clusterNamespace: clusterNamespace,
nodeName: nodeName,
}
// create mutator webhook configuration to the cluster
@@ -73,9 +71,6 @@ func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error
return fmt.Errorf("invalid request: object was type %t not cluster", obj)
}
w.logger.Infow("mutator webhook request", "Pod", pod.Name, "Namespace", pod.Namespace)
if pod.Spec.NodeName == "" {
pod.Spec.NodeName = w.nodeName
}
// look for status.* fields in the env
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)

View File

@@ -93,7 +93,10 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
}
hostMgr, err := ctrl.NewManager(hostConfig, manager.Options{
Scheme: baseScheme,
Scheme: baseScheme,
LeaderElection: true,
LeaderElectionNamespace: c.ClusterNamespace,
LeaderElectionID: c.ClusterName,
Metrics: ctrlserver.Options{
BindAddress: ":8083",
},
@@ -117,8 +120,11 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
CertDir: "/opt/rancher/k3k-webhook",
})
virtualMgr, err := ctrl.NewManager(virtConfig, manager.Options{
Scheme: virtualScheme,
WebhookServer: webhookServer,
Scheme: virtualScheme,
WebhookServer: webhookServer,
LeaderElection: true,
LeaderElectionNamespace: "kube-system",
LeaderElectionID: c.ClusterName,
Metrics: ctrlserver.Options{
BindAddress: ":8084",
},
@@ -127,7 +133,7 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
return nil, errors.New("unable to create controller-runtime mgr for virtual cluster: " + err.Error())
}
logger.Info("adding pod mutator webhook")
if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.AgentHostname, c.ServiceName, logger); err != nil {
if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.ServiceName, logger); err != nil {
return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
}
@@ -141,6 +147,11 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
return nil, errors.New("failed to add pvc syncer controller: " + err.Error())
}
logger.Info("adding pod pvc controller")
if err := k3kkubeletcontroller.AddPodPVCController(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil {
return nil, errors.New("failed to add pod pvc controller: " + err.Error())
}
clusterIP, err := clusterIP(ctx, c.ServiceName, c.ClusterNamespace, hostClient)
if err != nil {
return nil, errors.New("failed to extract the clusterIP for the server service: " + err.Error())

View File

@@ -8,7 +8,6 @@ import (
"github.com/rancher/k3k/pkg/controller"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
@@ -47,7 +46,7 @@ func ensureObject(ctx context.Context, cfg *Config, obj ctrlruntimeclient.Object
})
if result != controllerutil.OperationResultNone {
key := client.ObjectKeyFromObject(obj)
key := ctrlruntimeclient.ObjectKeyFromObject(obj)
log.Info(fmt.Sprintf("ensuring %T", obj), "key", key, "result", result)
}

View File

@@ -19,7 +19,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -58,11 +57,11 @@ func (s *SharedAgent) EnsureResources(ctx context.Context) error {
s.role(ctx),
s.roleBinding(ctx),
s.service(ctx),
s.deployment(ctx),
s.daemonset(ctx),
s.dnsService(ctx),
s.webhookTLS(ctx),
); err != nil {
return fmt.Errorf("failed to ensure some resources: %w\n", err)
return fmt.Errorf("failed to ensure some resources: %w", err)
}
return nil
@@ -106,16 +105,16 @@ version: %s`,
cluster.Name, cluster.Namespace, ip, serviceName, token, version)
}
func (s *SharedAgent) deployment(ctx context.Context) error {
func (s *SharedAgent) daemonset(ctx context.Context) error {
labels := map[string]string{
"cluster": s.cluster.Name,
"type": "agent",
"mode": "shared",
}
deploy := &apps.Deployment{
deploy := &apps.DaemonSet{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
Kind: "DaemonSet",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
@@ -123,7 +122,7 @@ func (s *SharedAgent) deployment(ctx context.Context) error {
Namespace: s.cluster.Namespace,
Labels: labels,
},
Spec: apps.DeploymentSpec{
Spec: apps.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
@@ -141,9 +140,9 @@ func (s *SharedAgent) deployment(ctx context.Context) error {
func (s *SharedAgent) podSpec() v1.PodSpec {
var limit v1.ResourceList
return v1.PodSpec{
ServiceAccountName: s.Name(),
NodeSelector: s.cluster.Spec.NodeSelector,
Volumes: []v1.Volume{
{
Name: "config",
@@ -345,6 +344,11 @@ func (s *SharedAgent) role(ctx context.Context) error {
Resources: []string{"clusters"},
Verbs: []string{"get", "watch", "list"},
},
{
APIGroups: []string{"coordination.k8s.io"},
Resources: []string{"leases"},
Verbs: []string{"*"},
},
},
}
@@ -390,7 +394,7 @@ func (s *SharedAgent) webhookTLS(ctx context.Context) error {
},
}
key := client.ObjectKeyFromObject(webhookSecret)
key := ctrlruntimeclient.ObjectKeyFromObject(webhookSecret)
if err := s.client.Get(ctx, key, webhookSecret); err != nil {
if !apierrors.IsNotFound(err) {
return err

View File

@@ -41,7 +41,7 @@ func (v *VirtualAgent) EnsureResources(ctx context.Context) error {
v.config(ctx),
v.deployment(ctx),
); err != nil {
return fmt.Errorf("failed to ensure some resources: %w\n", err)
return fmt.Errorf("failed to ensure some resources: %w", err)
}
return nil