diff --git a/k3k-kubelet/controller/syncer/persistentvolumeclaims.go b/k3k-kubelet/controller/syncer/persistentvolumeclaims.go index 13c6985..6dac500 100644 --- a/k3k-kubelet/controller/syncer/persistentvolumeclaims.go +++ b/k3k-kubelet/controller/syncer/persistentvolumeclaims.go @@ -131,8 +131,19 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r } } + var currentHostPVC v1.PersistentVolumeClaim + + err := r.HostClient.Get(ctx, ctrlruntimeclient.ObjectKeyFromObject(syncedPVC), &currentHostPVC) + if err == nil { + log.V(1).Info("persistent volume claim already exist in the host cluster") + } + + if !apierrors.IsNotFound(err) { + return reconcile.Result{}, err + } + // create the pvc on host - log.Info("creating the persistent volume claim for the first time on the host cluster") + log.Info("creating the persistent volume claim for the first time in the host cluster") // note that we dont need to update the PVC on the host cluster, only syncing the PVC to allow being // handled by the host cluster. diff --git a/k3k-kubelet/provider/provider.go b/k3k-kubelet/provider/provider.go index caf317b..ef8c27f 100644 --- a/k3k-kubelet/provider/provider.go +++ b/k3k-kubelet/provider/provider.go @@ -397,8 +397,27 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error { logger = logger.WithValues("pod", hostPod.Name) - // Schedule the host pod in the same host node of the virtual kubelet - hostPod.Spec.NodeName = p.agentHostname + // Clear the NodeName to allow scheduling, and set affinity to prefer scheduling the Pod on the same host node as the virtual kubelet, + // unless the user has specified their own affinity, in which case the user's affinity is respected. 
+ + hostPod.Spec.NodeName = "" + + if hostPod.Spec.Affinity == nil { + hostPod.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{{ + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{{ + Key: "kubernetes.io/hostname", + Operator: corev1.NodeSelectorOpIn, + Values: []string{p.agentHostname}, + }}, + }, + }}, + }, + } + } // The pod's own nodeSelector is ignored. // The final selector is determined by the cluster spec, but overridden by a policy if present. diff --git a/tests/e2e/cluster_app_test.go b/tests/e2e/cluster_app_test.go new file mode 100644 index 0000000..0461f2e --- /dev/null +++ b/tests/e2e/cluster_app_test.go @@ -0,0 +1,341 @@ +package k3k_test + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/ptr" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/rancher/k3k/k3k-kubelet/translate" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() { + var ( + virtualCluster *VirtualCluster + translator *translate.ToHostTranslator + ) + + BeforeAll(func() { + virtualCluster = NewVirtualCluster() + translator = translate.NewHostTranslator(virtualCluster.Cluster) + + DeferCleanup(func() { + DeleteNamespaces(virtualCluster.Cluster.Namespace) + }) + }) + + When("creating a Deployment with a PVC", func() { + var ( + deployment *appsv1.Deployment + pvc *v1.PersistentVolumeClaim + + namespace = "default" + labels = map[string]string{ + "app": "k3k-deployment-test-app", + } + ) + + BeforeAll(func() { + var err error + + ctx := context.Background() + + By("Creating the PVC") + + pvc = &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "k3k-test-app-", + Namespace: namespace, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + }, + } + + pvc, err = virtualCluster.Client.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(Not(HaveOccurred())) + + By("Creating the Deployment") + + deployment = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "k3k-test-app-", + Namespace: namespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: ptr.To[int32](3), + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "nginx", + VolumeMounts: []v1.VolumeMount{{ + Name: "data-volume", + MountPath: "/data", + }}, + }, + }, + Volumes: []v1.Volume{{ + Name: "data-volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: 
pvc.Name, + }, + }, + }}, + }, + }, + }, + } + + deployment, err = virtualCluster.Client.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) + Expect(err).To(Not(HaveOccurred())) + }) + + It("should bound the PVC in the virtual cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + virtualPVC, err := virtualCluster.Client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvc.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(virtualPVC.Status.Phase).To(Equal(v1.ClaimBound)) + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). + Should(Succeed()) + }) + + It("should bound the PVC in the host cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + hostPVCName := translator.NamespacedName(pvc) + + hostPVC, err := k8s.CoreV1().PersistentVolumeClaims(hostPVCName.Namespace).Get(ctx, hostPVCName.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(hostPVC.Status.Phase).To(Equal(v1.ClaimBound)) + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). + Should(Succeed()) + }) + + It("should have the Pods running in the virtual cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + labelSelector := metav1.FormatLabelSelector(deployment.Spec.Selector) + listOpts := metav1.ListOptions{LabelSelector: labelSelector} + + pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pods.Items).Should(HaveLen(int(*deployment.Spec.Replicas))) + + for _, pod := range pods.Items { + g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning)) + } + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). 
+ Should(Succeed()) + }) + + It("should have the Pods running in the host cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + labelSelector := metav1.FormatLabelSelector(deployment.Spec.Selector) + listOpts := metav1.ListOptions{LabelSelector: labelSelector} + + pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pods.Items).Should(HaveLen(int(*deployment.Spec.Replicas))) + + for _, pod := range pods.Items { + hostPodName := translator.NamespacedName(&pod) + + pod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning)) + } + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). + Should(Succeed()) + }) + }) + + When("creating a StatefulSet with a PVC", func() { + var ( + statefulSet *appsv1.StatefulSet + + namespace = "default" + labels = map[string]string{ + "app": "k3k-sts-test-app", + } + ) + + BeforeAll(func() { + var err error + + ctx := context.Background() + + namespace := "default" + + By("Creating the StatefulSet") + + statefulSet = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "k3k-sts-test-app-", + Namespace: namespace, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](3), + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "nginx", + VolumeMounts: []v1.VolumeMount{{ + Name: "www", + MountPath: "/usr/share/nginx/html", + }}, + }, + }, + }, + }, + VolumeClaimTemplates: []v1.PersistentVolumeClaim{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "www", + Labels: labels, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: 
v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + }, + }}, + }, + } + + statefulSet, err = virtualCluster.Client.AppsV1().StatefulSets(namespace).Create(ctx, statefulSet, metav1.CreateOptions{}) + Expect(err).To(Not(HaveOccurred())) + }) + + It("should bound the PVCs in the virtual cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector) + listOpts := metav1.ListOptions{LabelSelector: labelSelector} + + pvcs, err := virtualCluster.Client.CoreV1().PersistentVolumeClaims(namespace).List(ctx, listOpts) + g.Expect(err).NotTo(HaveOccurred()) + + for _, pvc := range pvcs.Items { + g.Expect(pvc.Status.Phase).To(Equal(v1.ClaimBound)) + } + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). + Should(Succeed()) + }) + + It("should bound the PVCs in the host cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector) + listOpts := metav1.ListOptions{LabelSelector: labelSelector} + + pvcs, err := virtualCluster.Client.CoreV1().PersistentVolumeClaims(statefulSet.Namespace).List(ctx, listOpts) + g.Expect(err).NotTo(HaveOccurred()) + + for _, pvc := range pvcs.Items { + hostPVCName := translator.NamespacedName(&pvc) + + hostPVC, err := k8s.CoreV1().PersistentVolumeClaims(hostPVCName.Namespace).Get(ctx, hostPVCName.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(hostPVC.Status.Phase).To(Equal(v1.ClaimBound)) + } + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). 
+ Should(Succeed()) + }) + + It("should have the Pods running in the virtual cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector) + listOpts := metav1.ListOptions{LabelSelector: labelSelector} + + pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pods.Items).Should(HaveLen(int(*statefulSet.Spec.Replicas))) + + for _, pod := range pods.Items { + g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning)) + } + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). + Should(Succeed()) + }) + + It("should have the Pods running in the host cluster", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + labelSelector := metav1.FormatLabelSelector(statefulSet.Spec.Selector) + listOpts := metav1.ListOptions{LabelSelector: labelSelector} + + pods, err := virtualCluster.Client.CoreV1().Pods(namespace).List(ctx, listOpts) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pods.Items).Should(HaveLen(int(*statefulSet.Spec.Replicas))) + + for _, pod := range pods.Items { + hostPodName := translator.NamespacedName(&pod) + + pod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pod.Status.Phase).To(Equal(v1.PodRunning)) + } + }). + WithPolling(time.Second * 3). + WithTimeout(time.Minute * 3). 
+ Should(Succeed()) + }) + }) +}) diff --git a/tests/e2e/cluster_pod_test.go b/tests/e2e/cluster_pod_test.go index 5e9b2f8..44f4849 100644 --- a/tests/e2e/cluster_pod_test.go +++ b/tests/e2e/cluster_pod_test.go @@ -20,16 +20,132 @@ import ( ) var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() { - var virtualCluster *VirtualCluster + var ( + virtualCluster *VirtualCluster + translator *translate.ToHostTranslator + ) BeforeAll(func() { virtualCluster = NewVirtualCluster() + translator = translate.NewHostTranslator(virtualCluster.Cluster) DeferCleanup(func() { DeleteNamespaces(virtualCluster.Cluster.Namespace) }) }) + When("creating a Pod without any Affinity", func() { + var pod *v1.Pod + + BeforeAll(func() { + var err error + + ctx := context.Background() + + pod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "nginx-", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "nginx", + Image: "nginx", + }}, + }, + } + + pod, err = virtualCluster.Client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + Expect(err).To(Not(HaveOccurred())) + }) + + It("should have the default Affinity", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + hostPodName := translator.NamespacedName(pod) + + hostPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(hostPod.Spec.Affinity).To(Not(BeNil())) + g.Expect(hostPod.Spec.Affinity.NodeAffinity).To(Not(BeNil())) + g.Expect(hostPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Not(BeNil())) + + preferredScheduling := hostPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution + g.Expect(preferredScheduling).To(Not(BeEmpty())) + g.Expect(preferredScheduling[0].Weight).To(Equal(int32(100))) + g.Expect(preferredScheduling[0].Preference.MatchExpressions).To(Not(BeEmpty())) + 
g.Expect(preferredScheduling[0].Preference.MatchExpressions[0].Key).To(Equal("kubernetes.io/hostname")) + }). + WithPolling(time.Second). + WithTimeout(time.Minute). + Should(Succeed()) + }) + }) + + When("creating a Pod with an Affinity", func() { + var pod *v1.Pod + + BeforeAll(func() { + var err error + + ctx := context.Background() + + pod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "nginx-", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "nginx", + Image: "nginx", + }}, + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchExpressions: []v1.NodeSelectorRequirement{{ + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpNotIn, + Values: []string{"fake"}, + }}, + }}, + }, + }, + }, + }, + } + + pod, err = virtualCluster.Client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + Expect(err).To(Not(HaveOccurred())) + }) + + It("should not have the default Affinity", func() { + ctx := context.Background() + + Eventually(func(g Gomega) { + hostPodName := translator.NamespacedName(pod) + + hostPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(hostPod.Spec.Affinity).To(Not(BeNil())) + g.Expect(hostPod.Spec.Affinity.NodeAffinity).To(Not(BeNil())) + g.Expect(hostPod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).To(Not(BeNil())) + + requiredScheduling := hostPod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution + g.Expect(requiredScheduling).To(Not(BeNil())) + g.Expect(requiredScheduling.NodeSelectorTerms).To(Not(BeEmpty())) + g.Expect(requiredScheduling.NodeSelectorTerms[0].MatchExpressions).To(Not(BeEmpty())) + 
g.Expect(requiredScheduling.NodeSelectorTerms[0].MatchExpressions[0].Key).To(Equal("kubernetes.io/hostname")) + g.Expect(requiredScheduling.NodeSelectorTerms[0].MatchExpressions[0].Values).To(ContainElement("fake")) + }). + WithPolling(time.Second). + WithTimeout(time.Minute). + Should(Succeed()) + }) + }) + When("creating a Pod with an invalid configuration", func() { var virtualPod *v1.Pod @@ -140,7 +256,6 @@ var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() { By("Checking the container status of the Pod in the Host Cluster") Eventually(func(g Gomega) { - translator := translate.NewHostTranslator(virtualCluster.Cluster) hostPodName := translator.NamespacedName(virtualPod) pod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{}) @@ -207,7 +322,6 @@ var _ = Context("In a shared cluster", Label(e2eTestLabel), Ordered, func() { By("Checking the status of the Pod in the Host Cluster") Eventually(func(g Gomega) { - translator := translate.NewHostTranslator(virtualCluster.Cluster) hostPodName := translator.NamespacedName(virtualPod) hPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{}) diff --git a/tests/e2e/common_test.go b/tests/e2e/common_test.go index cab06d4..06f83a4 100644 --- a/tests/e2e/common_test.go +++ b/tests/e2e/common_test.go @@ -412,13 +412,15 @@ func restartServerPod(ctx context.Context, virtualCluster *VirtualCluster) { By("Deleting server pod") // check that the server pods restarted - Eventually(func() any { + Eventually(func(g Gomega) { serverPods := listServerPods(ctx, virtualCluster) - Expect(len(serverPods)).To(Equal(1)) - - return serverPods[0].DeletionTimestamp - }).WithTimeout(60 * time.Second).WithPolling(time.Second * 5).Should(BeNil()) + g.Expect(serverPods).To(HaveLen(1)) + g.Expect(serverPods[0].DeletionTimestamp).To(Not(BeNil())) + }). + WithTimeout(time.Minute * 2). + WithPolling(time.Second * 5). 
+ Should(Succeed()) } func listServerPods(ctx context.Context, virtualCluster *VirtualCluster) []v1.Pod {