troubleshoot/pkg/collect/runner.go
Simon Croome 977fc438ea Remote host collectors (#392)
* Add collect command and remote host collectors

Adds the ability to run a host collector on a set of remote k8s nodes.
Target nodes can be filtered using the --selector flag, with the same
syntax as kubectl.  Existing flags for --collector-image,
--collector-pullpolicy and --request-timeout are used.  To run on a
specified node, --selector="kubernetes.io/hostname=kind-worker2" could
be used.
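
For example, the node filter can be combined with the other flags to collect
from a single node only (the image and spec path here are reused from the
example below, so they are illustrative rather than prescriptive):

```
bin/collect --collector-image=croomes/troubleshoot:latest --selector="kubernetes.io/hostname=kind-worker2" examples/collect/remote/memory.yaml --namespace test
```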

The collect command is used by the remote collector to output results in a
"raw" format, which uses the filename as the key and the escaped JSON output
as the value.  When run manually it defaults to fully decoded JSON.  The
existing block devices, ipv4interfaces and services host collectors don't
decode properly; the fix is to convert their slice output to a map (not
included here, as it's unclear what depends on the existing format).
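
As a rough sketch of the conversion described above (not part of this change;
the `BlockDevice` type and `Name` field are hypothetical stand-ins for the
real collector output):

```
package main

import (
	"encoding/json"
	"fmt"
)

// BlockDevice is a hypothetical stand-in for a collector's per-device result.
type BlockDevice struct {
	Name string `json:"name"`
	Size uint64 `json:"size"`
}

// devicesToMap keys each entry by device name so the output decodes as a map
// rather than a bare slice.
func devicesToMap(devices []BlockDevice) map[string]BlockDevice {
	out := make(map[string]BlockDevice, len(devices))
	for _, d := range devices {
		out[d.Name] = d
	}
	return out
}

func main() {
	devices := []BlockDevice{{Name: "vda", Size: 107374182400}}
	b, _ := json.Marshal(devicesToMap(devices))
	fmt.Println(string(b)) // {"vda":{"name":"vda","size":107374182400}}
}
```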

The collect command is also useful for troubleshooting preflight issues.

Examples are included to show remote collector usage.

```
bin/collect --collector-image=croomes/troubleshoot:latest  examples/collect/remote/memory.yaml --namespace test
{
  "kind-control-plane": {
    "system/memory.json": {
      "total": 1304207360
    }
  },
  "kind-worker": {
    "system/memory.json": {
      "total": 1695780864
    }
  },
  "kind-worker2": {
    "system/memory.json": {
      "total": 1726353408
    }
  }
}
```

The preflight command has been updated to run remote collectors.  To run a
host collector remotely it must be listed in the spec under
`remoteCollectors`:

```
apiVersion: troubleshoot.sh/v1beta2
kind: HostPreflight
metadata:
  name: memory
spec:
  remoteCollectors:
    - memory:
        collectorName: memory
  analyzers:
    - memory:
        outcomes:
          - fail:
              when: "< 8Gi"
              message: At least 8Gi of memory is required
          - warn:
              when: "< 32Gi"
              message: At least 32Gi of memory is recommended
          - pass:
              message: The system has sufficient memory
```

Results for each node are analyzed separately, with the node name
appended to the title:

```
bin/preflight --interactive=false --collector-image=croomes/troubleshoot:latest examples/preflight/remote/memory.yaml --format=json
{memory running 0 1}
{memory completed 1 1}
{
  "fail": [
    {
      "title": "Amount of Memory (kind-worker2)",
      "message": "At least 8Gi of memory is required"
    },
    {
      "title": "Amount of Memory (kind-worker)",
      "message": "At least 8Gi of memory is required"
    },
    {
      "title": "Amount of Memory (kind-control-plane)",
      "message": "At least 8Gi of memory is required"
    }
  ]
}
```

Also added a host collector to allow preflight checks of required kernel
modules, which is the main driver for this change.
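
For reference, a remote spec exercising that collector would look roughly like
the following; the `kernelModules` collector key is an assumption based on the
collector's name rather than taken verbatim from this change, and the matching
analyzer is omitted:

```
apiVersion: troubleshoot.sh/v1beta2
kind: HostPreflight
metadata:
  name: kernel-modules
spec:
  remoteCollectors:
    - kernelModules: {}
```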
2021-10-06 09:03:53 -05:00


package collect

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/pkg/errors"
	troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
	"github.com/replicatedhq/troubleshoot/pkg/logger"
	corev1 "k8s.io/api/core/v1"
	kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
const (
	runnerContainerName      = "collector"
	runnerJobType            = "remote-collector"
	runnerServiceAccountName = ""
)
// podRunner runs a single host collector in a pod pinned to a specific node
// and sends the collector output back on a results channel.
type podRunner struct {
	client       *kubernetes.Clientset
	scheme       *runtime.Scheme
	image        string
	pullPolicy   string
	waitInterval time.Duration
}
func (r *podRunner) run(ctx context.Context, collector *troubleshootv1beta2.HostCollect, namespace string, name string, nodeName string, results chan<- map[string][]byte) error {
	cm, pod, err := CreateCollector(r.client, r.scheme, nil, name, namespace, nodeName, runnerServiceAccountName, runnerJobType, collector, r.image, r.pullPolicy)
	if err != nil {
		return errors.Wrap(err, "failed to create collector")
	}
	defer func() {
		if err := r.client.CoreV1().Pods(namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
			logger.Printf("Failed to delete pod %s: %v\n", pod.Name, err)
		}
		if err := r.client.CoreV1().ConfigMaps(namespace).Delete(context.Background(), cm.Name, metav1.DeleteOptions{}); err != nil {
			logger.Printf("Failed to delete configmap %s: %v\n", cm.Name, err)
		}
	}()

	// The collector pod writes its results to stdout in raw format, so the
	// container logs are the collector output.
	logs, err := GetContainerLogs(ctx, r.client, namespace, pod.Name, runnerContainerName, true, r.waitInterval)
	if err != nil {
		return errors.Wrap(err, "failed to retrieve collector logs")
	}

	results <- map[string][]byte{
		nodeName: []byte(logs),
	}
	return nil
}
// CreateCollector creates the ConfigMap that carries the collector spec and
// the pod that runs it on the target node.
func CreateCollector(client *kubernetes.Clientset, scheme *runtime.Scheme, ownerRef metav1.Object, name string, namespace string, nodeName string, serviceAccountName string, jobType string, collect *troubleshootv1beta2.HostCollect, image string, pullPolicy string) (*corev1.ConfigMap, *corev1.Pod, error) {
	configMap, err := createCollectorConfigMap(client, scheme, ownerRef, name, namespace, collect)
	if err != nil {
		return nil, nil, err
	}

	pod, err := createCollectorPod(client, scheme, ownerRef, name, namespace, nodeName, serviceAccountName, jobType, collect, configMap, image, pullPolicy)
	if err != nil {
		return nil, nil, err
	}

	return configMap, pod, nil
}
func createCollectorConfigMap(client *kubernetes.Clientset, scheme *runtime.Scheme, ownerRef metav1.Object, name string, namespace string, collect *troubleshootv1beta2.HostCollect) (*corev1.ConfigMap, error) {
	_, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), name, metav1.GetOptions{})
	if err == nil {
		return nil, fmt.Errorf("configmap %q already exists", name)
	} else if !kuberneteserrors.IsNotFound(err) {
		return nil, err
	}

	collector := troubleshootv1beta2.HostCollector{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "troubleshoot.sh/v1beta2",
			Kind:       "HostCollector",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "collector",
		},
		Spec: troubleshootv1beta2.HostCollectorSpec{
			Collectors: []*troubleshootv1beta2.HostCollect{collect},
		},
	}

	// Use JSON because TypeMeta and ObjectMeta don't have yaml tags, so
	// capitalization (e.g. apiVersion) would not be preserved with yaml.
	contents, err := json.Marshal(collector)
	if err != nil {
		return nil, err
	}

	specData := make(map[string]string)
	specData["collector.json"] = string(contents)

	configMap := corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "ConfigMap",
		},
		Data: specData,
	}

	if ownerRef != nil && scheme != nil {
		if err := controllerutil.SetControllerReference(ownerRef, &configMap, scheme); err != nil {
			return nil, err
		}
	}

	var created *corev1.ConfigMap
	createFn := func() error {
		created, err = client.CoreV1().ConfigMaps(namespace).Create(context.Background(), &configMap, metav1.CreateOptions{})
		if err != nil && !kuberneteserrors.IsAlreadyExists(err) {
			return err
		}
		return nil
	}
	retryableFn := func(error) bool {
		return true
	}
	err = retry.OnError(retry.DefaultBackoff, retryableFn, createFn)
	if err != nil {
		return nil, err
	}
	return created, nil
}
func createCollectorPod(client kubernetes.Interface, scheme *runtime.Scheme, ownerRef metav1.Object, name string, namespace string, nodeName string, serviceAccountName string, jobType string, collect *troubleshootv1beta2.HostCollect, configMap *corev1.ConfigMap, image string, pullPolicy string) (*corev1.Pod, error) {
	if serviceAccountName == "" {
		serviceAccountName = "default"
	}

	_, err := client.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
	if err == nil {
		return nil, fmt.Errorf("pod %q already exists", name)
	} else if !kuberneteserrors.IsNotFound(err) {
		return nil, err
	}

	imageName := "replicated/troubleshoot:latest"
	imagePullPolicy := corev1.PullAlways
	if image != "" {
		imageName = image
	}
	if pullPolicy != "" {
		imagePullPolicy = corev1.PullPolicy(pullPolicy)
	}

	podLabels := make(map[string]string)
	podLabels[jobType] = name
	podLabels["troubleshoot-role"] = jobType

	// Pin the pod to the target node so the host collector runs there.
	nodeSelector := map[string]string{
		"kubernetes.io/hostname": nodeName,
	}

	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels:    podLabels,
		},
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "Pod",
		},
		Spec: corev1.PodSpec{
			NodeSelector:       nodeSelector,
			ServiceAccountName: serviceAccountName,
			RestartPolicy:      corev1.RestartPolicyNever,
			Containers: []corev1.Container{
				{
					Image:           imageName,
					ImagePullPolicy: imagePullPolicy,
					Name:            runnerContainerName,
					Command:         []string{"collect"},
					Args: []string{
						"--collect-without-permissions",
						"--format=raw",
						"/troubleshoot/specs/collector.json",
					},
					VolumeMounts: []corev1.VolumeMount{
						{
							Name:      "collector",
							MountPath: "/troubleshoot/specs",
							ReadOnly:  true,
						},
						{
							Name:      "kernel-modules",
							MountPath: "/lib/modules",
							ReadOnly:  true,
						},
						{
							Name:      "ntp",
							MountPath: "/run/dbus",
							ReadOnly:  true,
						},
					},
				},
			},
			// The collector spec is mounted from the ConfigMap; /lib/modules
			// and /run/dbus are mounted from the host for the collectors that
			// need them (kernel modules, ntp).
			Volumes: []corev1.Volume{
				{
					Name: "collector",
					VolumeSource: corev1.VolumeSource{
						ConfigMap: &corev1.ConfigMapVolumeSource{
							LocalObjectReference: corev1.LocalObjectReference{
								Name: configMap.Name,
							},
						},
					},
				},
				{
					Name: "kernel-modules",
					VolumeSource: corev1.VolumeSource{
						HostPath: &corev1.HostPathVolumeSource{
							Path: "/lib/modules",
						},
					},
				},
				{
					Name: "ntp",
					VolumeSource: corev1.VolumeSource{
						HostPath: &corev1.HostPathVolumeSource{
							Path: "/run/dbus",
						},
					},
				},
			},
		},
	}

	if ownerRef != nil && scheme != nil {
		if err := controllerutil.SetControllerReference(ownerRef, &pod, scheme); err != nil {
			return nil, err
		}
	}

	var created *corev1.Pod
	createFn := func() error {
		created, err = client.CoreV1().Pods(namespace).Create(context.Background(), &pod, metav1.CreateOptions{})
		if err != nil && !kuberneteserrors.IsAlreadyExists(err) {
			return err
		}
		return nil
	}
	retryableFn := func(error) bool {
		return true
	}
	err = retry.OnError(retry.DefaultBackoff, retryableFn, createFn)
	if err != nil {
		return nil, err
	}
	return created, nil
}
type podCondition func(pod *corev1.Pod) (bool, error)

// WaitForPodCondition waits for a pod to match the given condition.
func WaitForPodCondition(ctx context.Context, client kubernetes.Interface, namespace string, podName string, interval time.Duration, condition podCondition) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
			if err != nil {
				if kuberneteserrors.IsNotFound(err) {
					return err
				}
				continue
			}
			if done, err := condition(pod); done {
				if err == nil {
					return nil
				}
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// WaitForPodCompleted waits until the pod has terminated, whether it succeeded
// or failed, so its logs can still be collected on failure. It returns an error
// if the pod can never terminate.
func WaitForPodCompleted(ctx context.Context, client kubernetes.Interface, namespace string, podName string, interval time.Duration) error {
	return WaitForPodCondition(ctx, client, namespace, podName, interval, func(pod *corev1.Pod) (bool, error) {
		if pod.Spec.RestartPolicy == corev1.RestartPolicyAlways {
			return true, fmt.Errorf("pod %q will never terminate with a succeeded state since its restart policy is Always", podName)
		}
		switch pod.Status.Phase {
		case corev1.PodSucceeded, corev1.PodFailed:
			return true, nil
		default:
			return false, nil
		}
	})
}
// GetContainerLogs returns the logs of a container, optionally waiting for the
// pod to terminate first.
func GetContainerLogs(ctx context.Context, client kubernetes.Interface, namespace string, podName string, containerName string, waitForComplete bool, interval time.Duration) (string, error) {
	if waitForComplete {
		if err := WaitForPodCompleted(ctx, client, namespace, podName, interval); err != nil {
			return "", err
		}
	}
	return getContainerLogsInternal(ctx, client, namespace, podName, containerName, false)
}
func getContainerLogsInternal(ctx context.Context, client kubernetes.Interface, namespace string, podName string, containerName string, previous bool) (string, error) {
	var logs []byte
	var err error
	logsFn := func() error {
		logs, err = client.CoreV1().RESTClient().Get().
			Resource("pods").
			Namespace(namespace).
			Name(podName).SubResource("log").
			Param("container", containerName).
			Param("previous", strconv.FormatBool(previous)).
			Do(ctx).
			Raw()
		return err
	}
	// Don't retry if the pod is not found.
	retryableFn := func(err error) bool {
		return !kuberneteserrors.IsNotFound(err)
	}
	err = retry.OnError(retry.DefaultBackoff, retryableFn, logsFn)
	if err != nil {
		return "", err
	}
	if strings.Contains(string(logs), "Internal Error") {
		return "", fmt.Errorf("fetched log contains \"Internal Error\": %q", string(logs))
	}
	return string(logs), nil
}