Schedule Pods in the same Node with a preferred affinity (#724)

* Add integration tests for Deployment and StatefulSet creation with PVC in shared cluster

* Add affinity settings for pod scheduling based on agent hostname

* increased timeout

* focus test

* remove cleanup

* check for existing pvc

* remove focus

* add affinity tests for Pods in shared cluster

* refactor restartServerPod to improve pod restart checks and timeout handling

* unfocus

* fix test description
This commit is contained in:
Enrico Candino
2026-03-31 11:22:10 +02:00
committed by GitHub
parent 93e8ab6d8f
commit ef2bb0339a
5 changed files with 498 additions and 11 deletions

View File

@@ -131,8 +131,19 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
}
}
var currentHostPVC v1.PersistentVolumeClaim
err := r.HostClient.Get(ctx, ctrlruntimeclient.ObjectKeyFromObject(syncedPVC), &currentHostPVC)
if err == nil {
log.V(1).Info("persistent volume claim already exist in the host cluster")
}
if !apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
// create the pvc on host
log.Info("creating the persistent volume claim for the first time on the host cluster")
log.Info("creating the persistent volume claim for the first time in the host cluster")
// note that we dont need to update the PVC on the host cluster, only syncing the PVC to allow being
// handled by the host cluster.

View File

@@ -397,8 +397,27 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
logger = logger.WithValues("pod", hostPod.Name)
// Schedule the host pod in the same host node of the virtual kubelet
hostPod.Spec.NodeName = p.agentHostname
// Clear the NodeName to allow scheduling, and set affinity to prefer scheduling the Pod on the same host node as the virtual kubelet,
// unless the user has specified their own affinity, in which case the user's affinity is respected.
hostPod.Spec.NodeName = ""
if hostPod.Spec.Affinity == nil {
hostPod.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{{
Weight: 100,
Preference: corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{{
Key: "kubernetes.io/hostname",
Operator: corev1.NodeSelectorOpIn,
Values: []string{p.agentHostname},
}},
},
}},
},
}
}
// The pod's own nodeSelector is ignored.
// The final selector is determined by the cluster spec, but overridden by a policy if present.

View File

