Files
troubleshoot/pkg/collect/exec.go
Andrew Lavery d016e3269c add global and per-collector redactors
add redact type, and begin wiring global redactors

use per-collector redactors

add a test of the 'data' collector and redaction

handle literal string replacements

remove redundant types and redact calls

add proper redactor type, foundations of global redactors

accept global redactors from the CLI, include sample redaction spec
2020-04-16 14:03:00 -04:00

146 lines
3.7 KiB
Go

package collect
import (
"bytes"
"errors"
"fmt"
"path/filepath"
"time"
troubleshootv1beta1 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/remotecommand"
)
func Exec(ctx *Context, execCollector *troubleshootv1beta1.Exec) (map[string][]byte, error) {
if execCollector.Timeout == "" {
return execWithoutTimeout(ctx, execCollector)
}
timeout, err := time.ParseDuration(execCollector.Timeout)
if err != nil {
return nil, err
}
errCh := make(chan error, 1)
resultCh := make(chan map[string][]byte, 1)
go func() {
b, err := execWithoutTimeout(ctx, execCollector)
if err != nil {
errCh <- err
} else {
resultCh <- b
}
}()
select {
case <-time.After(timeout):
return nil, errors.New("timeout")
case result := <-resultCh:
return result, nil
case err := <-errCh:
return nil, err
}
}
func execWithoutTimeout(ctx *Context, execCollector *troubleshootv1beta1.Exec) (map[string][]byte, error) {
client, err := kubernetes.NewForConfig(ctx.ClientConfig)
if err != nil {
return nil, err
}
execOutput := map[string][]byte{}
pods, podsErrors := listPodsInSelectors(client, execCollector.Namespace, execCollector.Selector)
if len(podsErrors) > 0 {
errorBytes, err := marshalNonNil(podsErrors)
if err != nil {
return nil, err
}
execOutput[getExecErrosFileName(execCollector)] = errorBytes
}
if len(pods) > 0 {
for _, pod := range pods {
stdout, stderr, execErrors := getExecOutputs(ctx, client, pod, execCollector)
bundlePath := filepath.Join(execCollector.Name, pod.Namespace, pod.Name)
if len(stdout) > 0 {
execOutput[filepath.Join(bundlePath, execCollector.CollectorName+"-stdout.txt")] = stdout
}
if len(stderr) > 0 {
execOutput[filepath.Join(bundlePath, execCollector.CollectorName+"-stderr.txt")] = stderr
}
if len(execErrors) > 0 {
errorBytes, err := marshalNonNil(execErrors)
if err != nil {
return nil, err
}
execOutput[filepath.Join(bundlePath, execCollector.CollectorName+"-errors.json")] = errorBytes
continue
}
}
}
return execOutput, nil
}
func getExecOutputs(ctx *Context, client *kubernetes.Clientset, pod corev1.Pod, execCollector *troubleshootv1beta1.Exec) ([]byte, []byte, []string) {
container := pod.Spec.Containers[0].Name
if execCollector.ContainerName != "" {
container = execCollector.ContainerName
}
req := client.CoreV1().RESTClient().Post().Resource("pods").Name(pod.Name).Namespace(pod.Namespace).SubResource("exec")
scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
return nil, nil, []string{err.Error()}
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&corev1.PodExecOptions{
Command: append(execCollector.Command, execCollector.Args...),
Container: container,
Stdin: true,
Stdout: false,
Stderr: true,
TTY: false,
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(ctx.ClientConfig, "POST", req.URL())
if err != nil {
return nil, nil, []string{err.Error()}
}
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)
err = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
Tty: false,
})
if err != nil {
return stdout.Bytes(), stderr.Bytes(), []string{err.Error()}
}
return stdout.Bytes(), stderr.Bytes(), nil
}
func getExecErrosFileName(execCollector *troubleshootv1beta1.Exec) string {
if len(execCollector.Name) > 0 {
return fmt.Sprintf("%s-errors.json", execCollector.Name)
}
if len(execCollector.CollectorName) > 0 {
return fmt.Sprintf("%s-errors.json", execCollector.CollectorName)
}
// TODO: random part
return "errors.json"
}