Fix create events rbac (#575)

* cleanup logs in kubelet provider

* added events create rbac to kubelet

* fix lint, moved fetch pod logic in separate func
This commit is contained in:
Enrico Candino
2025-11-25 13:48:04 +01:00
committed by GitHub
parent 521ff17ef6
commit 5c49c3d6b7
2 changed files with 168 additions and 75 deletions

View File

@@ -86,7 +86,7 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger log
CoreClient: coreClient,
ClusterNamespace: namespace,
ClusterName: name,
logger: logger,
logger: logger.WithValues("cluster", name),
serverIP: serverIP,
dnsIP: dnsIP,
}
@@ -95,8 +95,12 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger log
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
hostPodName := p.Translator.TranslateName(namespace, podName)
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, name, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "container", containerName)
logger.V(1).Info("GetContainerLogs")
options := corev1.PodLogOptions{
Container: containerName,
Timestamps: opts.Timestamps,
@@ -125,20 +129,27 @@ func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, con
}
closer, err := p.CoreClient.Pods(p.ClusterNamespace).GetLogs(hostPodName, &options).Stream(ctx)
p.logger.Error(err, fmt.Sprintf("got error when getting logs for %s in %s", hostPodName, p.ClusterNamespace))
if err != nil {
logger.Error(err, "Error getting logs from container")
}
return closer, err
}
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, podName)
func (p *Provider) RunInContainer(ctx context.Context, namespace, name, containerName string, cmd []string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "container", containerName)
logger.V(1).Info("RunInContainer")
req := p.CoreClient.RESTClient().Post().
Resource("pods").
Name(hostPodName).
Namespace(p.ClusterNamespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: cmd,
@@ -150,10 +161,11 @@ func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, conta
exec, err := remotecommand.NewSPDYExecutor(&p.ClientConfig, http.MethodPost, req.URL())
if err != nil {
logger.Error(err, "Error creating SPDY executor")
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
if err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: attach.Stdin(),
Stdout: attach.Stdout(),
Stderr: attach.Stderr(),
@@ -161,18 +173,28 @@ func (p *Provider) RunInContainer(ctx context.Context, namespace, podName, conta
TerminalSizeQueue: &translatorSizeQueue{
resizeChan: attach.Resize(),
},
})
}); err != nil {
logger.Error(err, "Error while executing command in container")
return err
}
return nil
}
// AttachToContainer attaches to the executing process of a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, containerName string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, podName)
func (p *Provider) AttachToContainer(ctx context.Context, namespace, name, containerName string, attach api.AttachIO) error {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "container", containerName)
logger.V(1).Info("AttachToContainer")
req := p.CoreClient.RESTClient().Post().
Resource("pods").
Name(hostPodName).
Namespace(p.ClusterNamespace).
SubResource("attach")
req.VersionedParams(&corev1.PodAttachOptions{
Container: containerName,
TTY: attach.TTY(),
@@ -183,10 +205,11 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co
exec, err := remotecommand.NewSPDYExecutor(&p.ClientConfig, http.MethodPost, req.URL())
if err != nil {
logger.Error(err, "Error creating SPDY executor")
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
if err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: attach.Stdin(),
Stdout: attach.Stdout(),
Stderr: attach.Stderr(),
@@ -194,7 +217,12 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co
TerminalSizeQueue: &translatorSizeQueue{
resizeChan: attach.Resize(),
},
})
}); err != nil {
logger.Error(err, "Error while attaching to container")
return err
}
return nil
}
// GetStatsSummary gets the stats for the node, including running pods
@@ -203,7 +231,8 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
nodeList := &corev1.NodeList{}
if err := p.CoreClient.RESTClient().Get().Resource("nodes").Do(ctx).Into(nodeList); err != nil {
return nil, fmt.Errorf("unable to get nodes of cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err)
p.logger.Error(err, "Unable to get nodes of cluster")
return nil, err
}
// fetch the stats from all the nodes
@@ -221,14 +250,13 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
Suffix("stats/summary").
DoRaw(ctx)
if err != nil {
return nil, fmt.Errorf(
"unable to get stats of node '%s', from cluster %s in namespace %s: %w",
n.Name, p.ClusterName, p.ClusterNamespace, err,
)
p.logger.Error(err, "Unable to get stats/summary from cluster node", "node", n.Name)
return nil, err
}
stats := &stats.Summary{}
if err := json.Unmarshal(res, stats); err != nil {
p.logger.Error(err, "Error unmarshaling stats/summary from cluster node", "node", n.Name)
return nil, err
}
@@ -241,6 +269,7 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
pods, err := p.GetPods(ctx)
if err != nil {
p.logger.Error(err, "Error getting pods from cluster for stats")
return nil, err
}
@@ -278,9 +307,12 @@ func (p *Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
// GetMetricsResource gets the metrics for the node, including running pods
func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
p.logger.V(1).Info("GetMetricsResource")
statsSummary, err := p.GetStatsSummary(ctx)
if err != nil {
return nil, errors.Join(err, errors.New("error fetching MetricsResource"))
p.logger.Error(err, "Error getting stats summary from cluster for metrics")
return nil, err
}
registry := compbasemetrics.NewKubeRegistry()
@@ -288,15 +320,20 @@ func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily,
metricFamily, err := registry.Gather()
if err != nil {
return nil, errors.Join(err, errors.New("error gathering metrics from collector"))
p.logger.Error(err, "Error gathering metrics from collector")
return nil, err
}
return metricFamily, nil
}
// PortForward forwards a local port to a port on the pod
func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port int32, stream io.ReadWriteCloser) error {
hostPodName := p.Translator.TranslateName(namespace, pod)
func (p *Provider) PortForward(ctx context.Context, namespace, name string, port int32, stream io.ReadWriteCloser) error {
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName, "port", port)
logger.V(1).Info("PortForward")
req := p.CoreClient.RESTClient().Post().
Resource("pods").
Name(hostPodName).
@@ -305,6 +342,7 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port
transport, upgrader, err := spdy.RoundTripperFor(&p.ClientConfig)
if err != nil {
logger.Error(err, "Error creating RoundTripper for PortForward")
return err
}
@@ -318,10 +356,16 @@ func (p *Provider) PortForward(ctx context.Context, namespace, pod string, port
// so more work is needed to detect a close and handle that appropriately.
fw, err := portforward.New(dialer, []string{portAsString}, stopChannel, readyChannel, stream, stream)
if err != nil {
logger.Error(err, "Error creating new PortForward")
return err
}
return fw.ForwardPorts()
if err := fw.ForwardPorts(); err != nil {
logger.Error(err, "Error forwarding ports")
return err
}
return nil
}
// CreatePod executes createPod with retry
@@ -331,16 +375,22 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
// createPod takes a Kubernetes Pod and deploys it within the provider.
func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
logger := p.logger.WithValues("namespace", pod.Namespace, "name", pod.Name)
logger.V(1).Info("CreatePod")
// fieldPath envs are not being translated correctly using the virtual kubelet pod controller
// as a workaround we will try to fetch the pod from the virtual cluster and copy over the envSource
var sourcePod corev1.Pod
if err := p.VirtualClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, &sourcePod); err != nil {
logger.Error(err, "Error getting Pod from Virtual Cluster")
return err
}
tPod := sourcePod.DeepCopy()
p.Translator.TranslateTo(tPod)
logger = p.logger.WithValues("pod", tPod.Name)
// get Cluster definition
clusterKey := types.NamespacedName{
Namespace: p.ClusterNamespace,
@@ -350,7 +400,8 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
var cluster v1beta1.Cluster
if err := p.HostClient.Get(ctx, clusterKey, &cluster); err != nil {
return fmt.Errorf("unable to get cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err)
logger.Error(err, "Error getting Virtual Cluster definition")
return err
}
// these values shouldn't be set on create
@@ -384,16 +435,18 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
// fieldpath annotations
if err := p.configureFieldPathEnv(&sourcePod, tPod); err != nil {
return fmt.Errorf("unable to fetch fieldpath annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
// volumes will often refer to resources in the virtual cluster, but instead need to refer to the sync'd
// host cluster version
if err := p.transformVolumes(pod.Namespace, tPod.Spec.Volumes); err != nil {
return fmt.Errorf("unable to sync volumes for pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Unable to fetch fieldpath annotations for pod")
return err
}
// volumes will often refer to resources in the virtual cluster
// but instead need to refer to the synced host cluster version
p.transformVolumes(pod.Namespace, tPod.Spec.Volumes)
// sync serviceaccount token to a the host cluster
if err := p.transformTokens(ctx, pod, tPod); err != nil {
return fmt.Errorf("unable to transform tokens for pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Unable to transform tokens for pod")
return err
}
for i, imagePullSecret := range tPod.Spec.ImagePullSecrets {
@@ -403,17 +456,20 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
// inject networking information to the pod including the virtual cluster controlplane endpoint
configureNetworking(tPod, pod.Name, pod.Namespace, p.serverIP, p.dnsIP)
p.logger.Info("creating pod",
"host_namespace", tPod.Namespace, "host_name", tPod.Name,
"virtual_namespace", pod.Namespace, "virtual_name", pod.Name,
)
// set ownerReference to the cluster object
if err := controllerutil.SetControllerReference(&cluster, tPod, p.HostClient.Scheme()); err != nil {
logger.Error(err, "Unable to set owner reference for pod")
return err
}
return p.HostClient.Create(ctx, tPod)
if err := p.HostClient.Create(ctx, tPod); err != nil {
logger.Error(err, "Error creating pod on host cluster")
return err
}
logger.Info("Pod created on host cluster")
return nil
}
// withRetry retries passed function with interval and timeout
@@ -445,7 +501,7 @@ func (p *Provider) withRetry(ctx context.Context, f func(context.Context, *corev
// transformVolumes changes the volumes to the representation in the host cluster. Will return an error
// if one/more volumes couldn't be transformed
func (p *Provider) transformVolumes(podNamespace string, volumes []corev1.Volume) error {
func (p *Provider) transformVolumes(podNamespace string, volumes []corev1.Volume) {
for _, volume := range volumes {
if strings.HasPrefix(volume.Name, kubeAPIAccessPrefix) {
continue
@@ -479,8 +535,6 @@ func (p *Provider) transformVolumes(podNamespace string, volumes []corev1.Volume
}
}
}
return nil
}
// UpdatePod executes updatePod with retry
@@ -489,7 +543,10 @@ func (p *Provider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
}
func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
p.logger.V(1).Info("got a request for update pod")
hostPodName := p.Translator.TranslateName(pod.Namespace, pod.Name)
logger := p.logger.WithValues("namespace", pod.Namespace, "name", pod.Name, "pod", hostPodName)
logger.V(1).Info("UpdatePod")
// Once scheduled a Pod cannot update other fields than the image of the containers, initcontainers and a few others
// See: https://kubernetes.io/docs/concepts/workloads/pods/#pod-update-and-replacement
@@ -498,28 +555,31 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
var currentVirtualPod corev1.Pod
if err := p.VirtualClient.Get(ctx, client.ObjectKeyFromObject(pod), &currentVirtualPod); err != nil {
return fmt.Errorf("unable to get pod to update from virtual cluster: %w", err)
logger.Error(err, "Unable to get pod to update from virtual cluster")
return err
}
hostNamespaceName := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: p.Translator.TranslateName(pod.Namespace, pod.Name),
Name: hostPodName,
}
var currentHostPod corev1.Pod
logger = logger.WithValues()
var currentHostPod corev1.Pod
if err := p.HostClient.Get(ctx, hostNamespaceName, &currentHostPod); err != nil {
return fmt.Errorf("unable to get pod to update from host cluster: %w", err)
logger.Error(err, "Unable to get Pod to update from host cluster")
return err
}
// Handle ephemeral containers
if !cmp.Equal(currentHostPod.Spec.EphemeralContainers, pod.Spec.EphemeralContainers) {
p.logger.Info("Updating ephemeral containers")
logger.V(1).Info("Updating ephemeral containers")
currentHostPod.Spec.EphemeralContainers = pod.Spec.EphemeralContainers
if _, err := p.CoreClient.Pods(p.ClusterNamespace).UpdateEphemeralContainers(ctx, currentHostPod.Name, &currentHostPod, metav1.UpdateOptions{}); err != nil {
p.logger.Error(err, "error when updating ephemeral containers")
logger.Error(err, "error when updating ephemeral containers")
return err
}
@@ -528,7 +588,8 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
// fieldpath annotations
if err := p.configureFieldPathEnv(&currentVirtualPod, &currentHostPod); err != nil {
return fmt.Errorf("unable to fetch fieldpath annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Unable to fetch fieldpath annotations for Pod")
return err
}
currentVirtualPod.Spec.Containers = updateContainerImages(currentVirtualPod.Spec.Containers, pod.Spec.Containers)
@@ -542,7 +603,8 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
currentVirtualPod.Labels = pod.Labels
if err := p.VirtualClient.Update(ctx, &currentVirtualPod); err != nil {
return fmt.Errorf("unable to update pod in the virtual cluster: %w", err)
logger.Error(err, "Unable to update Pod in virtual cluster")
return err
}
// Update Pod in the host cluster
@@ -558,9 +620,12 @@ func (p *Provider) updatePod(ctx context.Context, pod *corev1.Pod) error {
maps.Copy(currentHostPod.Labels, pod.Labels)
if err := p.HostClient.Update(ctx, &currentHostPod); err != nil {
return fmt.Errorf("unable to update pod in the host cluster: %w", err)
logger.Error(err, "Unable to update Pod in host cluster")
return err
}
logger.Info("Pod updated in virtual and host cluster")
return nil
}
@@ -590,20 +655,24 @@ func (p *Provider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
// state, as well as the pod. DeletePod may be called multiple times for the same pod.
func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error {
p.logger.Info(fmt.Sprintf("got request to delete pod %s/%s", pod.Namespace, pod.Name))
hostName := p.Translator.TranslateName(pod.Namespace, pod.Name)
hostPodName := p.Translator.TranslateName(pod.Namespace, pod.Name)
err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostName, metav1.DeleteOptions{})
logger := p.logger.WithValues("namespace", pod.Namespace, "name", pod.Name, "pod", hostPodName)
logger.V(1).Info("DeletePod")
err := p.CoreClient.Pods(p.ClusterNamespace).Delete(ctx, hostPodName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
p.logger.Info(fmt.Sprintf("pod %s/%s already deleted from host cluster", p.ClusterNamespace, hostName))
logger.Info("Pod to delete not found in host cluster")
return nil
}
return fmt.Errorf("unable to delete pod %s/%s: %w", pod.Namespace, pod.Name, err)
logger.Error(err, "Error trying to delete pod from host cluster")
return err
}
p.logger.Info(fmt.Sprintf("pod %s/%s deleted from host cluster", p.ClusterNamespace, hostName))
logger.Info("Pod deleted from host cluster")
return nil
}
@@ -613,21 +682,18 @@ func (p *Provider) deletePod(ctx context.Context, pod *corev1.Pod) error {
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
p.logger.V(1).Info("got a request for get pod", "namespace", namespace, "name", name)
hostNamespaceName := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: p.Translator.TranslateName(namespace, name),
hostPodName := p.Translator.TranslateName(namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName)
logger.V(1).Info("GetPod")
pod, err := p.getPodFromHostCluster(ctx, hostPodName)
if err != nil {
logger.Error(err, "Error getting pod from host cluster for GetPod")
return nil, err
}
var pod corev1.Pod
if err := p.HostClient.Get(ctx, hostNamespaceName, &pod); err != nil {
return nil, fmt.Errorf("error when retrieving pod: %w", err)
}
p.Translator.TranslateFrom(&pod)
return &pod, nil
return pod, nil
}
// GetPodStatus retrieves the status of a pod by name from the provider.
@@ -635,28 +701,49 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1.
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
p.logger.V(1).Info("got a request for pod status", "namespace", namespace, "name", name)
hostPodName := p.Translator.TranslateName(namespace, name)
pod, err := p.GetPod(ctx, namespace, name)
logger := p.logger.WithValues("namespace", namespace, "name", name, "pod", hostPodName)
logger.V(1).Info("GetPodStatus")
pod, err := p.getPodFromHostCluster(ctx, hostPodName)
if err != nil {
return nil, fmt.Errorf("unable to get pod for status: %w", err)
logger.Error(err, "Error getting pod from host cluster for PodStatus")
return nil, err
}
p.logger.V(1).Info("got pod status", "namespace", namespace, "name", name, "status", pod.Status)
return pod.Status.DeepCopy(), nil
}
func (p *Provider) getPodFromHostCluster(ctx context.Context, hostPodName string) (*corev1.Pod, error) {
key := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: hostPodName,
}
var pod corev1.Pod
if err := p.HostClient.Get(ctx, key, &pod); err != nil {
return nil, err
}
p.Translator.TranslateFrom(&pod)
return &pod, nil
}
// GetPods retrieves a list of all pods running on the provider (can be cached).
// The Pods returned are expected to be immutable, and may be accessed
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
p.logger.V(1).Info("GetPods")
selector := labels.NewSelector()
requirement, err := labels.NewRequirement(translate.ClusterNameLabel, selection.Equals, []string{p.ClusterName})
if err != nil {
return nil, fmt.Errorf("unable to create label selector: %w", err)
p.logger.Error(err, "Error creating label selector for GetPods")
return nil, err
}
selector = selector.Add(*requirement)
@@ -665,7 +752,8 @@ func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
err = p.HostClient.List(ctx, &podList, &client.ListOptions{LabelSelector: selector})
if err != nil {
return nil, fmt.Errorf("unable to list pods: %w", err)
p.logger.Error(err, "Error listing pods from host cluster")
return nil, err
}
retPods := []*corev1.Pod{}

View File

@@ -381,6 +381,11 @@ func (s *SharedAgent) role(ctx context.Context) error {
Resources: []string{"persistentvolumeclaims", "pods", "pods/log", "pods/attach", "pods/exec", "pods/ephemeralcontainers", "secrets", "configmaps", "services"},
Verbs: []string{"*"},
},
{
APIGroups: []string{""},
Resources: []string{"events"},
Verbs: []string{"create"},
},
{
APIGroups: []string{"networking.k8s.io"},
Resources: []string{"ingresses"},