@@ -0,0 +1,341 @@
package k3k_test
import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/ptr"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/rancher/k3k/k3k-kubelet/translate"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// Integration tests for workloads that mount PersistentVolumeClaims in a
// shared (virtual) cluster. The Context is Ordered: each BeforeAll creates
// the resources exactly once, and the It blocks then assert on them both in
// the virtual cluster and on the host cluster (via the ToHostTranslator).
var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() {
	var (
		virtualCluster *VirtualCluster
		translator     *translate.ToHostTranslator
	)

	BeforeAll(func() {
		virtualCluster = NewVirtualCluster()
		translator = translate.NewHostTranslator(virtualCluster.Cluster)

		// Tear down the whole cluster namespace once every spec in this Context has run.
		DeferCleanup(func() {
			DeleteNamespaces(virtualCluster.Cluster.Namespace)
		})
	})

	When("creating a Deployment with a PVC", func() {
		var (
			deployment *appsv1.Deployment
			pvc        *v1.PersistentVolumeClaim

			namespace = "default"
			labels    = map[string]string{
				"app": "k3k-deployment-test-app",
			}
		)

		BeforeAll(func() {
			var err error

			ctx := context.Background()

			By("Creating the PVC")

			pvc = &v1.PersistentVolumeClaim{
				ObjectMeta: metav1.ObjectMeta{
					GenerateName: "k3k-test-app-",
					Namespace:    namespace,
				},
				Spec: v1.PersistentVolumeClaimSpec{
					AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
					Resources: v1.VolumeResourceRequirements{
						Requests: v1.ResourceList{
							v1.ResourceStorage: resource.MustParse("1Gi"),
						},
					},
				},
			}

			pvc, err = virtualCluster.Client.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
			Expect(err).To(Not(HaveOccurred()))

			By("Creating the Deployment")

			deployment = &appsv1.Deployment{
				ObjectMeta: metav1.ObjectMeta{
					GenerateName: "k3k-test-app-",
					Namespace:    namespace,
				},
				Spec: appsv1.DeploymentSpec{
					Replicas: ptr.To[int32](3),
					Selector: &metav1.LabelSelector{
						MatchLabels: labels,
					},
					Template: v1.PodTemplateSpec{
						ObjectMeta: metav1.ObjectMeta{
							Labels: labels,
						},
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "nginx",
									Image: "nginx",
									VolumeMounts: []v1.VolumeMount{{
										Name:      "data-volume",
										MountPath: "/data",
									}},
								},
							},
							Volumes: []v1.Volume{{
								Name: "data-volume",
								VolumeSource: v1.VolumeSource{
									PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
										ClaimName: pvc.Name,
									},
								},
							}},
						},
					},
				},
			}

			deployment, err = virtualCluster.Client.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
			Expect(err).To(Not(HaveOccurred()))
		})

		It("should bound the PVC in the virtual cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				virtualPVC, err := virtualCluster.Client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvc.Name, metav1.GetOptions{})
				g.Expect(err).NotTo(HaveOccurred())
				g.Expect(virtualPVC.Status.Phase).To(Equal(v1.ClaimBound))
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})

		It("should bound the PVC in the host cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				// Translate the virtual PVC name/namespace to its host counterpart.
				hostPVCName := translator.NamespacedName(pvc)

				hostPVC, err := k8s.CoreV1().PersistentVolumeClaims(hostPVCName.Namespace).Get(ctx, hostPVCName.Name, metav1.GetOptions{})
				g.Expect(err).NotTo(HaveOccurred())
				g.Expect(hostPVC.Status.Phase).To(Equal(v1.ClaimBound))
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})

		It("should have the Pods running in the virtual cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				labelSelector := metav1.FormatLabelSelector(deployment.Spec.Selector)
				listOpts := metav1.ListOptions{LabelSelector: labelSelector}

				pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts)
				g.Expect(err).NotTo(HaveOccurred())
				g.Expect(pods.Items).Should(HaveLen(int(*deployment.Spec.Replicas)))

				for _, pod := range pods.Items {
					g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning))
				}
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})

		It("should have the Pods running in the host cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				labelSelector := metav1.FormatLabelSelector(deployment.Spec.Selector)
				listOpts := metav1.ListOptions{LabelSelector: labelSelector}

				// List the virtual Pods, then look up each translated counterpart on the host.
				pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts)
				g.Expect(err).NotTo(HaveOccurred())
				g.Expect(pods.Items).Should(HaveLen(int(*deployment.Spec.Replicas)))

				for _, pod := range pods.Items {
					hostPodName := translator.NamespacedName(&pod)

					hostPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
					g.Expect(err).NotTo(HaveOccurred())
					g.Expect(hostPod.Status.Phase).To(Equal(v1.PodRunning))
				}
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})
	})

	When("creating a StatefulSet with a PVC", func() {
		var (
			statefulSet *appsv1.StatefulSet

			namespace = "default"
			labels    = map[string]string{
				"app": "k3k-sts-test-app",
			}
		)

		BeforeAll(func() {
			var err error

			ctx := context.Background()

			By("Creating the StatefulSet")

			// The PVCs are created by the StatefulSet controller from VolumeClaimTemplates,
			// one per replica, carrying the same labels as the Pod template.
			statefulSet = &appsv1.StatefulSet{
				ObjectMeta: metav1.ObjectMeta{
					GenerateName: "k3k-sts-test-app-",
					Namespace:    namespace,
				},
				Spec: appsv1.StatefulSetSpec{
					Replicas: ptr.To[int32](3),
					Selector: &metav1.LabelSelector{
						MatchLabels: labels,
					},
					Template: v1.PodTemplateSpec{
						ObjectMeta: metav1.ObjectMeta{
							Labels: labels,
						},
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "nginx",
									Image: "nginx",
									VolumeMounts: []v1.VolumeMount{{
										Name:      "www",
										MountPath: "/usr/share/nginx/html",
									}},
								},
							},
						},
					},
					VolumeClaimTemplates: []v1.PersistentVolumeClaim{{
						ObjectMeta: metav1.ObjectMeta{
							Name:   "www",
							Labels: labels,
						},
						Spec: v1.PersistentVolumeClaimSpec{
							AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
							Resources: v1.VolumeResourceRequirements{
								Requests: v1.ResourceList{
									v1.ResourceStorage: resource.MustParse("1Gi"),
								},
							},
						},
					}},
				},
			}

			statefulSet, err = virtualCluster.Client.AppsV1().StatefulSets(namespace).Create(ctx, statefulSet, metav1.CreateOptions{})
			Expect(err).To(Not(HaveOccurred()))
		})

		It("should bound the PVCs in the virtual cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector)
				listOpts := metav1.ListOptions{LabelSelector: labelSelector}

				pvcs, err := virtualCluster.Client.CoreV1().PersistentVolumeClaims(namespace).List(ctx, listOpts)
				g.Expect(err).NotTo(HaveOccurred())
				// Guard against a vacuous pass while the claims are still being created.
				g.Expect(pvcs.Items).To(Not(BeEmpty()))

				for _, pvc := range pvcs.Items {
					g.Expect(pvc.Status.Phase).To(Equal(v1.ClaimBound))
				}
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})

		It("should bound the PVCs in the host cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector)
				listOpts := metav1.ListOptions{LabelSelector: labelSelector}

				// List the virtual PVCs, then look up each translated counterpart on the host.
				pvcs, err := virtualCluster.Client.CoreV1().PersistentVolumeClaims(statefulSet.Namespace).List(ctx, listOpts)
				g.Expect(err).NotTo(HaveOccurred())
				// Guard against a vacuous pass while the claims are still being created.
				g.Expect(pvcs.Items).To(Not(BeEmpty()))

				for _, pvc := range pvcs.Items {
					hostPVCName := translator.NamespacedName(&pvc)

					hostPVC, err := k8s.CoreV1().PersistentVolumeClaims(hostPVCName.Namespace).Get(ctx, hostPVCName.Name, metav1.GetOptions{})
					g.Expect(err).NotTo(HaveOccurred())
					g.Expect(hostPVC.Status.Phase).To(Equal(v1.ClaimBound))
				}
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})

		It("should have the Pods running in the virtual cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector)
				listOpts := metav1.ListOptions{LabelSelector: labelSelector}

				pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts)
				g.Expect(err).NotTo(HaveOccurred())
				g.Expect(pods.Items).Should(HaveLen(int(*statefulSet.Spec.Replicas)))

				for _, pod := range pods.Items {
					g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning))
				}
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})

		It("should have the Pods running in the host cluster", func() {
			ctx := context.Background()

			Eventually(func(g Gomega) {
				labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector)
				listOpts := metav1.ListOptions{LabelSelector: labelSelector}

				// List the virtual Pods, then look up each translated counterpart on the host.
				pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts)
				g.Expect(err).NotTo(HaveOccurred())
				g.Expect(pods.Items).Should(HaveLen(int(*statefulSet.Spec.Replicas)))

				for _, pod := range pods.Items {
					hostPodName := translator.NamespacedName(&pod)

					hostPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
					g.Expect(err).NotTo(HaveOccurred())
					g.Expect(hostPod.Status.Phase).To(Equal(v1.PodRunning))
				}
			}).
				WithPolling(time.Second * 3).
				WithTimeout(time.Minute * 3).
				Should(Succeed())
		})
	})
})

