mirror of
https://github.com/replicatedhq/troubleshoot.git
synced 2026-04-15 07:16:34 +00:00
301 lines
9.3 KiB
Go
301 lines
9.3 KiB
Go
package collect
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
|
|
"github.com/replicatedhq/troubleshoot/pkg/constants"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
type CollectLogs struct {
|
|
Collector *troubleshootv1beta2.Logs
|
|
BundlePath string
|
|
Namespace string // TODO: There is a Namespace parameter in troubleshootv1beta2.Logs. Should we remove this?
|
|
ClientConfig *rest.Config
|
|
Client kubernetes.Interface
|
|
Context context.Context
|
|
SinceTime *time.Time
|
|
RBACErrors
|
|
}
|
|
|
|
func (c *CollectLogs) Title() string {
|
|
return getCollectorName(c)
|
|
}
|
|
|
|
func (c *CollectLogs) IsExcluded() (bool, error) {
|
|
return isExcluded(c.Collector.Exclude)
|
|
}
|
|
|
|
func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult, error) {
|
|
client, err := kubernetes.NewForConfig(c.ClientConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return c.CollectWithClient(progressChan, client)
|
|
|
|
}
|
|
|
|
// CollectWithClient is a helper function that allows passing in a kubernetes client
|
|
// It's a stopgap implementation before it's decided whether to either always use a single
|
|
// client for collectors or leave the implementation as is.
|
|
// Ref: https://github.com/replicatedhq/troubleshoot/pull/821#discussion_r1026258904
|
|
func (c *CollectLogs) CollectWithClient(progressChan chan<- interface{}, client kubernetes.Interface) (CollectorResult, error) {
|
|
out := NewResult()
|
|
|
|
ctx, cancel := context.WithTimeout(c.Context, constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT)
|
|
defer cancel()
|
|
|
|
errCh := make(chan error, 1)
|
|
done := make(chan struct{}, 1)
|
|
|
|
// Collect logs in a go routine to allow timing out of long running operations
|
|
// If a timeout occurs, the passed in collector result will contain logs collected
|
|
// prior. We want this to be the case so as to have some logs in the support bundle
|
|
// even if not from all expected pods.
|
|
// TODO: In future all collectors will have a timeout. This will be implemented in the
|
|
// framework level (caller of Collect function). Remove this code when we get there.
|
|
go func(output CollectorResult) {
|
|
if c.SinceTime != nil {
|
|
if c.Collector.Limits == nil {
|
|
c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
|
|
}
|
|
c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
|
|
}
|
|
|
|
pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
|
|
if len(podsErrors) > 0 {
|
|
output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
|
|
}
|
|
|
|
if len(pods) > 0 {
|
|
for _, pod := range pods {
|
|
if len(c.Collector.ContainerNames) == 0 {
|
|
// make a list of all the containers in the pod, so that we can get logs from all of them
|
|
containerNames := []string{}
|
|
for _, container := range pod.Spec.Containers {
|
|
containerNames = append(containerNames, container.Name)
|
|
}
|
|
for _, container := range pod.Spec.InitContainers {
|
|
containerNames = append(containerNames, container.Name)
|
|
}
|
|
|
|
for _, containerName := range containerNames {
|
|
podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
|
|
if err != nil {
|
|
key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
|
|
if containerName != "" {
|
|
key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
|
|
}
|
|
err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
|
|
if err != nil {
|
|
errCh <- err
|
|
}
|
|
continue
|
|
}
|
|
output.AddResult(podLogs)
|
|
}
|
|
} else {
|
|
for _, container := range c.Collector.ContainerNames {
|
|
containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, container, c.Collector.Limits, false, true)
|
|
if err != nil {
|
|
key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, container)
|
|
err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
|
|
if err != nil {
|
|
errCh <- err
|
|
}
|
|
continue
|
|
}
|
|
output.AddResult(containerLogs)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send a signal to indicate that we are done collecting logs
|
|
done <- struct{}{}
|
|
}(out)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
// When we timeout, return the logs we have collected so far
|
|
return out, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
|
|
case <-done:
|
|
return out, nil
|
|
case err := <-errCh:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
|
|
serializedLabelSelector := strings.Join(selector, ",")
|
|
|
|
listOptions := metav1.ListOptions{
|
|
LabelSelector: serializedLabelSelector,
|
|
}
|
|
|
|
pods, err := client.CoreV1().Pods(namespace).List(ctx, listOptions)
|
|
if err != nil {
|
|
return nil, []string{err.Error()}
|
|
}
|
|
|
|
return pods.Items, nil
|
|
}
|
|
|
|
func savePodLogs(
|
|
ctx context.Context,
|
|
bundlePath string,
|
|
client kubernetes.Interface,
|
|
pod *corev1.Pod,
|
|
collectorName, container string,
|
|
limits *troubleshootv1beta2.LogLimits,
|
|
follow bool,
|
|
createSymLinks bool,
|
|
) (CollectorResult, error) {
|
|
podLogOpts := corev1.PodLogOptions{
|
|
Follow: follow,
|
|
Container: container,
|
|
}
|
|
|
|
result := NewResult()
|
|
|
|
// TODO: Abstract away hard coded directory structure paths
|
|
// Maybe create a FS provider or something similar
|
|
filePathPrefix := filepath.Join(
|
|
constants.CLUSTER_RESOURCES_DIR, constants.CLUSTER_RESOURCES_PODS_LOGS, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name,
|
|
)
|
|
|
|
// TODO: If collectorName is empty, the path is stored with a leading slash
|
|
// Retain this behavior otherwise analysers in the wild may break
|
|
// Analysers that need to find a file in the root of the bundle should
|
|
// prefix the path with a slash e.g /file.txt. This behavior should be
|
|
// properly deprecated in the future.
|
|
linkRelPathPrefix := fmt.Sprintf("%s/%s", collectorName, pod.Name)
|
|
if container != "" {
|
|
linkRelPathPrefix = fmt.Sprintf("%s/%s/%s", collectorName, pod.Name, container)
|
|
filePathPrefix = filepath.Join(
|
|
constants.CLUSTER_RESOURCES_DIR, constants.CLUSTER_RESOURCES_PODS_LOGS, pod.Namespace, pod.Name, container,
|
|
)
|
|
}
|
|
|
|
setLogLimits(&podLogOpts, limits, convertMaxAgeToTime)
|
|
|
|
req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
|
|
podLogs, err := req.Stream(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to get log stream")
|
|
}
|
|
defer podLogs.Close()
|
|
|
|
logWriter, err := result.GetWriter(bundlePath, filePathPrefix+".log")
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to get log writer")
|
|
}
|
|
// NOTE: deferred calls are executed in LIFO order i.e called in reverse order
|
|
if createSymLinks {
|
|
defer result.SymLinkResult(bundlePath, linkRelPathPrefix+".log", filePathPrefix+".log")
|
|
}
|
|
defer result.CloseWriter(bundlePath, filePathPrefix+".log", logWriter)
|
|
|
|
_, err = io.Copy(logWriter, podLogs)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to copy log")
|
|
}
|
|
|
|
podLogOpts.Previous = true
|
|
req = client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
|
|
podLogs, err = req.Stream(ctx)
|
|
if err != nil {
|
|
// maybe fail on !kuberneteserrors.IsNotFound(err)?
|
|
return result, nil
|
|
}
|
|
defer podLogs.Close()
|
|
|
|
prevLogWriter, err := result.GetWriter(bundlePath, filePathPrefix+"-previous.log")
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to get previous log writer")
|
|
}
|
|
// NOTE: deferred calls are executed in LIFO order i.e called in reverse order
|
|
if createSymLinks {
|
|
defer result.SymLinkResult(bundlePath, linkRelPathPrefix+"-previous.log", filePathPrefix+"-previous.log")
|
|
}
|
|
defer result.CloseWriter(bundlePath, filePathPrefix+"-previous.log", logWriter)
|
|
|
|
_, err = io.Copy(prevLogWriter, podLogs)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to copy previous log")
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func convertMaxAgeToTime(maxAge string) *metav1.Time {
|
|
parsedDuration, err := time.ParseDuration(maxAge)
|
|
if err != nil {
|
|
klog.Errorf("Failed to parse time duration %s", maxAge)
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
then := now.Add(0 - parsedDuration)
|
|
kthen := metav1.NewTime(then)
|
|
|
|
return &kthen
|
|
}
|
|
|
|
func setLogLimits(podLogOpts *corev1.PodLogOptions, limits *troubleshootv1beta2.LogLimits, maxAgeParser func(maxAge string) *metav1.Time) {
|
|
if podLogOpts == nil {
|
|
return
|
|
}
|
|
|
|
defaultMaxLines := int64(10000)
|
|
if limits == nil {
|
|
podLogOpts.TailLines = &defaultMaxLines
|
|
return
|
|
}
|
|
|
|
if !limits.SinceTime.IsZero() {
|
|
podLogOpts.SinceTime = &limits.SinceTime
|
|
return
|
|
}
|
|
|
|
if limits.MaxAge != "" {
|
|
podLogOpts.SinceTime = maxAgeParser(limits.MaxAge)
|
|
return
|
|
}
|
|
|
|
if limits.MaxLines == 0 {
|
|
podLogOpts.TailLines = &defaultMaxLines
|
|
} else {
|
|
podLogOpts.TailLines = &limits.MaxLines
|
|
}
|
|
|
|
defaultMaxBytes := int64(5000000)
|
|
if limits.MaxBytes == 0 {
|
|
podLogOpts.LimitBytes = &defaultMaxBytes
|
|
} else {
|
|
podLogOpts.LimitBytes = &limits.MaxBytes
|
|
}
|
|
}
|
|
|
|
func getLogsErrorsFileName(logsCollector *troubleshootv1beta2.Logs) string {
|
|
if len(logsCollector.Name) > 0 {
|
|
return fmt.Sprintf("%s/errors.json", logsCollector.Name)
|
|
} else if len(logsCollector.CollectorName) > 0 {
|
|
return fmt.Sprintf("%s/errors.json", logsCollector.CollectorName)
|
|
}
|
|
// TODO: random part
|
|
return "errors.json"
|
|
}
|