Files
k3k/tests/common_test.go
Enrico Candino 0164c785ab Show correct allocatable resources when a Policy is applied (#638)
* wip

* wip

* wip

* fix lint and tests

* fixed bugs for missing resources

* cleanup and refactor

* removed coreClient from configureNode

* added comments to distribute algorithm
2026-01-27 15:56:37 +01:00

478 lines
13 KiB
Go

package k3k_test
import (
"bytes"
"context"
"fmt"
"net/url"
"os"
"strings"
"sync"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/rancher/k3k/k3k-kubelet/translate"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
"github.com/rancher/k3k/pkg/controller/certs"
"github.com/rancher/k3k/pkg/controller/kubeconfig"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
type VirtualCluster struct {
Cluster *v1beta1.Cluster
RestConfig *rest.Config
Client *kubernetes.Clientset
Kubeconfig []byte
}
func NewVirtualCluster() *VirtualCluster { // By default, create an ephemeral cluster
GinkgoHelper()
return NewVirtualClusterWithType(v1beta1.EphemeralPersistenceMode)
}
func NewVirtualClusterWithType(persistenceType v1beta1.PersistenceMode) *VirtualCluster {
GinkgoHelper()
namespace := NewNamespace()
cluster := NewCluster(namespace.Name)
cluster.Spec.Persistence.Type = persistenceType
CreateCluster(cluster)
client, restConfig, kubeconfig := NewVirtualK8sClientAndKubeconfig(cluster)
By(fmt.Sprintf("Created virtual cluster %s/%s", cluster.Namespace, cluster.Name))
return &VirtualCluster{
Cluster: cluster,
RestConfig: restConfig,
Client: client,
Kubeconfig: kubeconfig,
}
}
// NewVirtualClusters will create multiple Virtual Clusters asynchronously
func NewVirtualClusters(n int) []*VirtualCluster {
GinkgoHelper()
var clusters []*VirtualCluster
wg := sync.WaitGroup{}
wg.Add(n)
for range n {
go func() {
defer wg.Done()
defer GinkgoRecover()
clusters = append(clusters, NewVirtualCluster())
}()
}
wg.Wait()
return clusters
}
func NewNamespace() *v1.Namespace {
GinkgoHelper()
namespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{GenerateName: "ns-", Labels: map[string]string{"e2e": "true"}}}
namespace, err := k8s.CoreV1().Namespaces().Create(context.Background(), namespace, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
return namespace
}
func DeleteNamespaces(names ...string) {
GinkgoHelper()
if _, found := os.LookupEnv("KEEP_NAMESPACES"); found {
By(fmt.Sprintf("Keeping namespace %v", names))
return
}
wg := sync.WaitGroup{}
wg.Add(len(names))
for _, name := range names {
go func() {
defer wg.Done()
defer GinkgoRecover()
By(fmt.Sprintf("Deleting namespace %s", name))
err := k8s.CoreV1().Namespaces().Delete(context.Background(), name, metav1.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
})
Expect(client.IgnoreNotFound(err)).To(Not(HaveOccurred()))
}()
}
wg.Wait()
}
func NewCluster(namespace string) *v1beta1.Cluster {
return &v1beta1.Cluster{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "cluster-",
Namespace: namespace,
},
Spec: v1beta1.ClusterSpec{
TLSSANs: []string{hostIP},
Expose: &v1beta1.ExposeConfig{
NodePort: &v1beta1.NodePortConfig{},
},
Persistence: v1beta1.PersistenceConfig{
Type: v1beta1.EphemeralPersistenceMode,
},
},
}
}
func CreateCluster(cluster *v1beta1.Cluster) {
GinkgoHelper()
By(fmt.Sprintf("Creating new virtual cluster in namespace %s", cluster.Namespace))
ctx := context.Background()
err := k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))
expectedServers := int(*cluster.Spec.Servers)
expectedAgents := int(*cluster.Spec.Agents)
By(fmt.Sprintf("Waiting for cluster %s to be ready in namespace %s. Expected servers: %d. Expected agents: %d", cluster.Name, cluster.Namespace, expectedServers, expectedAgents))
// track the Eventually status to log for changes
prev := -1
// check that the server Pod and the Kubelet are in Ready state
Eventually(func() bool {
podList, err := k8s.CoreV1().Pods(cluster.Namespace).List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
// all the servers and agents needs to be in a running phase
var serversReady, agentsReady int
for _, k3sPod := range podList.Items {
_, cond := pod.GetPodCondition(&k3sPod.Status, v1.PodReady)
// pod not ready
if cond == nil || cond.Status != v1.ConditionTrue {
continue
}
if k3sPod.Labels["role"] == "server" {
serversReady++
}
if k3sPod.Labels["type"] == "agent" {
agentsReady++
}
}
if prev != (serversReady + agentsReady) {
GinkgoLogr.Info("Waiting for pods to be Ready",
"servers", serversReady, "agents", agentsReady,
"name", cluster.Name, "namespace", cluster.Namespace,
"time", time.Now().Format(time.DateTime),
)
prev = (serversReady + agentsReady)
}
// the server pods should equal the expected servers, but since in shared mode we also have the kubelet is fine to have more than one
if (serversReady != expectedServers) || (agentsReady < expectedAgents) {
return false
}
return true
}).
WithTimeout(time.Minute * 5).
WithPolling(time.Second * 10).
Should(BeTrue())
By("Cluster is ready")
}
// NewVirtualK8sClient returns a Kubernetes ClientSet for the virtual cluster
func NewVirtualK8sClient(cluster *v1beta1.Cluster) *kubernetes.Clientset {
virtualK8sClient, _ := NewVirtualK8sClientAndConfig(cluster)
return virtualK8sClient
}
// NewVirtualK8sClient returns a Kubernetes ClientSet for the virtual cluster
func NewVirtualK8sClientAndConfig(cluster *v1beta1.Cluster) (*kubernetes.Clientset, *rest.Config) {
GinkgoHelper()
var (
err error
config *clientcmdapi.Config
)
ctx := context.Background()
Eventually(func() error {
vKubeconfig := kubeconfig.New()
kubeletAltName := fmt.Sprintf("k3k-%s-kubelet", cluster.Name)
vKubeconfig.AltNames = certs.AddSANs([]string{hostIP, kubeletAltName})
config, err = vKubeconfig.Generate(ctx, k8sClient, cluster, hostIP, 0)
return err
}).
WithTimeout(time.Minute * 2).
WithPolling(time.Second * 5).
Should(BeNil())
configData, err := clientcmd.Write(*config)
Expect(err).To(Not(HaveOccurred()))
restcfg, err := clientcmd.RESTConfigFromKubeConfig(configData)
Expect(err).To(Not(HaveOccurred()))
virtualK8sClient, err := kubernetes.NewForConfig(restcfg)
Expect(err).To(Not(HaveOccurred()))
return virtualK8sClient, restcfg
}
// NewVirtualK8sClient returns a Kubernetes ClientSet for the virtual cluster
func NewVirtualK8sClientAndKubeconfig(cluster *v1beta1.Cluster) (*kubernetes.Clientset, *rest.Config, []byte) {
GinkgoHelper()
var (
err error
config *clientcmdapi.Config
)
ctx := context.Background()
Eventually(func() error {
vKubeconfig := kubeconfig.New()
kubeletAltName := fmt.Sprintf("k3k-%s-kubelet", cluster.Name)
vKubeconfig.AltNames = certs.AddSANs([]string{hostIP, kubeletAltName})
config, err = vKubeconfig.Generate(ctx, k8sClient, cluster, hostIP, 0)
return err
}).
WithTimeout(time.Minute * 2).
WithPolling(time.Second * 5).
Should(BeNil())
configData, err := clientcmd.Write(*config)
Expect(err).To(Not(HaveOccurred()))
restcfg, err := clientcmd.RESTConfigFromKubeConfig(configData)
Expect(err).To(Not(HaveOccurred()))
virtualK8sClient, err := kubernetes.NewForConfig(restcfg)
Expect(err).To(Not(HaveOccurred()))
return virtualK8sClient, restcfg, configData
}
func (c *VirtualCluster) NewNginxPod(namespace string) (*v1.Pod, string) {
GinkgoHelper()
if namespace == "" {
namespace = "default"
}
nginxPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "nginx-",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "nginx",
Image: "nginx",
}},
},
}
By("Creating Nginx Pod and waiting for it to be Ready")
ctx := context.Background()
var err error
nginxPod, err = c.Client.CoreV1().Pods(nginxPod.Namespace).Create(ctx, nginxPod, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
// check that the nginx Pod is up and running in the virtual cluster
Eventually(func(g Gomega) {
nginxPod, err = c.Client.CoreV1().Pods(nginxPod.Namespace).Get(ctx, nginxPod.Name, metav1.GetOptions{})
g.Expect(err).To(Not(HaveOccurred()))
_, cond := pod.GetPodCondition(&nginxPod.Status, v1.PodReady)
g.Expect(cond).NotTo(BeNil())
g.Expect(cond.Status).To(BeEquivalentTo(metav1.ConditionTrue))
}).
WithTimeout(time.Minute).
WithPolling(time.Second).
Should(Succeed())
By(fmt.Sprintf("Nginx Pod is running (%s/%s)", nginxPod.Namespace, nginxPod.Name))
// only check the pod on the host cluster if the mode is shared mode
if c.Cluster.Spec.Mode != v1beta1.SharedClusterMode {
return nginxPod, ""
}
var podIP string
// check that the nginx Pod is up and running in the host cluster
Eventually(func() bool {
podList, err := k8s.CoreV1().Pods(c.Cluster.Namespace).List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
for _, pod := range podList.Items {
resourceName := pod.Annotations[translate.ResourceNameAnnotation]
resourceNamespace := pod.Annotations[translate.ResourceNamespaceAnnotation]
if resourceName == nginxPod.Name && resourceNamespace == nginxPod.Namespace {
podIP = pod.Status.PodIP
GinkgoWriter.Printf(
"pod=%s resource=%s/%s status=%s podIP=%s\n",
pod.Name, resourceNamespace, resourceName, pod.Status.Phase, podIP,
)
return pod.Status.Phase == v1.PodRunning && podIP != ""
}
}
return false
}).
WithTimeout(time.Minute).
WithPolling(time.Second * 5).
Should(BeTrue())
return nginxPod, podIP
}
// ExecCmd exec command on specific pod and wait the command's output.
func (c *VirtualCluster) ExecCmd(pod *v1.Pod, command string) (string, string, error) {
option := &v1.PodExecOptions{
Command: []string{"sh", "-c", command},
Stdout: true,
Stderr: true,
}
req := c.Client.CoreV1().RESTClient().Post().Resource("pods").Name(pod.Name).Namespace(pod.Namespace).SubResource("exec")
req.VersionedParams(option, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(c.RestConfig, "POST", req.URL())
if err != nil {
return "", "", err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: stdout,
Stderr: stderr,
})
return stdout.String(), stderr.String(), err
}
func restartServerPod(ctx context.Context, virtualCluster *VirtualCluster) {
GinkgoHelper()
serverPods := listServerPods(ctx, virtualCluster)
Expect(len(serverPods)).To(Equal(1))
serverPod := serverPods[0]
GinkgoWriter.Printf("deleting pod %s/%s\n", serverPod.Namespace, serverPod.Name)
err := k8s.CoreV1().Pods(virtualCluster.Cluster.Namespace).Delete(ctx, serverPod.Name, metav1.DeleteOptions{})
Expect(err).To(Not(HaveOccurred()))
By("Deleting server pod")
// check that the server pods restarted
Eventually(func() any {
serverPods := listServerPods(ctx, virtualCluster)
Expect(len(serverPods)).To(Equal(1))
return serverPods[0].DeletionTimestamp
}).WithTimeout(60 * time.Second).WithPolling(time.Second * 5).Should(BeNil())
}
func listServerPods(ctx context.Context, virtualCluster *VirtualCluster) []v1.Pod {
labelSelector := "cluster=" + virtualCluster.Cluster.Name + ",role=server"
serverPods, err := k8s.CoreV1().Pods(virtualCluster.Cluster.Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
Expect(err).To(Not(HaveOccurred()))
return serverPods.Items
}
func listAgentPods(ctx context.Context, virtualCluster *VirtualCluster) []v1.Pod {
labelSelector := fmt.Sprintf("cluster=%s,type=agent,mode=%s", virtualCluster.Cluster.Name, virtualCluster.Cluster.Spec.Mode)
agentPods, err := k8s.CoreV1().Pods(virtualCluster.Cluster.Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
Expect(err).To(Not(HaveOccurred()))
return agentPods.Items
}
// getEnv will get an environment variable from a pod it will return empty string if not found
func getEnv(pod *v1.Pod, envName string) (string, bool) {
container := pod.Spec.Containers[0]
for _, envVar := range container.Env {
if envVar.Name == envName {
return envVar.Value, true
}
}
return "", false
}
// isArgFound will return true if the argument passed to the function is found in container args
func isArgFound(pod *v1.Pod, arg string) bool {
container := pod.Spec.Containers[0]
for _, cmd := range container.Command {
if strings.Contains(cmd, arg) {
return true
}
}
return false
}
func getServerIP(ctx context.Context, cfg *rest.Config) (string, error) {
if k3sContainer != nil {
return k3sContainer.ContainerIP(ctx)
}
u, err := url.Parse(cfg.Host)
if err != nil {
return "", err
}
// If Host includes a port, u.Hostname() extracts just the hostname part
return u.Hostname(), nil
}