View File

@@ -20,16 +20,132 @@ import (
)
var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() {
var virtualCluster *VirtualCluster
var (
virtualCluster *VirtualCluster
translator *translate.ToHostTranslator
)
BeforeAll(func() {
virtualCluster = NewVirtualCluster()
translator = translate.NewHostTranslator(virtualCluster.Cluster)
DeferCleanup(func() {
DeleteNamespaces(virtualCluster.Cluster.Namespace)
})
})
When("creating a Pod without any Affinity", func() {
var pod *v1.Pod
BeforeAll(func() {
var err error
ctx := context.Background()
pod = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "nginx-",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "nginx",
Image: "nginx",
}},
},
}
pod, err = virtualCluster.Client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
})
It("should have the default Affinity", func() {
ctx := context.Background()
Eventually(func(g Gomega) {
hostPodName := translator.NamespacedName(pod)
hostPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(hostPod.Spec.Affinity).To(Not(BeNil()))
g.Expect(hostPod.Spec.Affinity.NodeAffinity).To(Not(BeNil()))
g.Expect(hostPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Not(BeNil()))
preferredScheduling := hostPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution
g.Expect(preferredScheduling).To(Not(BeEmpty()))
g.Expect(preferredScheduling[0].Weight).To(Equal(int32(100)))
g.Expect(preferredScheduling[0].Preference.MatchExpressions).To(Not(BeEmpty()))
g.Expect(preferredScheduling[0].Preference.MatchExpressions[0].Key).To(Equal("kubernetes.io/hostname"))
}).
WithPolling(time.Second).
WithTimeout(time.Minute).
Should(Succeed())
})
})
When("creating a Pod with an Affinity", func() {
var pod *v1.Pod
BeforeAll(func() {
var err error
ctx := context.Background()
pod = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "nginx-",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "nginx",
Image: "nginx",
}},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{{
MatchExpressions: []v1.NodeSelectorRequirement{{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpNotIn,
Values: []string{"fake"},
}},
}},
},
},
},
},
}
pod, err = virtualCluster.Client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
})
It("should not have the default Affinity", func() {
ctx := context.Background()
Eventually(func(g Gomega) {
hostPodName := translator.NamespacedName(pod)
hostPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(hostPod.Spec.Affinity).To(Not(BeNil()))
g.Expect(hostPod.Spec.Affinity.NodeAffinity).To(Not(BeNil()))
g.Expect(hostPod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).To(Not(BeNil()))
requiredScheduling := hostPod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
g.Expect(requiredScheduling).To(Not(BeNil()))
g.Expect(requiredScheduling.NodeSelectorTerms).To(Not(BeEmpty()))
g.Expect(requiredScheduling.NodeSelectorTerms[0].MatchExpressions).To(Not(BeEmpty()))
g.Expect(requiredScheduling.NodeSelectorTerms[0].MatchExpressions[0].Key).To(Equal("kubernetes.io/hostname"))
g.Expect(requiredScheduling.NodeSelectorTerms[0].MatchExpressions[0].Values).To(ContainElement("fake"))
}).
WithPolling(time.Second).
WithTimeout(time.Minute).
Should(Succeed())
})
})
When("creating a Pod with an invalid configuration", func() {
var virtualPod *v1.Pod
@@ -140,7 +256,6 @@ var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() {
By("Checking the container status of the Pod in the Host Cluster")
Eventually(func(g Gomega) {
translator := translate.NewHostTranslator(virtualCluster.Cluster)
hostPodName := translator.NamespacedName(virtualPod)
pod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
@@ -207,7 +322,6 @@ var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() {
By("Checking the status of the Pod in the Host Cluster")
Eventually(func(g Gomega) {
translator := translate.NewHostTranslator(virtualCluster.Cluster)
hostPodName := translator.NamespacedName(virtualPod)
hPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})

View File

@@ -412,13 +412,15 @@ func restartServerPod(ctx context.Context, virtualCluster *VirtualCluster) {
By("Deleting server pod")
// check that the server pods restarted
Eventually(func() any {
Eventually(func(g Gomega) {
serverPods := listServerPods(ctx, virtualCluster)
Expect(len(serverPods)).To(Equal(1))
return serverPods[0].DeletionTimestamp
}).WithTimeout(60 * time.Second).WithPolling(time.Second * 5).Should(BeNil())
g.Expect(serverPods).To(HaveLen(1))
g.Expect(serverPods[0].DeletionTimestamp).To(Not(BeNil()))
}).
WithTimeout(time.Minute * 2).
WithPolling(time.Second * 5).
Should(Succeed())
}
func listServerPods(ctx context.Context, virtualCluster *VirtualCluster) []v1.Pod {