troubleshoot/pkg/collect/runner.go
Ash 544a700062 [sc-114813] copy HostCollector fails to copy binary files when run in cluster (#1669)
* Don't convert output bytes to string

This prevents binary files from getting mangled when the collector output is passed around between functions (see the sketch below).

* Update pkg/collect/runner.go

Co-authored-by: Evans Mungai <evans@replicated.com>

* organise imports

2024-10-31 10:44:35 +00:00
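
A minimal Go sketch (hypothetical data, not code from this repo) of why keeping the output as []byte matters: once binary output is coerced to a string, a later serialization step such as JSON encoding silently rewrites invalid UTF-8, whereas raw bytes round-trip losslessly.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical collector output: binary data, not valid UTF-8.
	raw := []byte{0x89, 'P', 'N', 'G', 0xff, 0xfe}

	// Marshalled as a string, invalid UTF-8 is replaced with U+FFFD: lossy.
	asString, _ := json.Marshal(string(raw))
	// Marshalled as []byte, the exact bytes are base64-encoded: lossless.
	asBytes, _ := json.Marshal(raw)

	fmt.Println(string(asString)) // replacement characters; data corrupted
	fmt.Println(string(asBytes))  // base64 of the original bytes
}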

package collect

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/pkg/errors"
troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
runnerContainerName = "collector"
runnerJobType = "remote-collector"
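	// Empty means createCollectorPod falls back to the "default" service account.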
runnerServiceAccountName = ""
)
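
// podRunner runs a single host collector on a specific node: it creates a
// ConfigMap holding the collector spec and a privileged pod that executes it,
// then gathers the collector's output from the pod logs.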
type podRunner struct {
client *kubernetes.Clientset
scheme *runtime.Scheme
image string
pullPolicy string
waitInterval time.Duration
}
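
// run executes the collector on the named node and sends its raw output on the
// results channel, keyed by node name. The pod and ConfigMap it creates are
// deleted before the function returns.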
func (r *podRunner) run(ctx context.Context, collector *troubleshootv1beta2.HostCollect, namespace string, name string, nodeName string, results chan<- map[string][]byte) error {
cm, pod, err := CreateCollector(ctx, r.client, r.scheme, nil, name, namespace, nodeName, runnerServiceAccountName, runnerJobType, collector, r.image, r.pullPolicy)
if err != nil {
return errors.Wrap(err, "failed to create collector")
}
defer func() {
if err := r.client.CoreV1().Pods(namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("Failed to delete pod %s: %v\n", pod.Name, err)
}
if err := r.client.CoreV1().ConfigMaps(namespace).Delete(context.Background(), cm.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("Failed to delete configmap %s: %v\n", pod.Name, err)
}
}()
logs, err := GetContainerLogs(ctx, r.client, namespace, pod.Name, runnerContainerName, true, r.waitInterval)
if err != nil {
return errors.Wrap(err, "failed to retrieve collector logs")
}
results <- map[string][]byte{
nodeName: logs,
}
return nil
}
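
// CreateCollector creates the ConfigMap that carries the HostCollect spec and
// the pod that runs it on the target node, returning both created objects.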
func CreateCollector(ctx context.Context, client *kubernetes.Clientset, scheme *runtime.Scheme, ownerRef metav1.Object, name string, namespace string, nodeName string, serviceAccountName string, jobType string, collect *troubleshootv1beta2.HostCollect, image string, pullPolicy string) (*corev1.ConfigMap, *corev1.Pod, error) {
configMap, err := createCollectorConfigMap(client, scheme, ownerRef, name, namespace, collect)
if err != nil {
return nil, nil, err
}
pod, err := createCollectorPod(ctx, client, scheme, ownerRef, name, namespace, nodeName, serviceAccountName, jobType, collect, configMap, image, pullPolicy)
if err != nil {
return nil, nil, err
}
return configMap, pod, nil
}
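
// createCollectorConfigMap wraps the given HostCollect in a HostCollector spec
// and stores it as collector.json in a new ConfigMap.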
func createCollectorConfigMap(client *kubernetes.Clientset, scheme *runtime.Scheme, ownerRef metav1.Object, name string, namespace string, collect *troubleshootv1beta2.HostCollect) (*corev1.ConfigMap, error) {
	_, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), name, metav1.GetOptions{})
	if err == nil {
		return nil, fmt.Errorf("configmap %q already exists", name)
	} else if !kerrors.IsNotFound(err) {
		return nil, err
	}
collector := troubleshootv1beta2.HostCollector{
TypeMeta: metav1.TypeMeta{
APIVersion: "troubleshoot.sh/v1beta2",
Kind: "HostCollector",
},
ObjectMeta: metav1.ObjectMeta{
Name: "collector",
},
Spec: troubleshootv1beta2.HostCollectorSpec{
Collectors: []*troubleshootv1beta2.HostCollect{collect},
},
}
	// Marshal to JSON: TypeMeta and ObjectMeta have no yaml struct tags, so
	// marshalling to YAML would not preserve field capitalization (e.g. apiVersion).
contents, err := json.Marshal(collector)
if err != nil {
return nil, err
}
specData := make(map[string]string)
specData["collector.json"] = string(contents)
configMap := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
Data: specData,
}
if ownerRef != nil && scheme != nil {
if err := controllerutil.SetControllerReference(ownerRef, &configMap, scheme); err != nil {
return nil, err
}
}
var created *corev1.ConfigMap
createFn := func() error {
created, err = client.CoreV1().ConfigMaps(namespace).Create(context.Background(), &configMap, metav1.CreateOptions{})
if err != nil && !kerrors.IsAlreadyExists(err) {
return err
}
return nil
}
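	// Retry creation on any error; AlreadyExists is already treated as success.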
retryableFn := func(error) bool {
return true
}
err = retry.OnError(retry.DefaultBackoff, retryableFn, createFn)
if err != nil {
return nil, err
}
return created, nil
}
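
// createCollectorPod creates the privileged pod that runs the collector on the
// node selected by nodeName, mounting the spec ConfigMap and the host root
// filesystem.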
func createCollectorPod(ctx context.Context, client kubernetes.Interface, scheme *runtime.Scheme, ownerRef metav1.Object, name string, namespace string, nodeName string, serviceAccountName string, jobType string, collect *troubleshootv1beta2.HostCollect, configMap *corev1.ConfigMap, image string, pullPolicy string) (*corev1.Pod, error) {
if serviceAccountName == "" {
serviceAccountName = "default"
}
_, err := client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if err == nil {
return nil, fmt.Errorf("pod %q already exists", name)
	} else if !kerrors.IsNotFound(err) {
return nil, err
}
if err := checkForExistingServiceAccount(ctx, client, namespace, serviceAccountName); err != nil {
return nil, err
}
imageName := "replicated/troubleshoot:latest"
imagePullPolicy := corev1.PullAlways
if image != "" {
imageName = image
}
if pullPolicy != "" {
imagePullPolicy = corev1.PullPolicy(pullPolicy)
}
podLabels := make(map[string]string)
podLabels[jobType] = name
podLabels["troubleshoot-role"] = jobType
nodeSelector := map[string]string{
"kubernetes.io/hostname": nodeName,
}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: podLabels,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
Spec: corev1.PodSpec{
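			// Share the host's network, PID, and IPC namespaces so host
			// collectors observe the node rather than the container sandbox.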
HostNetwork: true,
HostPID: true,
HostIPC: true,
NodeSelector: nodeSelector,
ServiceAccountName: serviceAccountName,
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
{
Image: imageName,
ImagePullPolicy: imagePullPolicy,
Name: runnerContainerName,
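					// Copy the collect binary and the rendered spec onto the
					// host's root filesystem, then chroot into it so the
					// collector runs against the real node.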
Command: []string{"/bin/bash", "-c"},
Args: []string{
`cp /troubleshoot/collect /host/collect &&
cp /troubleshoot/specs/collector.json /host/collector.json &&
chroot /host /bin/bash -c './collect --collect-without-permissions --format=raw collector.json'`,
},
SecurityContext: &corev1.SecurityContext{
Privileged: ptr.To(true),
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "collector",
MountPath: "/troubleshoot/specs",
ReadOnly: true,
},
{
Name: "host-root",
MountPath: "/host",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "collector",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: configMap.Name,
},
},
},
},
{
Name: "host-root",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/",
},
},
},
},
},
}
if ownerRef != nil && scheme != nil {
if err := controllerutil.SetControllerReference(ownerRef, &pod, scheme); err != nil {
return nil, err
}
}
var created *corev1.Pod
createFn := func() error {
created, err = client.CoreV1().Pods(namespace).Create(ctx, &pod, metav1.CreateOptions{})
if err != nil && !kerrors.IsAlreadyExists(err) {
return err
}
return nil
}
retryableFn := func(error) bool {
return true
}
err = retry.OnError(retry.DefaultBackoff, retryableFn, createFn)
if err != nil {
return nil, err
}
return created, nil
}
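
// podCondition reports whether waiting for a pod is done and surfaces any
// terminal error.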
type podCondition func(pod *corev1.Pod) (bool, error)

// WaitForPodCondition waits for a pod to match the given condition.
func WaitForPodCondition(ctx context.Context, client kubernetes.Interface, namespace string, podName string, interval time.Duration, condition podCondition) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
for {
select {
case <-ticker.C:
pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
				if kerrors.IsNotFound(err) {
return err
}
continue
}
			if done, err := condition(pod); done {
				return err
			}
case <-ctx.Done():
return ctx.Err()
}
}
}

// WaitForPodCompleted waits until the pod reaches a terminal phase (Succeeded
// or Failed). It returns an error if the pod can never terminate (restart
// policy Always) or if the context ends first.
func WaitForPodCompleted(ctx context.Context, client kubernetes.Interface, namespace string, podName string, interval time.Duration) error {
return WaitForPodCondition(ctx, client, namespace, podName, interval, func(pod *corev1.Pod) (bool, error) {
if pod.Spec.RestartPolicy == corev1.RestartPolicyAlways {
return true, fmt.Errorf("pod %q will never terminate with a succeeded state since its restart policy is Always", podName)
}
switch pod.Status.Phase {
case corev1.PodSucceeded, corev1.PodFailed:
return true, nil
default:
return false, nil
}
})
}
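
// GetContainerLogs returns the raw log bytes of the named container,
// optionally waiting for the pod to complete first.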
func GetContainerLogs(ctx context.Context, client kubernetes.Interface, namespace string, podName string, containerName string, waitForComplete bool, interval time.Duration) ([]byte, error) {
if waitForComplete {
if err := WaitForPodCompleted(ctx, client, namespace, podName, interval); err != nil {
return nil, err
}
}
return getContainerLogsInternal(ctx, client, namespace, podName, containerName, false)
}
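
// getContainerLogsInternal fetches the container's log subresource, retrying
// transient errors, and returns the body as raw bytes so binary output is not
// mangled.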
func getContainerLogsInternal(ctx context.Context, client kubernetes.Interface, namespace string, podName string, containerName string, previous bool) ([]byte, error) {
var logs []byte
var err error
logsFn := func() error {
logs, err = client.CoreV1().RESTClient().Get().
Resource("pods").
Namespace(namespace).
Name(podName).SubResource("log").
Param("container", containerName).
Param("previous", strconv.FormatBool(previous)).
Do(ctx).
Raw()
		return err
}
	// Don't retry if the pod is not found.
	retryableFn := func(err error) bool {
		return !kerrors.IsNotFound(err)
	}
err = retry.OnError(retry.DefaultBackoff, retryableFn, logsFn)
if err != nil {
return nil, err
}
if bytes.Contains(logs, []byte("Internal Error")) {
return nil, fmt.Errorf("Fetched log contains \"Internal Error\": %q", string(logs))
}
return logs, nil
}