mirror of
https://github.com/rancher/k3k.git
synced 2026-03-05 19:22:12 +00:00
305 lines
7.4 KiB
Go
305 lines
7.4 KiB
Go
package k3k_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/client-go/tools/remotecommand"
|
|
"k8s.io/kubectl/pkg/scheme"
|
|
"k8s.io/utils/ptr"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
|
|
|
"github.com/rancher/k3k/k3k-kubelet/translate"
|
|
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
|
|
"github.com/rancher/k3k/pkg/controller/certs"
|
|
"github.com/rancher/k3k/pkg/controller/kubeconfig"
|
|
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
)
|
|
|
|
type VirtualCluster struct {
|
|
Cluster *v1alpha1.Cluster
|
|
RestConfig *rest.Config
|
|
Client *kubernetes.Clientset
|
|
}
|
|
|
|
func NewVirtualCluster() *VirtualCluster {
|
|
GinkgoHelper()
|
|
|
|
namespace := NewNamespace()
|
|
|
|
By(fmt.Sprintf("Creating new virtual cluster in namespace %s", namespace.Name))
|
|
cluster := NewCluster(namespace.Name)
|
|
CreateCluster(cluster)
|
|
|
|
client, restConfig := NewVirtualK8sClientAndConfig(cluster)
|
|
|
|
By(fmt.Sprintf("Created virtual cluster %s/%s", cluster.Namespace, cluster.Name))
|
|
|
|
return &VirtualCluster{
|
|
Cluster: cluster,
|
|
RestConfig: restConfig,
|
|
Client: client,
|
|
}
|
|
}
|
|
|
|
// NewVirtualClusters will create multiple Virtual Clusters asynchronously
|
|
func NewVirtualClusters(n int) []*VirtualCluster {
|
|
GinkgoHelper()
|
|
|
|
var clusters []*VirtualCluster
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(n)
|
|
|
|
for range n {
|
|
go func() {
|
|
defer wg.Done()
|
|
defer GinkgoRecover()
|
|
|
|
clusters = append(clusters, NewVirtualCluster())
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return clusters
|
|
}
|
|
|
|
func NewNamespace() *corev1.Namespace {
|
|
GinkgoHelper()
|
|
|
|
namespace := &corev1.Namespace{ObjectMeta: v1.ObjectMeta{GenerateName: "ns-"}}
|
|
namespace, err := k8s.CoreV1().Namespaces().Create(context.Background(), namespace, v1.CreateOptions{})
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
return namespace
|
|
}
|
|
|
|
func DeleteNamespaces(names ...string) {
|
|
GinkgoHelper()
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(names))
|
|
|
|
for _, name := range names {
|
|
go func() {
|
|
defer wg.Done()
|
|
defer GinkgoRecover()
|
|
|
|
deleteNamespace(name)
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func deleteNamespace(name string) {
|
|
GinkgoHelper()
|
|
|
|
By(fmt.Sprintf("Deleting namespace %s", name))
|
|
|
|
err := k8s.CoreV1().Namespaces().Delete(context.Background(), name, v1.DeleteOptions{
|
|
GracePeriodSeconds: ptr.To[int64](0),
|
|
})
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
}
|
|
|
|
func NewCluster(namespace string) *v1alpha1.Cluster {
|
|
return &v1alpha1.Cluster{
|
|
ObjectMeta: v1.ObjectMeta{
|
|
GenerateName: "cluster-",
|
|
Namespace: namespace,
|
|
},
|
|
Spec: v1alpha1.ClusterSpec{
|
|
TLSSANs: []string{hostIP},
|
|
Expose: &v1alpha1.ExposeConfig{
|
|
NodePort: &v1alpha1.NodePortConfig{},
|
|
},
|
|
Persistence: v1alpha1.PersistenceConfig{
|
|
Type: v1alpha1.EphemeralPersistenceMode,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func CreateCluster(cluster *v1alpha1.Cluster) {
|
|
GinkgoHelper()
|
|
|
|
ctx := context.Background()
|
|
err := k8sClient.Create(ctx, cluster)
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
// check that the server Pod and the Kubelet are in Ready state
|
|
Eventually(func() bool {
|
|
podList, err := k8s.CoreV1().Pods(cluster.Namespace).List(ctx, v1.ListOptions{})
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
serverRunning := false
|
|
kubeletRunning := false
|
|
|
|
for _, pod := range podList.Items {
|
|
imageName := pod.Spec.Containers[0].Image
|
|
imageName = strings.Split(imageName, ":")[0] // remove tag
|
|
|
|
switch imageName {
|
|
case "rancher/k3s":
|
|
serverRunning = pod.Status.Phase == corev1.PodRunning
|
|
case "rancher/k3k-kubelet":
|
|
kubeletRunning = pod.Status.Phase == corev1.PodRunning
|
|
}
|
|
|
|
if serverRunning && kubeletRunning {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}).
|
|
WithTimeout(time.Minute * 2).
|
|
WithPolling(time.Second * 5).
|
|
Should(BeTrue())
|
|
}
|
|
|
|
// NewVirtualK8sClient returns a Kubernetes ClientSet for the virtual cluster
|
|
func NewVirtualK8sClient(cluster *v1alpha1.Cluster) *kubernetes.Clientset {
|
|
virtualK8sClient, _ := NewVirtualK8sClientAndConfig(cluster)
|
|
return virtualK8sClient
|
|
}
|
|
|
|
// NewVirtualK8sClient returns a Kubernetes ClientSet for the virtual cluster
|
|
func NewVirtualK8sClientAndConfig(cluster *v1alpha1.Cluster) (*kubernetes.Clientset, *rest.Config) {
|
|
GinkgoHelper()
|
|
|
|
var (
|
|
err error
|
|
config *clientcmdapi.Config
|
|
)
|
|
|
|
ctx := context.Background()
|
|
|
|
Eventually(func() error {
|
|
vKubeconfig := kubeconfig.New()
|
|
kubeletAltName := fmt.Sprintf("k3k-%s-kubelet", cluster.Name)
|
|
vKubeconfig.AltNames = certs.AddSANs([]string{hostIP, kubeletAltName})
|
|
config, err = vKubeconfig.Generate(ctx, k8sClient, cluster, hostIP, 0)
|
|
return err
|
|
}).
|
|
WithTimeout(time.Minute * 2).
|
|
WithPolling(time.Second * 5).
|
|
Should(BeNil())
|
|
|
|
configData, err := clientcmd.Write(*config)
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
restcfg, err := clientcmd.RESTConfigFromKubeConfig(configData)
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
virtualK8sClient, err := kubernetes.NewForConfig(restcfg)
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
return virtualK8sClient, restcfg
|
|
}
|
|
|
|
func (c *VirtualCluster) NewNginxPod(namespace string) (*corev1.Pod, string) {
|
|
GinkgoHelper()
|
|
|
|
if namespace == "" {
|
|
namespace = "default"
|
|
}
|
|
|
|
nginxPod := &corev1.Pod{
|
|
ObjectMeta: v1.ObjectMeta{
|
|
GenerateName: "nginx-",
|
|
Namespace: namespace,
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
Containers: []corev1.Container{{
|
|
Name: "nginx",
|
|
Image: "nginx",
|
|
}},
|
|
},
|
|
}
|
|
|
|
By("Creating Pod")
|
|
|
|
ctx := context.Background()
|
|
nginxPod, err := c.Client.CoreV1().Pods(nginxPod.Namespace).Create(ctx, nginxPod, v1.CreateOptions{})
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
var podIP string
|
|
|
|
// check that the nginx Pod is up and running in the host cluster
|
|
Eventually(func() bool {
|
|
podList, err := k8s.CoreV1().Pods(c.Cluster.Namespace).List(ctx, v1.ListOptions{})
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
for _, pod := range podList.Items {
|
|
resourceName := pod.Annotations[translate.ResourceNameAnnotation]
|
|
resourceNamespace := pod.Annotations[translate.ResourceNamespaceAnnotation]
|
|
|
|
if resourceName == nginxPod.Name && resourceNamespace == nginxPod.Namespace {
|
|
podIP = pod.Status.PodIP
|
|
|
|
GinkgoWriter.Printf(
|
|
"pod=%s resource=%s/%s status=%s podIP=%s\n",
|
|
pod.Name, resourceNamespace, resourceName, pod.Status.Phase, podIP,
|
|
)
|
|
|
|
return pod.Status.Phase == corev1.PodRunning && podIP != ""
|
|
}
|
|
}
|
|
|
|
return false
|
|
}).
|
|
WithTimeout(time.Minute).
|
|
WithPolling(time.Second * 5).
|
|
Should(BeTrue())
|
|
|
|
// get the running pod from the virtual cluster
|
|
nginxPod, err = c.Client.CoreV1().Pods(nginxPod.Namespace).Get(ctx, nginxPod.Name, v1.GetOptions{})
|
|
Expect(err).To(Not(HaveOccurred()))
|
|
|
|
return nginxPod, podIP
|
|
}
|
|
|
|
// ExecCmd exec command on specific pod and wait the command's output.
|
|
func (c *VirtualCluster) ExecCmd(pod *corev1.Pod, command string) (string, string, error) {
|
|
option := &corev1.PodExecOptions{
|
|
Command: []string{"sh", "-c", command},
|
|
Stdout: true,
|
|
Stderr: true,
|
|
}
|
|
|
|
req := c.Client.CoreV1().RESTClient().Post().Resource("pods").Name(pod.Name).Namespace(pod.Namespace).SubResource("exec")
|
|
req.VersionedParams(option, scheme.ParameterCodec)
|
|
|
|
exec, err := remotecommand.NewSPDYExecutor(c.RestConfig, "POST", req.URL())
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer cancel()
|
|
|
|
stdout := &bytes.Buffer{}
|
|
stderr := &bytes.Buffer{}
|
|
|
|
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
})
|
|
|
|
return stdout.String(), stderr.String(), err
|
|
}
|