mirror of https://github.com/rancher/k3k.git (synced 2026-04-01 00:06:56 +00:00)

Compare commits: v0.2.2-rc2...v0.2.2-rc3 (5 commits)
| Author | SHA1 | Date |
|---|---|---|
| | c93cdd0333 | |
| | 958d515a59 | |
| | 9d0c907df2 | |
| | 1691d48875 | |
| | 960afe9504 | |
@@ -3,4 +3,4 @@ name: k3k
 description: A Helm chart for K3K
 type: application
 version: 0.1.5-r5
-appVersion: 0.2.0
+appVersion: v0.2.2-rc2
@@ -4,7 +4,7 @@ metadata:
   name: {{ include "k3k.fullname" . }}
   labels:
     {{- include "k3k.labels" . | nindent 4 }}
-  namespace: {{ .Values.namespace }}
+  namespace: {{ .Release.Namespace }}
 spec:
   replicas: {{ .Values.image.replicaCount }}
   selector:
@@ -16,14 +16,14 @@ spec:
         {{- include "k3k.selectorLabels" . | nindent 8 }}
     spec:
       containers:
-        - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
+        - image: "{{ .Values.image.repository }}:{{ default .Chart.AppVersion .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          name: {{ .Chart.Name }}
          env:
            - name: CLUSTER_CIDR
              value: {{ .Values.host.clusterCIDR }}
            - name: SHARED_AGENT_IMAGE
-             value: "{{ .Values.sharedAgent.image.repository }}:{{ .Values.sharedAgent.image.tag }}"
+             value: "{{ .Values.sharedAgent.image.repository }}:{{ default .Chart.AppVersion .Values.sharedAgent.image.tag }}"
          ports:
            - containerPort: 8080
              name: https
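Both image references now resolve through Helm's `default` function, so an empty tag value falls back to the chart's appVersion. A standalone sketch of that fallback semantics, using Go's text/template with the sprig function map that Helm's `default` comes from (the template data below is illustrative, not part of the chart):

```go
package main

import (
	"os"
	"text/template"

	"github.com/Masterminds/sprig/v3"
)

func main() {
	// `default FALLBACK VALUE` yields FALLBACK whenever VALUE is empty,
	// so an unset image tag resolves to the chart appVersion.
	tpl := template.Must(template.New("image").
		Funcs(sprig.TxtFuncMap()).
		Parse(`{{ .Repository }}:{{ default .AppVersion .Tag }}`))

	// With Tag empty this renders: rancher/k3k:v0.2.2-rc2
	_ = tpl.Execute(os.Stdout, map[string]string{
		"Repository": "rancher/k3k",
		"AppVersion": "v0.2.2-rc2",
		"Tag":        "",
	})
}
```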
@@ -1,4 +0,0 @@
-apiVersion: v1
-kind: Namespace
-metadata:
-  name: {{ .Values.namespace }}
@@ -11,7 +11,7 @@ roleRef:
 subjects:
   - kind: ServiceAccount
     name: {{ include "k3k.serviceAccountName" . }}
-    namespace: {{ .Values.namespace }}
+    namespace: {{ .Release.Namespace }}
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRole
@@ -4,7 +4,7 @@ metadata:
   name: k3k-webhook
   labels:
     {{- include "k3k.labels" . | nindent 4 }}
-  namespace: {{ .Values.namespace }}
+  namespace: {{ .Release.Namespace }}
 spec:
   ports:
     - port: 443
@@ -5,5 +5,5 @@ metadata:
   name: {{ include "k3k.serviceAccountName" . }}
   labels:
     {{- include "k3k.labels" . | nindent 4 }}
-  namespace: {{ .Values.namespace }}
+  namespace: {{ .Release.Namespace }}
 {{- end }}
@@ -1,19 +1,17 @@
 replicaCount: 1
-namespace: k3k-system

 image:
-  repository: rancher/k3k
-  pullPolicy: Always
-  # Overrides the image tag whose default is the chart appVersion.
-  tag: "v0.2.1"
+  repository: rancher/k3k
+  tag: ""
+  pullPolicy: ""

 imagePullSecrets: []
 nameOverride: ""
 fullnameOverride: ""

 host:
   # clusterCIDR specifies the clusterCIDR that will be added to the default networkpolicy for clustersets, if not set
   # the controller will collect the PodCIDRs of all the nodes on the system.
   clusterCIDR: ""

 serviceAccount:
@@ -26,5 +24,5 @@ serviceAccount:
 # configuration related to the shared agent mode in k3k
 sharedAgent:
   image:
-    repository: "rancher/k3k"
-    tag: "k3k-kubelet-dev"
+    repository: "rancher/k3k-kubelet"
+    tag: ""
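The comment kept in values.yaml documents the fallback: with clusterCIDR left empty, the controller collects the PodCIDRs of all nodes on the system. A hypothetical client-go sketch of such a collection step (collectPodCIDRs and its wiring are illustrative, not k3k's actual controller code):

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// collectPodCIDRs illustrates the documented fallback: list every node
// and gather its PodCIDRs when no clusterCIDR is configured.
func collectPodCIDRs(ctx context.Context, c kubernetes.Interface) ([]string, error) {
	nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	var cidrs []string
	for _, n := range nodes.Items {
		cidrs = append(cidrs, n.Spec.PodCIDRs...)
	}
	return cidrs, nil
}

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	cidrs, err := collectPodCIDRs(context.Background(), kubernetes.NewForConfigOrDie(cfg))
	fmt.Println(cidrs, err)
}
```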
go.mod
@@ -16,7 +16,6 @@ require (
 	github.com/go-logr/zapr v1.3.0
 	github.com/onsi/ginkgo/v2 v2.20.1
 	github.com/onsi/gomega v1.36.0
-	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_model v0.6.1
 	github.com/rancher/dynamiclistener v1.27.5
 	github.com/sirupsen/logrus v1.9.3
@@ -81,6 +80,7 @@ require (
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
+	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.19.1 // indirect
 	github.com/prometheus/common v0.55.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
@@ -4,6 +4,8 @@ import (
 	"context"
+	"errors"
 	"fmt"
+	"strconv"
 	"strings"

 	"github.com/rancher/k3k/pkg/controller/cluster/agent"
 	"github.com/rancher/k3k/pkg/log"
@@ -19,10 +21,11 @@ import (
 )

 const (
-	webhookName    = "nodename.podmutator.k3k.io"
+	webhookName    = "podmutator.k3k.io"
 	webhookTimeout = int32(10)
 	webhookPort    = "9443"
 	webhookPath    = "/mutate--v1-pod"
+	FieldpathField = "k3k.io/fieldpath"
 )

 type webhookHandler struct {
@@ -36,6 +39,7 @@ type webhookHandler struct {

 // AddPodMutatorWebhook will add a mutator webhook to the virtual cluster to
 // modify the nodeName of the created pods with the name of the virtual kubelet node name
+// as well as remove any status fields of the downward apis env fields
 func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName string, logger *log.Logger) error {
 	handler := webhookHandler{
 		client: mgr.GetClient(),
@@ -63,10 +67,28 @@ func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error
 	if !ok {
 		return fmt.Errorf("invalid request: object was type %t not cluster", obj)
 	}
-	w.logger.Infow("recieved request", "Pod", pod.Name, "Namespace", pod.Namespace)
+	w.logger.Infow("mutator webhook request", "Pod", pod.Name, "Namespace", pod.Namespace)
 	if pod.Spec.NodeName == "" {
 		pod.Spec.NodeName = w.nodeName
 	}
+	// look for status.* fields in the env
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+	for i, container := range pod.Spec.Containers {
+		for j, env := range container.Env {
+			if env.ValueFrom == nil || env.ValueFrom.FieldRef == nil {
+				continue
+			}
+
+			fieldPath := env.ValueFrom.FieldRef.FieldPath
+			if strings.Contains(fieldPath, "status.") {
+				annotationKey := fmt.Sprintf("%s_%d_%s", FieldpathField, i, env.Name)
+				pod.Annotations[annotationKey] = fieldPath
+				pod.Spec.Containers[i].Env = removeEnv(pod.Spec.Containers[i].Env, j)
+			}
+		}
+	}
 	return nil
 }
@@ -118,3 +140,22 @@ func (w *webhookHandler) configuration(ctx context.Context, hostClient ctrlrunti
 		},
 	}, nil
 }
+
+func removeEnv(envs []v1.EnvVar, i int) []v1.EnvVar {
+	envs[i] = envs[len(envs)-1]
+	return envs[:len(envs)-1]
+}
+
+func ParseFieldPathAnnotationKey(annotationKey string) (int, string, error) {
+	s := strings.SplitN(annotationKey, "_", 3)
+	if len(s) != 3 {
+		return -1, "", errors.New("fieldpath annotation is not set correctly")
+	}
+	containerIndex, err := strconv.Atoi(s[1])
+	if err != nil {
+		return -1, "", err
+	}
+	envName := s[2]
+
+	return containerIndex, envName, nil
+}
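The mutator now rewrites downward-API env vars: any env whose fieldRef points at a status.* fieldPath is stripped from the container and recorded in an annotation keyed FieldpathField_containerIndex_envName, which ParseFieldPathAnnotationKey later decodes. A self-contained sketch of that key round-trip (parseKey mirrors ParseFieldPathAnnotationKey; the POD_IP values are illustrative):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const fieldpathField = "k3k.io/fieldpath" // mirrors FieldpathField above

// parseKey mirrors ParseFieldPathAnnotationKey: keys have the shape
// "<prefix>_<containerIndex>_<envName>".
func parseKey(key string) (int, string, error) {
	s := strings.SplitN(key, "_", 3)
	if len(s) != 3 {
		return -1, "", fmt.Errorf("fieldpath annotation is not set correctly")
	}
	idx, err := strconv.Atoi(s[1])
	if err != nil {
		return -1, "", err
	}
	return idx, s[2], nil
}

func main() {
	// The webhook would store {key: "status.podIP"} for container 0's POD_IP env.
	key := fmt.Sprintf("%s_%d_%s", fieldpathField, 0, "POD_IP")
	idx, env, err := parseKey(key)
	fmt.Println(key, idx, env, err) // k3k.io/fieldpath_0_POD_IP 0 POD_IP <nil>
}
```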
@@ -25,6 +25,7 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apiserver/pkg/authentication/user"
@@ -128,7 +129,9 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
 	}
 	logger.Info("adding pod mutator webhook")
 	if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.NodeName, logger); err != nil {
-		return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
+		if !apierrors.IsAlreadyExists(err) {
+			return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
+		}
 	}

 	logger.Info("adding service syncer controller")
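Registering the webhook configuration is now idempotent: an AlreadyExists error from the host API server is tolerated, so the kubelet can restart cleanly. A minimal sketch of the check (the GroupResource and object name are illustrative):

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Simulate the API server's 409 response when the webhook configuration
	// already exists; IsAlreadyExists matches exactly this status reason.
	gr := schema.GroupResource{
		Group:    "admissionregistration.k8s.io",
		Resource: "mutatingwebhookconfigurations",
	}
	err := apierrors.NewAlreadyExists(gr, "podmutator.k3k.io")
	fmt.Println(apierrors.IsAlreadyExists(err)) // true
}
```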
@@ -8,16 +8,18 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
+	"time"

-	"github.com/pkg/errors"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/rancher/k3k/k3k-kubelet/controller"
+	"github.com/rancher/k3k/k3k-kubelet/controller/webhook"
 	"github.com/rancher/k3k/k3k-kubelet/provider/collectors"
 	"github.com/rancher/k3k/k3k-kubelet/translate"
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
 	k3klog "github.com/rancher/k3k/pkg/log"
 	"github.com/virtual-kubelet/virtual-kubelet/node/api"
 	"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
 	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
 	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -26,9 +28,12 @@ import (
 	"k8s.io/apimachinery/pkg/selection"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes/scheme"
 	cv1 "k8s.io/client-go/kubernetes/typed/core/v1"

+	"errors"
+
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/portforward"
 	"k8s.io/client-go/tools/remotecommand"
@@ -38,6 +43,9 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )

+// check at compile time if the Provider implements the nodeutil.Provider interface
+var _ nodeutil.Provider = (*Provider)(nil)
+
 // Provider implements nodetuil.Provider from virtual Kubelet.
 // TODO: Implement NotifyPods and the required usage so that this can be an async provider
 type Provider struct {
@@ -54,6 +62,10 @@ type Provider struct {
 	logger *k3klog.Logger
 }

+var (
+	ErrRetryTimeout = errors.New("provider timed out")
+)
+
 func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3klog.Logger, namespace, name, serverIP, dnsIP string) (*Provider, error) {
 	coreClient, err := cv1.NewForConfig(&hostConfig)
 	if err != nil {
@@ -262,7 +274,7 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary,
 func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
 	statsSummary, err := p.GetStatsSummary(ctx)
 	if err != nil {
-		return nil, errors.Wrapf(err, "error fetching MetricsResource")
+		return nil, errors.Join(err, errors.New("error fetching MetricsResource"))
 	}

 	registry := compbasemetrics.NewKubeRegistry()
@@ -270,7 +282,7 @@ func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily,

 	metricFamily, err := registry.Gather()
 	if err != nil {
-		return nil, errors.Wrapf(err, "error gathering metrics from collector")
+		return nil, errors.Join(err, errors.New("error gathering metrics from collector"))
 	}
 	return metricFamily, nil
 }
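With github.com/pkg/errors demoted to an indirect dependency in go.mod, both wrap sites switch to the standard library's errors.Join. A small sketch of the resulting matching behavior (the sentinel error is illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	cause := errors.New("connection refused")
	// errors.Join (Go 1.20+) keeps both errors matchable with errors.Is,
	// which is what the Wrapf call sites above are replaced with.
	err := errors.Join(cause, errors.New("error fetching MetricsResource"))
	fmt.Println(err)                   // both messages, newline-separated
	fmt.Println(errors.Is(err, cause)) // true
}
```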
@@ -306,8 +318,13 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port
 	return fw.ForwardPorts()
 }

-// CreatePod takes a Kubernetes Pod and deploys it within the provider.
+// CreatePod executes createPod with retry
 func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
+	return p.withRetry(ctx, p.createPod, pod)
+}
+
+// createPod takes a Kubernetes Pod and deploys it within the provider.
+func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
 	tPod := pod.DeepCopy()
 	p.Translater.TranslateTo(tPod)

@@ -338,6 +355,10 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
 		tPod.Spec.Priority = nil
 	}

+	// fieldpath annotations
+	if err := p.configureFieldPathEnv(pod, tPod); err != nil {
+		return fmt.Errorf("unable to fetch fieldpath annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
+	}
 	// volumes will often refer to resources in the virtual cluster, but instead need to refer to the sync'd
 	// host cluster version
 	if err := p.transformVolumes(ctx, pod.Namespace, tPod.Spec.Volumes); err != nil {
@@ -355,6 +376,28 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
 	return p.HostClient.Create(ctx, tPod)
 }

+// withRetry retries passed function with interval and timeout
+func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *v1.Pod) error, pod *v1.Pod) error {
+	const (
+		interval = 2 * time.Second
+		timeout  = 10 * time.Second
+	)
+	var allErrors error
+	// retryFn will retry until the operation succeeds, or the timeout occurs
+	retryFn := func(ctx context.Context) (bool, error) {
+		if lastErr := f(ctx, pod); lastErr != nil {
+			// log that the retry failed?
+			allErrors = errors.Join(allErrors, lastErr)
+			return false, nil
+		}
+		return true, nil
+	}
+	if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, retryFn); err != nil {
+		return errors.Join(allErrors, ErrRetryTimeout)
+	}
+	return nil
+}
+
 // transformVolumes changes the volumes to the representation in the host cluster. Will return an error
 // if one/more volumes couldn't be transformed
 func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, volumes []corev1.Volume) error {
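withRetry builds on apimachinery's wait.PollUntilContextTimeout: the wrapped call signals "keep polling" by returning (false, nil) while failures accumulate via errors.Join, and the joined errors surface together with ErrRetryTimeout if the timeout is hit. A standalone sketch of that polling contract (the flaky operation is illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	var allErrors error
	// Returning (false, nil) means "not done yet, keep polling"; errors are
	// only accumulated, as in withRetry above.
	op := func(ctx context.Context) (bool, error) {
		attempts++
		if attempts < 3 {
			allErrors = errors.Join(allErrors, fmt.Errorf("attempt %d failed", attempts))
			return false, nil
		}
		return true, nil
	}
	// immediate=true fires the first attempt right away instead of waiting one
	// interval; the commit uses 2s/10s for interval/timeout.
	err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true, op)
	fmt.Println(attempts, err, allErrors) // 3 <nil> plus the two joined failures
}
```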
@@ -408,6 +451,7 @@ func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, vo
 	return nil
 }

+// syncConfigmap will add the configmap object to the queue of the syncer controller to be synced to the host cluster
 func (p *Provider) syncConfigmap(ctx context.Context, podNamespace string, configMapName string, optional bool) error {
 	var configMap corev1.ConfigMap
 	nsName := types.NamespacedName{
@@ -429,6 +473,7 @@ func (p *Provider) syncConfigmap(ctx context.Context, podNamespace string, confi
 	return nil
 }

+// syncSecret will add the secret object to the queue of the syncer controller to be synced to the host cluster
 func (p *Provider) syncSecret(ctx context.Context, podNamespace string, secretName string, optional bool) error {
 	var secret corev1.Secret
 	nsName := types.NamespacedName{
@@ -444,38 +489,95 @@ func (p *Provider) syncSecret(ctx context.Context, podNamespace string, secretNa
 	}
 	err = p.Handler.AddResource(ctx, &secret)
 	if err != nil {
-		return fmt.Errorf("unable to add configmap to sync %s/%s: %w", nsName.Namespace, nsName.Name, err)
+		return fmt.Errorf("unable to add secret to sync %s/%s: %w", nsName.Namespace, nsName.Name, err)
 	}
 	return nil
 }

-// UpdatePod takes a Kubernetes Pod and updates it within the provider.
+// UpdatePod executes updatePod with retry
 func (p *Provider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
-	hostName := p.Translater.TranslateName(pod.Namespace, pod.Name)
-	currentPod, err := p.GetPod(ctx, p.ClusterNamespace, hostName)
-	if err != nil {
-		return fmt.Errorf("unable to get current pod for update: %w", err)
-	}
-	tPod := pod.DeepCopy()
-	p.Translater.TranslateTo(tPod)
-	tPod.UID = currentPod.UID
-	// this is a bit dangerous since another process could have made changes that the user didn't know about
-	tPod.ResourceVersion = currentPod.ResourceVersion
-
-	// Volumes may refer to resources (configmaps/secrets) from the host cluster
-	// So we need the configuration as calculated during create time
-	tPod.Spec.Volumes = currentPod.Spec.Volumes
-	tPod.Spec.Containers = currentPod.Spec.Containers
-	tPod.Spec.InitContainers = currentPod.Spec.InitContainers
-	tPod.Spec.NodeName = currentPod.Spec.NodeName
-
-	return p.HostClient.Update(ctx, tPod)
+	return p.withRetry(ctx, p.updatePod, pod)
 }

-// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
+func (p *Provider) updatePod(ctx context.Context, pod *v1.Pod) error {
+	p.logger.Debugw("got a request for update pod")
+
+	// Once scheduled a Pod cannot update other fields than the image of the containers, initcontainers and a few others
+	// See: https://kubernetes.io/docs/concepts/workloads/pods/#pod-update-and-replacement
+
+	// Update Pod in the virtual cluster
+
+	var currentVirtualPod v1.Pod
+	if err := p.VirtualClient.Get(ctx, client.ObjectKeyFromObject(pod), &currentVirtualPod); err != nil {
+		return fmt.Errorf("unable to get pod to update from virtual cluster: %w", err)
+	}
+
+	currentVirtualPod.Spec.Containers = updateContainerImages(currentVirtualPod.Spec.Containers, pod.Spec.Containers)
+	currentVirtualPod.Spec.InitContainers = updateContainerImages(currentVirtualPod.Spec.InitContainers, pod.Spec.InitContainers)
+
+	currentVirtualPod.Spec.ActiveDeadlineSeconds = pod.Spec.ActiveDeadlineSeconds
+	currentVirtualPod.Spec.Tolerations = pod.Spec.Tolerations
+
+	// in the virtual cluster we can update also the labels and annotations
+	currentVirtualPod.Annotations = pod.Annotations
+	currentVirtualPod.Labels = pod.Labels
+
+	if err := p.VirtualClient.Update(ctx, &currentVirtualPod); err != nil {
+		return fmt.Errorf("unable to update pod in the virtual cluster: %w", err)
+	}
+
+	// Update Pod in the host cluster
+
+	hostNamespaceName := types.NamespacedName{
+		Namespace: p.ClusterNamespace,
+		Name:      p.Translater.TranslateName(pod.Namespace, pod.Name),
+	}
+
+	var currentHostPod corev1.Pod
+	if err := p.HostClient.Get(ctx, hostNamespaceName, &currentHostPod); err != nil {
+		return fmt.Errorf("unable to get pod to update from host cluster: %w", err)
+	}
+
+	currentHostPod.Spec.Containers = updateContainerImages(currentHostPod.Spec.Containers, pod.Spec.Containers)
+	currentHostPod.Spec.InitContainers = updateContainerImages(currentHostPod.Spec.InitContainers, pod.Spec.InitContainers)
+
+	// update ActiveDeadlineSeconds and Tolerations
+	currentHostPod.Spec.ActiveDeadlineSeconds = pod.Spec.ActiveDeadlineSeconds
+	currentHostPod.Spec.Tolerations = pod.Spec.Tolerations
+
+	if err := p.HostClient.Update(ctx, &currentHostPod); err != nil {
+		return fmt.Errorf("unable to update pod in the host cluster: %w", err)
+	}
+
+	return nil
+}
+
+// updateContainerImages will update the images of the original container images with the same name
+func updateContainerImages(original, updated []v1.Container) []v1.Container {
+	newImages := make(map[string]string)
+
+	for _, c := range updated {
+		newImages[c.Name] = c.Image
+	}
+
+	for i, c := range original {
+		if updatedImage, found := newImages[c.Name]; found {
+			original[i].Image = updatedImage
+		}
+	}
+
+	return original
+}
+
+// DeletePod executes deletePod with retry
+func (p *Provider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
+	return p.withRetry(ctx, p.deletePod, pod)
+}
+
+// deletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
 // expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
 // state, as well as the pod. DeletePod may be called multiple times for the same pod.
-func (p *Provider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
+func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error {
 	p.logger.Infof("Got request to delete pod %s", pod.Name)
 	hostName := p.Translater.TranslateName(pod.Namespace, pod.Name)
 	err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostName, metav1.DeleteOptions{})
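updatePod now propagates only the fields a scheduled Pod may legally change, container images matched by name plus activeDeadlineSeconds and tolerations, to both the virtual and host copies. A short usage sketch of the image-matching helper (updateContainerImages is copied from the hunk above; the container data is illustrative):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// updateContainerImages matches containers by name and copies the new image;
// containers absent from `updated` are left untouched.
func updateContainerImages(original, updated []v1.Container) []v1.Container {
	newImages := make(map[string]string)
	for _, c := range updated {
		newImages[c.Name] = c.Image
	}
	for i, c := range original {
		if img, ok := newImages[c.Name]; ok {
			original[i].Image = img
		}
	}
	return original
}

func main() {
	original := []v1.Container{
		{Name: "app", Image: "nginx:1.25"},
		{Name: "sidecar", Image: "busybox:1.36"},
	}
	updated := []v1.Container{{Name: "app", Image: "nginx:1.27"}}
	out := updateContainerImages(original, updated)
	fmt.Println(out[0].Image, out[1].Image) // nginx:1.27 busybox:1.36
}
```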
@@ -602,6 +704,9 @@ func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
 	return retPods, nil
 }

+// configureNetworking will inject network information to each pod to connect them to the
+// virtual cluster api server, as well as configure DNS information to connect them to the
+// synced coredns on the host cluster.
 func (p *Provider) configureNetworking(podName, podNamespace string, pod *corev1.Pod) {
 	// inject networking information to the pod's environment variables
 	for i := range pod.Spec.Containers {
@@ -640,7 +745,6 @@ func (p *Provider) configureNetworking(podName, podNamespace string, pod *corev1
 			},
 		}
 	}
-
 }

 // getSecretsAndConfigmaps retrieves a list of all secrets/configmaps that are in use by a given pod. Useful
@@ -665,3 +769,28 @@ func getSecretsAndConfigmaps(pod *corev1.Pod) ([]string, []string) {
 	}
 	return secrets, configMaps
 }
+
+// configureFieldPathEnv will retrieve all annotations created by the pod mutator webhook
+// to assign env fieldpaths to pods
+func (p *Provider) configureFieldPathEnv(pod, tPod *v1.Pod) error {
+	for name, value := range pod.Annotations {
+		if strings.Contains(name, webhook.FieldpathField) {
+			containerIndex, envName, err := webhook.ParseFieldPathAnnotationKey(name)
+			if err != nil {
+				return err
+			}
+			// re-adding these envs to the pod
+			tPod.Spec.Containers[containerIndex].Env = append(tPod.Spec.Containers[containerIndex].Env, v1.EnvVar{
+				Name: envName,
+				ValueFrom: &v1.EnvVarSource{
+					FieldRef: &v1.ObjectFieldSelector{
+						FieldPath: value,
+					},
+				},
+			})
+			// removing the annotation from the pod
+			delete(tPod.Annotations, name)
+		}
+	}
+	return nil
+}