Compare commits

...

5 Commits

Author SHA1 Message Date
Hussein Galal
c93cdd0333 Add retry for k3k-kubelet provider functions (#188)
* Add retry for k3k kubelet provider functions

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add retry for k3k kubelet provider function

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* go mod tidy

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

---------

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
2025-01-16 21:34:28 +02:00
Enrico Candino
958d515a59 removed Namespace creation from charts, edited default (#190) 2025-01-16 18:34:17 +01:00
Hussein Galal
9d0c907df2 Fix downward api for status fields in k3k-kubelet (#185)
* Fix downward api for status fields in k3k-kubelet

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

---------

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
2025-01-16 02:40:17 +02:00
Enrico Candino
1691d48875 Fix for UpdatePod (#187)
* fix for UpdatePod

* removed print
2025-01-15 18:50:21 +01:00
Enrico Candino
960afe9504 fix error for existing webhook (#186) 2025-01-15 18:43:12 +01:00
11 changed files with 220 additions and 53 deletions

View File

@@ -3,4 +3,4 @@ name: k3k
description: A Helm chart for K3K
type: application
version: 0.1.5-r5
appVersion: 0.2.0
appVersion: v0.2.2-rc2

View File

@@ -4,7 +4,7 @@ metadata:
name: {{ include "k3k.fullname" . }}
labels:
{{- include "k3k.labels" . | nindent 4 }}
namespace: {{ .Values.namespace }}
namespace: {{ .Release.Namespace }}
spec:
replicas: {{ .Values.image.replicaCount }}
selector:
@@ -16,14 +16,14 @@ spec:
{{- include "k3k.selectorLabels" . | nindent 8 }}
spec:
containers:
- image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
- image: "{{ .Values.image.repository }}:{{ default .Chart.AppVersion .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
name: {{ .Chart.Name }}
env:
- name: CLUSTER_CIDR
value: {{ .Values.host.clusterCIDR }}
- name: SHARED_AGENT_IMAGE
value: "{{ .Values.sharedAgent.image.repository }}:{{ .Values.sharedAgent.image.tag }}"
value: "{{ .Values.sharedAgent.image.repository }}:{{ default .Chart.AppVersion .Values.sharedAgent.image.tag }}"
ports:
- containerPort: 8080
name: https
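
Note: the two tag expressions above are equivalent. In a Helm (Go) template, a pipeline passes its input as the last argument, so ".Values.image.tag | default .Chart.AppVersion" is the same call as "default .Chart.AppVersion .Values.image.tag". A minimal Go sketch, with a hand-rolled default function standing in for the sprig helper Helm provides:

package main

import (
	"os"
	"text/template"
)

func main() {
	// "default" mimics the sprig helper used by Helm: fallback first, value second.
	funcs := template.FuncMap{
		"default": func(fallback, value string) string {
			if value == "" {
				return fallback
			}
			return value
		},
	}
	data := map[string]string{"AppVersion": "v0.2.2-rc2", "Tag": ""}
	// A pipe passes its input as the final argument, so both forms are the same call.
	tmpl := template.Must(template.New("tag").Funcs(funcs).Parse(
		"piped: {{ .Tag | default .AppVersion }}\nprefix: {{ default .AppVersion .Tag }}\n"))
	if err := tmpl.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
	// Output: both lines print v0.2.2-rc2, since .Tag is empty.
}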

View File

@@ -1,4 +0,0 @@
apiVersion: v1
kind: Namespace
metadata:
name: {{ .Values.namespace }}

View File

@@ -11,7 +11,7 @@ roleRef:
subjects:
- kind: ServiceAccount
name: {{ include "k3k.serviceAccountName" . }}
namespace: {{ .Values.namespace }}
namespace: {{ .Release.Namespace }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole

View File

@@ -4,7 +4,7 @@ metadata:
name: k3k-webhook
labels:
{{- include "k3k.labels" . | nindent 4 }}
namespace: {{ .Values.namespace }}
namespace: {{ .Release.Namespace }}
spec:
ports:
- port: 443

View File

@@ -5,5 +5,5 @@ metadata:
name: {{ include "k3k.serviceAccountName" . }}
labels:
{{- include "k3k.labels" . | nindent 4 }}
namespace: {{ .Values.namespace }}
{{- end }}
namespace: {{ .Release.Namespace }}
{{- end }}

View File

@@ -1,19 +1,17 @@
replicaCount: 1
namespace: k3k-system
image:
repository: rancher/k3k
pullPolicy: Always
# Overrides the image tag whose default is the chart appVersion.
tag: "v0.2.1"
repository: rancher/k3k
tag: ""
pullPolicy: ""
imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
host:
# clusterCIDR specifies the clusterCIDR that will be added to the default networkpolicy for clustersets, if not set
# the controller will collect the PodCIDRs of all the nodes on the system.
# clusterCIDR specifies the clusterCIDR that will be added to the default networkpolicy for clustersets, if not set
# the controller will collect the PodCIDRs of all the nodes on the system.
clusterCIDR: ""
serviceAccount:
@@ -26,5 +24,5 @@ serviceAccount:
# configuration related to the shared agent mode in k3k
sharedAgent:
image:
repository: "rancher/k3k"
tag: "k3k-kubelet-dev"
repository: "rancher/k3k-kubelet"
tag: ""

go.mod
View File

@@ -16,7 +16,6 @@ require (
github.com/go-logr/zapr v1.3.0
github.com/onsi/ginkgo/v2 v2.20.1
github.com/onsi/gomega v1.36.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_model v0.6.1
github.com/rancher/dynamiclistener v1.27.5
github.com/sirupsen/logrus v1.9.3
@@ -81,6 +80,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
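
github.com/pkg/errors drops to the indirect block because the provider (below) now wraps errors with the standard library's errors.Join instead of errors.Wrapf. A small sketch of the replacement pattern, assuming Go 1.20+ and a placeholder cause error:

package main

import (
	"errors"
	"fmt"
)

// errCause is a placeholder for an error returned by GetStatsSummary.
var errCause = errors.New("stats endpoint unreachable")

func main() {
	// before (pkg/errors): errors.Wrapf(err, "error fetching MetricsResource")
	// after (stdlib, Go 1.20+): join the cause with a description
	err := errors.Join(errCause, errors.New("error fetching MetricsResource"))
	fmt.Println(err)                      // prints both messages
	fmt.Println(errors.Is(err, errCause)) // true: the cause remains matchable
}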

View File

@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"github.com/rancher/k3k/pkg/controller/cluster/agent"
"github.com/rancher/k3k/pkg/log"
@@ -19,10 +21,11 @@ import (
)
const (
webhookName = "nodename.podmutator.k3k.io"
webhookName = "podmutator.k3k.io"
webhookTimeout = int32(10)
webhookPort = "9443"
webhookPath = "/mutate--v1-pod"
FieldpathField = "k3k.io/fieldpath"
)
type webhookHandler struct {
@@ -36,6 +39,7 @@ type webhookHandler struct {
// AddPodMutatorWebhook will add a mutator webhook to the virtual cluster to
// modify the nodeName of the created pods with the name of the virtual kubelet node name
// as well as remove any downward API env fields that reference status fields
func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName string, logger *log.Logger) error {
handler := webhookHandler{
client: mgr.GetClient(),
@@ -63,10 +67,28 @@ func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error
if !ok {
return fmt.Errorf("invalid request: object was type %t not cluster", obj)
}
w.logger.Infow("recieved request", "Pod", pod.Name, "Namespace", pod.Namespace)
w.logger.Infow("mutator webhook request", "Pod", pod.Name, "Namespace", pod.Namespace)
if pod.Spec.NodeName == "" {
pod.Spec.NodeName = w.nodeName
}
// look for status.* fields in the env
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
for i, container := range pod.Spec.Containers {
for j, env := range container.Env {
if env.ValueFrom == nil || env.ValueFrom.FieldRef == nil {
continue
}
fieldPath := env.ValueFrom.FieldRef.FieldPath
if strings.Contains(fieldPath, "status.") {
annotationKey := fmt.Sprintf("%s_%d_%s", FieldpathField, i, env.Name)
pod.Annotations[annotationKey] = fieldPath
pod.Spec.Containers[i].Env = removeEnv(pod.Spec.Containers[i].Env, j)
}
}
}
return nil
}
@@ -118,3 +140,22 @@ func (w *webhookHandler) configuration(ctx context.Context, hostClient ctrlrunti
},
}, nil
}
func removeEnv(envs []v1.EnvVar, i int) []v1.EnvVar {
envs[i] = envs[len(envs)-1]
return envs[:len(envs)-1]
}
func ParseFieldPathAnnotationKey(annotationKey string) (int, string, error) {
s := strings.SplitN(annotationKey, "_", 3)
if len(s) != 3 {
return -1, "", errors.New("fieldpath annotation is not set correctly")
}
containerIndex, err := strconv.Atoi(s[1])
if err != nil {
return -1, "", err
}
envName := s[2]
return containerIndex, envName, nil
}
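
The mutator encodes each stripped env var into an annotation key of the form <FieldpathField>_<containerIndex>_<envName>, e.g. k3k.io/fieldpath_0_POD_IP, and ParseFieldPathAnnotationKey reverses it. A standalone sketch of the round trip; the parser is reproduced from the diff with a lowercase name so the example compiles on its own:

package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

const fieldpathField = "k3k.io/fieldpath"

// parseFieldPathAnnotationKey mirrors ParseFieldPathAnnotationKey above.
func parseFieldPathAnnotationKey(annotationKey string) (int, string, error) {
	s := strings.SplitN(annotationKey, "_", 3)
	if len(s) != 3 {
		return -1, "", errors.New("fieldpath annotation is not set correctly")
	}
	containerIndex, err := strconv.Atoi(s[1])
	if err != nil {
		return -1, "", err
	}
	return containerIndex, s[2], nil
}

func main() {
	// what the webhook writes for container 0 whose POD_IP env used status.podIP
	key := fmt.Sprintf("%s_%d_%s", fieldpathField, 0, "POD_IP")
	fmt.Println(key) // k3k.io/fieldpath_0_POD_IP

	idx, envName, err := parseFieldPathAnnotationKey(key)
	fmt.Println(idx, envName, err) // 0 POD_IP <nil>
}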

View File

@@ -25,6 +25,7 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/authentication/user"
@@ -128,7 +129,9 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
}
logger.Info("adding pod mutator webhook")
if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.NodeName, logger); err != nil {
return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
if !apierrors.IsAlreadyExists(err) {
return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
}
}
logger.Info("adding service syncer controller")

View File

@@ -8,16 +8,18 @@ import (
"net/http"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/rancher/k3k/k3k-kubelet/controller"
"github.com/rancher/k3k/k3k-kubelet/controller/webhook"
"github.com/rancher/k3k/k3k-kubelet/provider/collectors"
"github.com/rancher/k3k/k3k-kubelet/translate"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
k3klog "github.com/rancher/k3k/pkg/log"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -26,9 +28,12 @@ import (
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
cv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
@@ -38,6 +43,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)
// check at compile time if the Provider implements the nodeutil.Provider interface
var _ nodeutil.Provider = (*Provider)(nil)
// Provider implements nodetuil.Provider from virtual Kubelet.
// TODO: Implement NotifyPods and the required usage so that this can be an async provider
type Provider struct {
@@ -54,6 +62,10 @@ type Provider struct {
logger *k3klog.Logger
}
var (
ErrRetryTimeout = errors.New("provider timed out")
)
func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3klog.Logger, namespace, name, serverIP, dnsIP string) (*Provider, error) {
coreClient, err := cv1.NewForConfig(&hostConfig)
if err != nil {
@@ -262,7 +274,7 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary,
func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
statsSummary, err := p.GetStatsSummary(ctx)
if err != nil {
return nil, errors.Wrapf(err, "error fetching MetricsResource")
return nil, errors.Join(err, errors.New("error fetching MetricsResource"))
}
registry := compbasemetrics.NewKubeRegistry()
@@ -270,7 +282,7 @@ func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily,
metricFamily, err := registry.Gather()
if err != nil {
return nil, errors.Wrapf(err, "error gathering metrics from collector")
return nil, errors.Join(err, errors.New("error gathering metrics from collector"))
}
return metricFamily, nil
}
@@ -306,8 +318,13 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port
return fw.ForwardPorts()
}
// CreatePod takes a Kubernetes Pod and deploys it within the provider.
// CreatePod executes createPod with retry
func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
return p.withRetry(ctx, p.createPod, pod)
}
// createPod takes a Kubernetes Pod and deploys it within the provider.
func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
tPod := pod.DeepCopy()
p.Translater.TranslateTo(tPod)
@@ -338,6 +355,10 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
tPod.Spec.Priority = nil
}
// fieldpath annotations
if err := p.configureFieldPathEnv(pod, tPod); err != nil {
return fmt.Errorf("unable to fetch fieldpath annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
// volumes will often refer to resources in the virtual cluster, but instead need to refer to the sync'd
// host cluster version
if err := p.transformVolumes(ctx, pod.Namespace, tPod.Spec.Volumes); err != nil {
@@ -355,6 +376,28 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
return p.HostClient.Create(ctx, tPod)
}
// withRetry retries the passed function with a fixed interval and timeout
func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *v1.Pod) error, pod *v1.Pod) error {
const (
interval = 2 * time.Second
timeout = 10 * time.Second
)
var allErrors error
// retryFn will retry until the operation succeeds, or the timeout occurs
retryFn := func(ctx context.Context) (bool, error) {
if lastErr := f(ctx, pod); lastErr != nil {
// log that the retry failed?
allErrors = errors.Join(allErrors, lastErr)
return false, nil
}
return true, nil
}
if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, retryFn); err != nil {
return errors.Join(allErrors, ErrRetryTimeout)
}
return nil
}
// transformVolumes changes the volumes to the representation in the host cluster. Will return an error
// if one/more volumes couldn't be transformed
func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, volumes []corev1.Volume) error {
@@ -408,6 +451,7 @@ func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, vo
return nil
}
// syncConfigmap will add the configmap object to the queue of the syncer controller to be synced to the host cluster
func (p *Provider) syncConfigmap(ctx context.Context, podNamespace string, configMapName string, optional bool) error {
var configMap corev1.ConfigMap
nsName := types.NamespacedName{
@@ -429,6 +473,7 @@ func (p *Provider) syncConfigmap(ctx context.Context, podNamespace string, confi
return nil
}
// syncSecret will add the secret object to the queue of the syncer controller to be synced to the host cluster
func (p *Provider) syncSecret(ctx context.Context, podNamespace string, secretName string, optional bool) error {
var secret corev1.Secret
nsName := types.NamespacedName{
@@ -444,38 +489,95 @@ func (p *Provider) syncSecret(ctx context.Context, podNamespace string, secretNa
}
err = p.Handler.AddResource(ctx, &secret)
if err != nil {
return fmt.Errorf("unable to add configmap to sync %s/%s: %w", nsName.Namespace, nsName.Name, err)
return fmt.Errorf("unable to add secret to sync %s/%s: %w", nsName.Namespace, nsName.Name, err)
}
return nil
}
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
// UpdatePod executes updatePod with retry
func (p *Provider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
hostName := p.Translater.TranslateName(pod.Namespace, pod.Name)
currentPod, err := p.GetPod(ctx, p.ClusterNamespace, hostName)
if err != nil {
return fmt.Errorf("unable to get current pod for update: %w", err)
}
tPod := pod.DeepCopy()
p.Translater.TranslateTo(tPod)
tPod.UID = currentPod.UID
// this is a bit dangerous since another process could have made changes that the user didn't know about
tPod.ResourceVersion = currentPod.ResourceVersion
// Volumes may refer to resources (configmaps/secrets) from the host cluster
// So we need the configuration as calculated during create time
tPod.Spec.Volumes = currentPod.Spec.Volumes
tPod.Spec.Containers = currentPod.Spec.Containers
tPod.Spec.InitContainers = currentPod.Spec.InitContainers
tPod.Spec.NodeName = currentPod.Spec.NodeName
return p.HostClient.Update(ctx, tPod)
return p.withRetry(ctx, p.updatePod, pod)
}
// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
func (p *Provider) updatePod(ctx context.Context, pod *v1.Pod) error {
p.logger.Debugw("got a request for update pod")
// Once scheduled, a Pod cannot update fields other than the container and init container images and a few others
// See: https://kubernetes.io/docs/concepts/workloads/pods/#pod-update-and-replacement
// Update Pod in the virtual cluster
var currentVirtualPod v1.Pod
if err := p.VirtualClient.Get(ctx, client.ObjectKeyFromObject(pod), &currentVirtualPod); err != nil {
return fmt.Errorf("unable to get pod to update from virtual cluster: %w", err)
}
currentVirtualPod.Spec.Containers = updateContainerImages(currentVirtualPod.Spec.Containers, pod.Spec.Containers)
currentVirtualPod.Spec.InitContainers = updateContainerImages(currentVirtualPod.Spec.InitContainers, pod.Spec.InitContainers)
currentVirtualPod.Spec.ActiveDeadlineSeconds = pod.Spec.ActiveDeadlineSeconds
currentVirtualPod.Spec.Tolerations = pod.Spec.Tolerations
// in the virtual cluster we can also update the labels and annotations
currentVirtualPod.Annotations = pod.Annotations
currentVirtualPod.Labels = pod.Labels
if err := p.VirtualClient.Update(ctx, &currentVirtualPod); err != nil {
return fmt.Errorf("unable to update pod in the virtual cluster: %w", err)
}
// Update Pod in the host cluster
hostNamespaceName := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: p.Translater.TranslateName(pod.Namespace, pod.Name),
}
var currentHostPod corev1.Pod
if err := p.HostClient.Get(ctx, hostNamespaceName, &currentHostPod); err != nil {
return fmt.Errorf("unable to get pod to update from host cluster: %w", err)
}
currentHostPod.Spec.Containers = updateContainerImages(currentHostPod.Spec.Containers, pod.Spec.Containers)
currentHostPod.Spec.InitContainers = updateContainerImages(currentHostPod.Spec.InitContainers, pod.Spec.InitContainers)
// update ActiveDeadlineSeconds and Tolerations
currentHostPod.Spec.ActiveDeadlineSeconds = pod.Spec.ActiveDeadlineSeconds
currentHostPod.Spec.Tolerations = pod.Spec.Tolerations
if err := p.HostClient.Update(ctx, &currentHostPod); err != nil {
return fmt.Errorf("unable to update pod in the host cluster: %w", err)
}
return nil
}
// updateContainerImages will update the images of the original containers, matched by name against the updated containers
func updateContainerImages(original, updated []v1.Container) []v1.Container {
newImages := make(map[string]string)
for _, c := range updated {
newImages[c.Name] = c.Image
}
for i, c := range original {
if updatedImage, found := newImages[c.Name]; found {
original[i].Image = updatedImage
}
}
return original
}
// DeletePod executes deletePod with retry
func (p *Provider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
return p.withRetry(ctx, p.deletePod, pod)
}
// deletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
// state, as well as the pod. DeletePod may be called multiple times for the same pod.
func (p *Provider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error {
p.logger.Infof("Got request to delete pod %s", pod.Name)
hostName := p.Translater.TranslateName(pod.Namespace, pod.Name)
err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostName, metav1.DeleteOptions{})
@@ -602,6 +704,9 @@ func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
return retPods, nil
}
// configureNetworking will inject network information into each pod to connect it to the
// virtual cluster api server, as well as configure DNS information to connect it to the
// synced coredns on the host cluster.
func (p *Provider) configureNetworking(podName, podNamespace string, pod *corev1.Pod) {
// inject networking information to the pod's environment variables
for i := range pod.Spec.Containers {
@@ -640,7 +745,6 @@ func (p *Provider) configureNetworking(podName, podNamespace string, pod *corev1
},
}
}
}
// getSecretsAndConfigmaps retrieves a list of all secrets/configmaps that are in use by a given pod. Useful
@@ -665,3 +769,28 @@ func getSecretsAndConfigmaps(pod *corev1.Pod) ([]string, []string) {
}
return secrets, configMaps
}
// configureFieldPathEnv will retrieve all fieldpath annotations created by the pod mutator webhook
// and re-add the corresponding downward API env vars to the translated pod
func (p *Provider) configureFieldPathEnv(pod, tPod *v1.Pod) error {
for name, value := range pod.Annotations {
if strings.Contains(name, webhook.FieldpathField) {
containerIndex, envName, err := webhook.ParseFieldPathAnnotationKey(name)
if err != nil {
return err
}
// re-adding these envs to the pod
tPod.Spec.Containers[containerIndex].Env = append(tPod.Spec.Containers[containerIndex].Env, v1.EnvVar{
Name: envName,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: value,
},
},
})
// removing the annotation from the pod
delete(tPod.Annotations, name)
}
}
return nil
}
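
For reference, the retry wrapper introduced above composes wait.PollUntilContextTimeout with errors.Join: each failed attempt is accumulated, and ErrRetryTimeout is appended if the deadline passes. A standalone sketch of the same pattern, using a hypothetical operation and shortened timings:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

var ErrRetryTimeout = errors.New("provider timed out")

// withRetry polls op until it succeeds or the timeout elapses, mirroring the diff.
func withRetry(ctx context.Context, interval, timeout time.Duration, op func(context.Context) error) error {
	var allErrors error
	retryFn := func(ctx context.Context) (bool, error) {
		if lastErr := op(ctx); lastErr != nil {
			allErrors = errors.Join(allErrors, lastErr)
			return false, nil // not done yet; poll again after interval
		}
		return true, nil // success
	}
	if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, retryFn); err != nil {
		return errors.Join(allErrors, ErrRetryTimeout)
	}
	return nil
}

func main() {
	attempts := 0
	// hypothetical operation that succeeds on the third attempt
	op := func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("attempt %d failed", attempts)
		}
		return nil
	}
	err := withRetry(context.Background(), 10*time.Millisecond, time.Second, op)
	fmt.Println(attempts, err) // 3 <nil>
}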