Compare commits

..

5 Commits

Author SHA1 Message Date
Enrico Candino
93025d301b Bump Charts to 1.0.1-rc2 (#586) 2025-12-03 11:44:49 +01:00
Enrico Candino
e385ceb66f Fixed missing Kubernetes host version when specified (#585)
* fix for missing host version

* added test

* fix test

* fix test
2025-12-03 09:21:27 +01:00
Enrico Candino
5c49c3d6b7 Fix create events rbac (#575)
* cleanup logs in kubelet provider

* added events create rbac to kubelet

* fix lint, moved fetch pod logic in separate func
2025-11-25 13:48:04 +01:00
Enrico Candino
521ff17ef6 Added test for SubPathExpr (#569)
* small fixes

* added test for subpathexpr

* removed old comment
2025-11-21 16:28:59 +01:00
Enrico Candino
5b4f31ef73 bump version to 1.0.1-rc1 in Chart.yaml (#567) 2025-11-17 18:24:57 +01:00
18 changed files with 411 additions and 140 deletions

View File

@@ -19,7 +19,7 @@ CRD_REF_DOCS := go run github.com/elastic/crd-ref-docs@$(CRD_REF_DOCS_VER)
ENVTEST ?= go run sigs.k8s.io/controller-runtime/tools/setup-envtest@$(ENVTEST_VERSION)
ENVTEST_DIR ?= $(shell pwd)/.envtest
E2E_LABEL_FILTER ?= "e2e"
E2E_LABEL_FILTER ?= e2e
export KUBEBUILDER_ASSETS ?= $(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(ENVTEST_DIR) -p path)

View File

@@ -2,5 +2,5 @@ apiVersion: v2
name: k3k
description: A Helm chart for K3K
type: application
version: 1.0.0
appVersion: v1.0.0
version: 1.0.1-rc2
appVersion: v1.0.1-rc2

View File

@@ -86,7 +86,7 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger log
CoreClient: coreClient,
ClusterNamespace: namespace,
ClusterName: name,
logger: logger,
logger: logger.WithValues("cluster", name),
serverIP: serverIP,
dnsIP: dnsIP,
}
@@ -95,8 +95,12 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger log
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
hostPodName := p.Translator.TranslateName(namespace, podName)
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, name, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "container", containerName)
logger.V(1).Info("GetContainerLogs")
options := corev1.PodLogOptions{
Container: containerName,
Timestamps: opts.Timestamps,
@@ -125,20 +129,27 @@ func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, con
}
closer, err := p.CoreClient.Pods(p.ClusterNamespace).GetLogs(hostPodName, &options).Stream(ctx)
p.logger.Error(err, fmt.Sprintf("got error when getting logs for %s in %s", hostPodName, p.ClusterNamespace))
if err != nil {
logger.Error(err, "Error getting logs from container")
}
return closer, err
}
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, podName)
func (p *Provider) RunInContainer(ctx context.Context, namespace, name, containerName string, cmd []string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "container", containerName)
logger.V(1).Info("RunInContainer")
req := p.CoreClient.RESTClient().Post().
Resource("pods").
Name(hostPodName).
Namespace(p.ClusterNamespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: cmd,
@@ -150,10 +161,11 @@ func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, conta
exec, err := remotecommand.NewSPDYExecutor(&p.ClientConfig, http.MethodPost, req.URL())
if err != nil {
logger.Error(err, "Error creating SPDY executor")
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
if err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: attach.Stdin(),
Stdout: attach.Stdout(),
Stderr: attach.Stderr(),
@@ -161,18 +173,28 @@ func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, conta
TerminalSizeQueue: &translatorSizeQueue{
resizeChan: attach.Resize(),
},
})
}); err != nil {
logger.Error(err, "Error while executing command in container")
return err
}
return nil
}
// AttachToContainer attaches to the executing process of a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, containerName string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, podName)
func (p *Provider) AttachToContainer(ctx context.Context, namespace, name, containerName string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "container", containerName)
logger.V(1).Info("AttachToContainer")
req := p.CoreClient.RESTClient().Post().
Resource("pods").
Name(hostPodName).
Namespace(p.ClusterNamespace).
SubResource("attach")
req.VersionedParams(&corev1.PodAttachOptions{
Container: containerName,
TTY: attach.TTY(),
@@ -183,10 +205,11 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co
exec, err := remotecommand.NewSPDYExecutor(&p.ClientConfig, http.MethodPost, req.URL())
if err != nil {
logger.Error(err, "Error creating SPDY executor")
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
if err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: attach.Stdin(),
Stdout: attach.Stdout(),
Stderr: attach.Stderr(),
@@ -194,7 +217,12 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co
TerminalSizeQueue: &translatorSizeQueue{
resizeChan: attach.Resize(),
},
})
}); err != nil {
logger.Error(err, "Error while attaching to container")
return err
}
return nil
}
// GetStatsSummary gets the stats for the node, including running pods
@@ -203,7 +231,8 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
nodeList := &corev1.NodeList{}
if err := p.CoreClient.RESTClient().Get().Resource("nodes").Do(ctx).Into(nodeList); err != nil {
return nil, fmt.Errorf("unable to get nodes of cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err)
p.logger.Error(err, "Unable to get nodes of cluster")
return nil, err
}
// fetch the stats from all the nodes
@@ -221,14 +250,13 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
Suffix("stats/summary").
DoRaw(ctx)
if err != nil {
return nil, fmt.Errorf(
"unable to get stats of node '%s', from cluster %s in namespace %s: %w",
n.Name, p.ClusterName, p.ClusterNamespace, err,
)
p.logger.Error(err, "Unable to get stats/summary from cluster node", "node", n.Name)
return nil, err
}
stats := &stats.Summary{}
if err := json.Unmarshal(res, stats); err != nil {
p.logger.Error(err, "Error unmarshaling stats/summary from cluster node", "node", n.Name)
return nil, err
}
@@ -241,6 +269,7 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
pods, err := p.GetPods(ctx)
if err != nil {
p.logger.Error(err, "Error getting pods from cluster for stats")
return nil, err
}
@@ -278,9 +307,12 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
// GetMetricsResource gets the metrics for the node, including running pods
func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
p.logger.V(1).Info("GetMetricsResource")
statsSummary, err := p.GetStatsSummary(ctx)
if err != nil {
return nil, errors.Join(err, errors.New("error fetching MetricsResource"))
p.logger.Error(err, "Error getting stats summary from cluster for metrics")
return nil, err
}
registry := compbasemetrics.NewKubeRegistry()
@@ -288,15 +320,20 @@ func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily,
metricFamily, err := registry.Gather()
if err != nil {
return nil, errors.Join(err, errors.New("error gathering metrics from collector"))
p.logger.Error(err, "Error gathering metrics from collector")
return nil, err
}
return metricFamily, nil
}
// PortForward forwards a local port to a port on the pod
func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port int32, stream io.ReadWriteCloser) error {
hostPodName := p.Translator.TranslateName(namespace, pod)
func (p *Provider) PortForward(ctx context.Context, namespace, name string, port int32, stream io.ReadWriteCloser) error {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "port", port)
logger.V(1).Info("PortForward")
req := p.CoreClient.RESTClient().Post().
Resource("pods").
Name(hostPodName).
@@ -305,6 +342,7 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port
transport, upgrader, err := spdy.RoundTripperFor(&p.ClientConfig)
if err != nil {
logger.Error(err, "Error creating RoundTripper for PortForward")
return err
}
@@ -318,10 +356,16 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port
// so more work is needed to detect a close and handle that appropriately.
fw, err := portforward.New(dialer, []string{portAsString}, stopChannel, readyChannel, stream, stream)
if err != nil {
logger.Error(err, "Error creating new PortForward")
return err
}
return fw.ForwardPorts()
if err := fw.ForwardPorts(); err != nil {
logger.Error(err, "Error forwarding ports")
return err
}
return nil
}
// CreatePod executes createPod with retry
@@ -331,16 +375,22 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
// createPod takes a Kubernetes Pod and deploys it within the provider.
func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
logger := p.logger.WithValues("namespace", pod.Namespace, "name", pod.Name)
logger.V(1).Info("CreatePod")
// fieldPath envs are not being translated correctly using the virtual kubelet pod controller
// as a workaround we will try to fetch the pod from the virtual cluster and copy over the envSource
var sourcePod corev1.Pod
if err := p.VirtualClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, &sourcePod); err != nil {
logger.Error(err, "Error getting Pod from Virtual Cluster")
return err
}
tPod := sourcePod.DeepCopy()
p.Translator.TranslateTo(tPod)
logger = p.logger.WithValues("pod", tPod.Name)
// get Cluster definition
clusterKey := types.NamespacedName{
Namespace: p.ClusterNamespace,
@@ -350,7 +400,8 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
var cluster v1beta1.Cluster
if err := p.HostClient.Get(ctx, clusterKey, &cluster); err != nil {
return fmt.Errorf("unable to get cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err)
logger.Error(err, "Error getting Virtual Cluster definition")
return err
}
// these values shouldn't be set on create
@@ -384,16 +435,18 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
// fieldpath annotations
if err := p.configureFieldPathEnv(&sourcePod, tPod); err != nil {
return fmt.Errorf("unable to fetch fieldpath annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
// volumes will often refer to resources in the virtual cluster, but instead need to refer to the sync'd
// host cluster version
if err := p.transformVolumes(pod.Namespace, tPod.Spec.Volumes); err != nil {
return fmt.Errorf("unable to sync volumes for pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Unable to fetch fieldpath annotations for pod")
return err
}
// volumes will often refer to resources in the virtual cluster
// but instead need to refer to the synced host cluster version
p.transformVolumes(pod.Namespace, tPod.Spec.Volumes)
// sync serviceaccount token to a the host cluster
if err := p.transformTokens(ctx, pod, tPod); err != nil {
return fmt.Errorf("unable to transform tokens for pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Unable to transform tokens for pod")
return err
}
for i, imagePullSecret := range tPod.Spec.ImagePullSecrets {
@@ -403,17 +456,20 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
// inject networking information to the pod including the virtual cluster controlplane endpoint
configureNetworking(tPod, pod.Name, pod.Namespace, p.serverIP, p.dnsIP)
p.logger.Info("creating pod",
"host_namespace", tPod.Namespace, "host_name", tPod.Name,
"virtual_namespace", pod.Namespace, "virtual_name", pod.Name,
)
// set ownerReference to the cluster object
if err := controllerutil.SetControllerReference(&cluster, tPod, p.HostClient.Scheme()); err != nil {
logger.Error(err, "Unable to set owner reference for pod")
return err
}
return p.HostClient.Create(ctx, tPod)
if err := p.HostClient.Create(ctx, tPod); err != nil {
logger.Error(err, "Error creating pod on host cluster")
return err
}
logger.Info("Pod created on host cluster")
return nil
}
// withRetry retries passed function with interval and timeout
@@ -445,7 +501,7 @@ func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *corev
// transformVolumes changes the volumes to the representation in the host cluster. Will return an error
// if one/more volumes couldn't be transformed
func (p *Provider) transformVolumes(podNamespace string, volumes []corev1.Volume) error {
func (p *Provider) transformVolumes(podNamespace string, volumes []corev1.Volume) {
for _, volume := range volumes {
if strings.HasPrefix(volume.Name, kubeAPIAccessPrefix) {
continue
@@ -479,8 +535,6 @@ func (p *Provider) transformVolumes(podNamespace string, volumes []corev1.Volume
}
}
}
return nil
}
// UpdatePod executes updatePod with retry
@@ -489,7 +543,10 @@ func (p *Provider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
}
func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
p.logger.V(1).Info("got a request for update pod")
hostPodName := p.Translator.TranslateName(pod.Namespace, pod.Name)
logger := p.logger.WithValues("namespace", pod.Namespace, "name", pod.Name, "pod", hostPodName)
logger.V(1).Info("UpdatePod")
// Once scheduled a Pod cannot update other fields than the image of the containers, initcontainers and a few others
// See: https://kubernetes.io/docs/concepts/workloads/pods/#pod-update-and-replacement
@@ -498,28 +555,31 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
var currentVirtualPod corev1.Pod
if err := p.VirtualClient.Get(ctx, client.ObjectKeyFromObject(pod), &currentVirtualPod); err != nil {
return fmt.Errorf("unable to get pod to update from virtual cluster: %w", err)
logger.Error(err, "Unable to get pod to update from virtual cluster")
return err
}
hostNamespaceName := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: p.Translator.TranslateName(pod.Namespace, pod.Name),
Name: hostPodName,
}
var currentHostPod corev1.Pod
logger = logger.WithValues()
var currentHostPod corev1.Pod
if err := p.HostClient.Get(ctx, hostNamespaceName, &currentHostPod); err != nil {
return fmt.Errorf("unable to get pod to update from host cluster: %w", err)
logger.Error(err, "Unable to get Pod to update from host cluster")
return err
}
// Handle ephemeral containers
if !cmp.Equal(currentHostPod.Spec.EphemeralContainers, pod.Spec.EphemeralContainers) {
p.logger.Info("Updating ephemeral containers")
logger.V(1).Info("Updating ephemeral containers")
currentHostPod.Spec.EphemeralContainers = pod.Spec.EphemeralContainers
if _, err := p.CoreClient.Pods(p.ClusterNamespace).UpdateEphemeralContainers(ctx, currentHostPod.Name, &currentHostPod, metav1.UpdateOptions{}); err != nil {
p.logger.Error(err, "error when updating ephemeral containers")
logger.Error(err, "error when updating ephemeral containers")
return err
}
@@ -528,7 +588,8 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
// fieldpath annotations
if err := p.configureFieldPathEnv(&currentVirtualPod, &currentHostPod); err != nil {
return fmt.Errorf("unable to fetch fieldpath annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Unable to fetch fieldpath annotations for Pod")
return err
}
currentVirtualPod.Spec.Containers = updateContainerImages(currentVirtualPod.Spec.Containers, pod.Spec.Containers)
@@ -542,7 +603,8 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
currentVirtualPod.Labels = pod.Labels
if err := p.VirtualClient.Update(ctx, &currentVirtualPod); err != nil {
return fmt.Errorf("unable to update pod in the virtual cluster: %w", err)
logger.Error(err, "Unable to update Pod in virtual cluster")
return err
}
// Update Pod in the host cluster
@@ -558,9 +620,12 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
maps.Copy(currentHostPod.Labels, pod.Labels)
if err := p.HostClient.Update(ctx, &currentHostPod); err != nil {
return fmt.Errorf("unable to update pod in the host cluster: %w", err)
logger.Error(err, "Unable to update Pod in host cluster")
return err
}
logger.Info("Pod updated in virtual and host cluster")
return nil
}
@@ -590,20 +655,24 @@ func (p *Provider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
// state, as well as the pod. DeletePod may be called multiple times for the same pod.
func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error {
p.logger.Info(fmt.Sprintf("got request to delete pod %s/%s", pod.Namespace, pod.Name))
hostName := p.Translator.TranslateName(pod.Namespace, pod.Name)
hostPodName := p.Translator.TranslateName(pod.Namespace, pod.Name)
err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostName, metav1.DeleteOptions{})
logger := p.logger.WithValues("namespace", pod.Namespace, "name", pod.Name, "pod", hostPodName)
logger.V(1).Info("DeletePod")
err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostPodName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
p.logger.Info(fmt.Sprintf("pod %s/%s already deleted from host cluster", p.ClusterNamespace, hostName))
logger.Info("Pod to delete not found in host cluster")
return nil
}
return fmt.Errorf("unable to delete pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Error trying to delete pod from host cluster")
return err
}
p.logger.Info(fmt.Sprintf("pod %s/%s deleted from host cluster", p.ClusterNamespace, hostName))
logger.Info("Pod deleted from host cluster")
return nil
}
@@ -613,21 +682,18 @@ func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error {
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
p.logger.V(1).Info("got a request for get pod", "namespace", namespace, "name", name)
hostNamespaceName := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: p.Translator.TranslateName(namespace, name),
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName)
logger.V(1).Info("GetPod")
pod, err := p.getPodFromHostCluster(ctx, hostPodName)
if err != nil {
logger.Error(err, "Error getting pod from host cluster for GetPod")
return nil, err
}
var pod corev1.Pod
if err := p.HostClient.Get(ctx, hostNamespaceName, &pod); err != nil {
return nil, fmt.Errorf("error when retrieving pod: %w", err)
}
p.Translator.TranslateFrom(&pod)
return &pod, nil
return pod, nil
}
// GetPodStatus retrieves the status of a pod by name from the provider.
@@ -635,28 +701,49 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1.
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
p.logger.V(1).Info("got a request for pod status", "namespace", namespace, "name", name)
hostPodName := p.Translator.TranslateName(namespace, name)
pod, err := p.GetPod(ctx, namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName)
logger.V(1).Info("GetPodStatus")
pod, err := p.getPodFromHostCluster(ctx, hostPodName)
if err != nil {
return nil, fmt.Errorf("unable to get pod for status: %w", err)
logger.Error(err, "Error getting pod from host cluster for PodStatus")
return nil, err
}
p.logger.V(1).Info("got pod status", "namespace", namespace, "name", name, "status", pod.Status)
return pod.Status.DeepCopy(), nil
}
func (p *Provider) getPodFromHostCluster(ctx context.Context, hostPodName string) (*corev1.Pod, error) {
key := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: hostPodName,
}
var pod corev1.Pod
if err := p.HostClient.Get(ctx, key, &pod); err != nil {
return nil, err
}
p.Translator.TranslateFrom(&pod)
return &pod, nil
}
// GetPods retrieves a list of all pods running on the provider (can be cached).
// The Pods returned are expected to be immutable, and may be accessed
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
p.logger.V(1).Info("GetPods")
selector := labels.NewSelector()
requirement, err := labels.NewRequirement(translate.ClusterNameLabel, selection.Equals, []string{p.ClusterName})
if err != nil {
return nil, fmt.Errorf("unable to create label selector: %w", err)
p.logger.Error(err, "Error creating label selector for GetPods")
return nil, err
}
selector = selector.Add(*requirement)
@@ -665,7 +752,8 @@ func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
err = p.HostClient.List(ctx, &podList, &client.ListOptions{LabelSelector: selector})
if err != nil {
return nil, fmt.Errorf("unable to list pods: %w", err)
p.logger.Error(err, "Error listing pods from host cluster")
return nil, err
}
retPods := []*corev1.Pod{}

View File

@@ -381,6 +381,11 @@ func (s *SharedAgent) role(ctx context.Context) error {
Resources: []string{"persistentvolumeclaims", "pods", "pods/log", "pods/attach", "pods/exec", "pods/ephemeralcontainers", "secrets", "configmaps", "services"},
Verbs: []string{"*"},
},
{
APIGroups: []string{""},
Resources: []string{"events"},
Verbs: []string{"create"},
},
{
APIGroups: []string{"networking.k8s.io"},
Resources: []string{"ingresses"},

View File

@@ -269,8 +269,8 @@ func (c *ClusterReconciler) reconcile(ctx context.Context, cluster *v1beta1.Clus
// if the Version is not specified we will try to use the same Kubernetes version of the host.
// This version is stored in the Status object, and it will not be updated if already set.
if cluster.Spec.Version == "" && cluster.Status.HostVersion == "" {
log.V(1).Info("Cluster version not set. Using host version.")
if cluster.Status.HostVersion == "" {
log.V(1).Info("Cluster host version not set.")
hostVersion, err := c.DiscoveryClient.ServerVersion()
if err != nil {
@@ -278,8 +278,9 @@ func (c *ClusterReconciler) reconcile(ctx context.Context, cluster *v1beta1.Clus
}
// update Status HostVersion
k8sVersion := strings.Split(hostVersion.GitVersion, "+")[0]
cluster.Status.HostVersion = k8sVersion + "-k3s1"
k8sVersion, _, _ := strings.Cut(hostVersion.GitVersion, "+")
k8sVersion, _, _ = strings.Cut(k8sVersion, "-")
cluster.Status.HostVersion = k8sVersion
}
token, err := c.token(ctx, cluster)

View File

@@ -2,7 +2,6 @@ package cluster_test
import (
"context"
"fmt"
"time"
"k8s.io/utils/ptr"
@@ -73,7 +72,6 @@ var _ = Describe("Cluster Controller", Label("controller"), Label("Cluster"), fu
serverVersion, err := k8s.ServerVersion()
Expect(err).To(Not(HaveOccurred()))
expectedHostVersion := fmt.Sprintf("%s-k3s1", serverVersion.GitVersion)
Eventually(func() string {
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(cluster), cluster)
@@ -82,7 +80,7 @@ var _ = Describe("Cluster Controller", Label("controller"), Label("Cluster"), fu
}).
WithTimeout(time.Second * 30).
WithPolling(time.Second).
Should(Equal(expectedHostVersion))
Should(Equal(serverVersion.GitVersion))
// check NetworkPolicy
expectedNetworkPolicy := &networkingv1.NetworkPolicy{

View File

@@ -25,21 +25,24 @@ var Backoff = wait.Backoff{
Jitter: 0.1,
}
// Image returns the rancher/k3s image tagged with the specified Version.
// If Version is empty it will use with the same k8s version of the host cluster,
// stored in the Status object. It will return the latest version as last fallback.
// K3SImage returns the rancher/k3s image tagged with the found K3SVersion.
func K3SImage(cluster *v1beta1.Cluster, k3SImage string) string {
image := k3SImage
imageVersion := "latest"
return k3SImage + ":" + K3SVersion(cluster)
}
// K3SVersion returns the rancher/k3s specified version.
// If empty it will return the k3s version of the Kubernetes version of the host cluster, stored in the Status object.
// Returns the latest version as fallback.
func K3SVersion(cluster *v1beta1.Cluster) string {
if cluster.Spec.Version != "" {
imageVersion = cluster.Spec.Version
} else if cluster.Status.HostVersion != "" {
imageVersion = cluster.Status.HostVersion
return cluster.Spec.Version
}
return image + ":" + imageVersion
if cluster.Status.HostVersion != "" {
return cluster.Status.HostVersion + "-k3s1"
}
return "latest"
}
// SafeConcatNameWithPrefix runs the SafeConcatName with extra prefix.

View File

@@ -51,7 +51,7 @@ func Test_K3S_Image(t *testing.T) {
},
},
},
expectedData: "rancher/k3s:v4.5.6",
expectedData: "rancher/k3s:v4.5.6-k3s1",
},
{
name: "cluster with empty version spec and empty hostVersion status",

View File

@@ -7,10 +7,6 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/rand"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -47,12 +43,7 @@ var _ = When("using the k3kcli", Label("cli"), func() {
clusterNamespace := "k3k-" + clusterName
DeferCleanup(func() {
err := k8sClient.Delete(context.Background(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: clusterNamespace,
},
})
Expect(client.IgnoreNotFound(err)).To(Not(HaveOccurred()))
DeleteNamespaces(clusterNamespace)
})
_, stderr, err = K3kcli("cluster", "create", clusterName)
@@ -78,6 +69,24 @@ var _ = When("using the k3kcli", Label("cli"), func() {
WithPolling(time.Second).
Should(BeEmpty())
})
It("can create a cluster with the specified kubernetes version", func() {
var (
stderr string
err error
)
clusterName := "cluster-" + rand.String(5)
clusterNamespace := "k3k-" + clusterName
DeferCleanup(func() {
DeleteNamespaces(clusterNamespace)
})
_, stderr, err = K3kcli("cluster", "create", "--version", "v1.33.6-k3s1", clusterName)
Expect(err).To(Not(HaveOccurred()), string(stderr))
Expect(stderr).To(ContainSubstring("You can start using the cluster"))
})
})
When("trying the policy commands", func() {
@@ -122,12 +131,7 @@ var _ = When("using the k3kcli", Label("cli"), func() {
clusterNamespace := "k3k-" + clusterName
DeferCleanup(func() {
err := k8sClient.Delete(context.Background(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: clusterNamespace,
},
})
Expect(client.IgnoreNotFound(err)).To(Not(HaveOccurred()))
DeleteNamespaces(clusterNamespace)
})
_, stderr, err = K3kcli("cluster", "create", clusterName)

View File

@@ -13,7 +13,7 @@ import (
. "github.com/onsi/gomega"
)
var _ = When("a cluster with custom certificates is installed with individual cert secrets", Label("e2e"), Label(certificatesTestsLabel), func() {
var _ = When("a cluster with custom certificates is installed with individual cert secrets", Label(e2eTestLabel), Label(certificatesTestsLabel), func() {
var virtualCluster *VirtualCluster
BeforeEach(func() {

View File

@@ -5,7 +5,7 @@ import (
. "github.com/onsi/gomega"
)
var _ = When("two virtual clusters are installed", Label("e2e"), Label(networkingTestsLabel), func() {
var _ = When("two virtual clusters are installed", Label(e2eTestLabel), Label(networkingTestsLabel), func() {
var (
cluster1 *VirtualCluster
cluster2 *VirtualCluster

View File

@@ -19,7 +19,7 @@ import (
. "github.com/onsi/gomega"
)
var _ = When("an ephemeral cluster is installed", Label("e2e"), Label(persistenceTestsLabel), func() {
var _ = When("an ephemeral cluster is installed", Label(e2eTestLabel), Label(persistenceTestsLabel), func() {
var virtualCluster *VirtualCluster
BeforeEach(func() {
@@ -110,7 +110,7 @@ var _ = When("an ephemeral cluster is installed", Label("e2e"), Label(persistenc
})
})
var _ = When("a dynamic cluster is installed", Label("e2e"), Label(persistenceTestsLabel), func() {
var _ = When("a dynamic cluster is installed", Label(e2eTestLabel), Label(persistenceTestsLabel), func() {
var virtualCluster *VirtualCluster
BeforeEach(func() {

176
tests/cluster_pod_test.go Normal file
View File

@@ -0,0 +1,176 @@
package k3k_test
import (
"context"
"time"
"k8s.io/kubernetes/pkg/api/v1/pod"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/rancher/k3k/k3k-kubelet/translate"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = When("a cluster creates a pod with an invalid configuration", Label(e2eTestLabel), func() {
var (
virtualCluster *VirtualCluster
virtualPod *v1.Pod
)
BeforeEach(func() {
virtualCluster = NewVirtualCluster()
DeferCleanup(func() {
DeleteNamespaces(virtualCluster.Cluster.Namespace)
})
p := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "nginx-",
Namespace: "default",
Labels: map[string]string{
"name": "var-expansion-test",
},
Annotations: map[string]string{
"notmysubpath": "mypath",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "nginx",
Env: []v1.EnvVar{
{
Name: "POD_NAME",
Value: "nginx",
},
{
Name: "ANNOTATION",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.annotations['mysubpath']",
},
},
},
},
VolumeMounts: []v1.VolumeMount{
{
Name: "workdir",
MountPath: "/volume_mount",
},
{
Name: "workdir",
MountPath: "/subpath_mount",
SubPathExpr: "$(ANNOTATION)/$(POD_NAME)",
},
},
},
},
Volumes: []v1.Volume{{
Name: "workdir",
VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}},
}},
},
}
ctx := context.Background()
var err error
virtualPod, err = virtualCluster.Client.CoreV1().Pods(p.Namespace).Create(ctx, p, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
})
It("should be in Pending status with the CreateContainerConfigError until we fix the annotation", func() {
ctx := context.Background()
By("Checking the container status of the Pod in the Virtual Cluster")
Eventually(func(g Gomega) {
pod, err := virtualCluster.Client.CoreV1().Pods(virtualPod.Namespace).Get(ctx, virtualPod.Name, metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Status.Phase).To(Equal(v1.PodPending))
containerStatuses := pod.Status.ContainerStatuses
g.Expect(containerStatuses).To(HaveLen(1))
waitingState := containerStatuses[0].State.Waiting
g.Expect(waitingState).NotTo(BeNil())
g.Expect(waitingState.Reason).To(Equal("CreateContainerConfigError"))
}).
WithPolling(time.Second).
WithTimeout(time.Minute).
Should(Succeed())
By("Checking the container status of the Pod in the Host Cluster")
Eventually(func(g Gomega) {
translator := translate.NewHostTranslator(virtualCluster.Cluster)
hostPodName := translator.NamespacedName(virtualPod)
pod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Status.Phase).To(Equal(v1.PodPending))
containerStatuses := pod.Status.ContainerStatuses
g.Expect(containerStatuses).To(HaveLen(1))
waitingState := containerStatuses[0].State.Waiting
g.Expect(waitingState).NotTo(BeNil())
g.Expect(waitingState.Reason).To(Equal("CreateContainerConfigError"))
}).
WithPolling(time.Second).
WithTimeout(time.Minute).
Should(Succeed())
By("Fixing the annotation")
var err error
virtualPod, err = virtualCluster.Client.CoreV1().Pods(virtualPod.Namespace).Get(ctx, virtualPod.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
virtualPod.Annotations["mysubpath"] = virtualPod.Annotations["notmysubpath"]
delete(virtualPod.Annotations, "notmysubpath")
virtualPod, err = virtualCluster.Client.CoreV1().Pods(virtualPod.Namespace).Update(ctx, virtualPod, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
By("Checking the status of the Pod in the Virtual Cluster")
Eventually(func(g Gomega) {
vPod, err := virtualCluster.Client.CoreV1().Pods(virtualPod.Namespace).Get(ctx, virtualPod.Name, metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
_, cond := pod.GetPodCondition(&vPod.Status, v1.PodReady)
g.Expect(cond).NotTo(BeNil())
g.Expect(cond.Status).To(BeEquivalentTo(metav1.ConditionTrue))
}).
WithPolling(time.Second).
WithTimeout(time.Minute).
Should(Succeed())
By("Checking the status of the Pod in the Host Cluster")
Eventually(func(g Gomega) {
translator := translate.NewHostTranslator(virtualCluster.Cluster)
hostPodName := translator.NamespacedName(virtualPod)
hPod, err := k8s.CoreV1().Pods(hostPodName.Namespace).Get(ctx, hostPodName.Name, metav1.GetOptions{})
g.Expect(err).NotTo(HaveOccurred())
_, cond := pod.GetPodCondition(&hPod.Status, v1.PodReady)
g.Expect(cond).NotTo(BeNil())
g.Expect(cond.Status).To(BeEquivalentTo(metav1.ConditionTrue))
}).
WithPolling(time.Second).
WithTimeout(time.Minute).
Should(Succeed())
})
})

View File

@@ -18,7 +18,7 @@ import (
. "github.com/onsi/gomega"
)
var _ = When("a cluster's status is tracked", Label("e2e"), Label(statusTestsLabel), func() {
var _ = When("a cluster's status is tracked", Label(e2eTestLabel), Label(statusTestsLabel), func() {
var (
namespace *corev1.Namespace
vcp *v1beta1.VirtualClusterPolicy

View File

@@ -14,7 +14,7 @@ import (
. "github.com/onsi/gomega"
)
var _ = When("a shared mode cluster is created", Ordered, Label("e2e"), func() {
var _ = When("a shared mode cluster is created", Ordered, Label(e2eTestLabel), func() {
var (
virtualCluster *VirtualCluster
virtualConfigMap *corev1.ConfigMap

View File

@@ -18,7 +18,7 @@ import (
. "github.com/onsi/gomega"
)
var _ = When("a shared mode cluster update its envs", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a shared mode cluster update its envs", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var virtualCluster *VirtualCluster
ctx := context.Background()
BeforeEach(func() {
@@ -162,7 +162,7 @@ var _ = When("a shared mode cluster update its envs", Label("e2e"), Label(update
})
})
var _ = When("a shared mode cluster update its server args", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a shared mode cluster update its server args", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var virtualCluster *VirtualCluster
ctx := context.Background()
BeforeEach(func() {
@@ -221,7 +221,7 @@ var _ = When("a shared mode cluster update its server args", Label("e2e"), Label
})
})
var _ = When("a virtual mode cluster update its envs", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a virtual mode cluster update its envs", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var virtualCluster *VirtualCluster
ctx := context.Background()
BeforeEach(func() {
@@ -362,7 +362,7 @@ var _ = When("a virtual mode cluster update its envs", Label("e2e"), Label(updat
})
})
var _ = When("a virtual mode cluster update its server args", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a virtual mode cluster update its server args", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var virtualCluster *VirtualCluster
ctx := context.Background()
BeforeEach(func() {
@@ -424,7 +424,7 @@ var _ = When("a virtual mode cluster update its server args", Label("e2e"), Labe
})
})
var _ = When("a shared mode cluster update its version", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a shared mode cluster update its version", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var (
virtualCluster *VirtualCluster
nginxPod *v1.Pod
@@ -510,7 +510,7 @@ var _ = When("a shared mode cluster update its version", Label("e2e"), Label(upd
})
})
var _ = When("a virtual mode cluster update its version", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a virtual mode cluster update its version", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var (
virtualCluster *VirtualCluster
nginxPod *v1.Pod
@@ -611,7 +611,7 @@ var _ = When("a virtual mode cluster update its version", Label("e2e"), Label(up
})
})
var _ = When("a shared mode cluster scales up servers", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a shared mode cluster scales up servers", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var (
virtualCluster *VirtualCluster
nginxPod *v1.Pod
@@ -696,7 +696,7 @@ var _ = When("a shared mode cluster scales up servers", Label("e2e"), Label(upda
})
})
var _ = When("a shared mode cluster scales down servers", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a shared mode cluster scales down servers", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var (
virtualCluster *VirtualCluster
nginxPod *v1.Pod
@@ -783,7 +783,7 @@ var _ = When("a shared mode cluster scales down servers", Label("e2e"), Label(up
})
})
var _ = When("a virtual mode cluster scales up servers", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a virtual mode cluster scales up servers", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var (
virtualCluster *VirtualCluster
nginxPod *v1.Pod
@@ -868,7 +868,7 @@ var _ = When("a virtual mode cluster scales up servers", Label("e2e"), Label(upd
})
})
var _ = When("a virtual mode cluster scales down servers", Label("e2e"), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var _ = When("a virtual mode cluster scales down servers", Label(e2eTestLabel), Label(updateTestsLabel), Label(slowTestsLabel), func() {
var (
virtualCluster *VirtualCluster
nginxPod *v1.Pod

View File

@@ -17,6 +17,7 @@ import (
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -113,24 +114,18 @@ func DeleteNamespaces(names ...string) {
defer wg.Done()
defer GinkgoRecover()
deleteNamespace(name)
By(fmt.Sprintf("Deleting namespace %s", name))
err := k8s.CoreV1().Namespaces().Delete(context.Background(), name, metav1.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
})
Expect(client.IgnoreNotFound(err)).To(Not(HaveOccurred()))
}()
}
wg.Wait()
}
func deleteNamespace(name string) {
GinkgoHelper()
By(fmt.Sprintf("Deleting namespace %s", name))
err := k8s.CoreV1().Namespaces().Delete(context.Background(), name, metav1.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
})
Expect(err).To(Not(HaveOccurred()))
}
func NewCluster(namespace string) *v1beta1.Cluster {
return &v1beta1.Cluster{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -42,6 +42,7 @@ import (
const (
k3kNamespace = "k3k-system"
e2eTestLabel = "e2e"
slowTestsLabel = "slow"
updateTestsLabel = "update"
persistenceTestsLabel = "persistence"