From af4cc8a08a369acf4328ff595f5d0148c64cc6a3 Mon Sep 17 00:00:00 2001
From: Ethan Mosbaugh
Date: Wed, 17 May 2023 23:43:47 -0700
Subject: [PATCH] fix: simplify logs collector by removing goroutine which had
 a concurrency bug (#1155)

* fix: logs collector timeout

prevent concurrent map iteration and map write

* remove goroutine

* log pod logs timeout
---
 pkg/collect/logs.go | 138 +++++++++++++++++++-------------------------
 1 file changed, 60 insertions(+), 78 deletions(-)

diff --git a/pkg/collect/logs.go b/pkg/collect/logs.go
index df5fc938..bf854d10 100644
--- a/pkg/collect/logs.go
+++ b/pkg/collect/logs.go
@@ -52,90 +52,72 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
 // client for collectors or leave the implementation as is.
 // Ref: https://github.com/replicatedhq/troubleshoot/pull/821#discussion_r1026258904
 func (c *CollectLogs) CollectWithClient(progressChan chan<- interface{}, client kubernetes.Interface) (CollectorResult, error) {
-	out := NewResult()
+	output := NewResult()
 	ctx, cancel := context.WithTimeout(c.Context, constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT)
 	defer cancel()
 
-	errCh := make(chan error, 1)
-	done := make(chan struct{}, 1)
-
-	// Collect logs in a go routine to allow timing out of long running operations
-	// If a timeout occurs, the passed in collector result will contain logs collected
-	// prior. We want this to be the case so as to have some logs in the support bundle
-	// even if not from all expected pods.
-	// TODO: In future all collectors will have a timeout. This will be implemented in the
-	// framework level (caller of Collect function). Remove this code when we get there.
-	go func(output CollectorResult) {
-		if c.SinceTime != nil {
-			if c.Collector.Limits == nil {
-				c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
-			}
-			c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
+	if c.SinceTime != nil {
+		if c.Collector.Limits == nil {
+			c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
 		}
+		c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
+	}
 
-		pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
-		if len(podsErrors) > 0 {
-			output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
-		}
-
-		if len(pods) > 0 {
-			for _, pod := range pods {
-				if len(c.Collector.ContainerNames) == 0 {
-					// make a list of all the containers in the pod, so that we can get logs from all of them
-					containerNames := []string{}
-					for _, container := range pod.Spec.Containers {
-						containerNames = append(containerNames, container.Name)
-					}
-					for _, container := range pod.Spec.InitContainers {
-						containerNames = append(containerNames, container.Name)
-					}
-
-					for _, containerName := range containerNames {
-						podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
-						if err != nil {
-							key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
-							if containerName != "" {
-								key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
-							}
-							err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
-							if err != nil {
-								errCh <- err
-							}
-							continue
-						}
-						output.AddResult(podLogs)
-					}
-				} else {
-					for _, container := range c.Collector.ContainerNames {
-						containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, container, c.Collector.Limits, false, true)
-						if err != nil {
-							key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, container)
-							err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
-							if err != nil {
-								errCh <- err
-							}
-							continue
-						}
-						output.AddResult(containerLogs)
-					}
-				}
-			}
-		}
-
-		// Send a signal to indicate that we are done collecting logs
-		done <- struct{}{}
-	}(out)
-
-	select {
-	case <-ctx.Done():
-		// When we timeout, return the logs we have collected so far
-		return out, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
-	case <-done:
-		return out, nil
-	case err := <-errCh:
-		return nil, err
-	}
+	pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
+	if len(podsErrors) > 0 {
+		output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
+	}
+
+	for _, pod := range pods {
+		if len(c.Collector.ContainerNames) == 0 {
+			// make a list of all the containers in the pod, so that we can get logs from all of them
+			containerNames := []string{}
+			for _, container := range pod.Spec.Containers {
+				containerNames = append(containerNames, container.Name)
+			}
+			for _, container := range pod.Spec.InitContainers {
+				containerNames = append(containerNames, container.Name)
+			}
+
+			for _, containerName := range containerNames {
+				podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
+				if err != nil {
+					if errors.Is(err, context.DeadlineExceeded) {
+						klog.Errorf("Pod logs timed out for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
+					if containerName != "" {
+						key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
+					}
+					err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
+					if err != nil {
+						klog.Errorf("Failed to save pod logs result for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					continue
+				}
+				output.AddResult(podLogs)
+			}
+		} else {
+			for _, containerName := range c.Collector.ContainerNames {
+				containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
+				if err != nil {
+					if errors.Is(err, context.DeadlineExceeded) {
+						klog.Errorf("Pod logs timed out for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
+					err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
+					if err != nil {
+						klog.Errorf("Failed to save pod logs result for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					continue
+				}
+				output.AddResult(containerLogs)
+			}
+		}
+	}
+
+	return output, nil
 }
 
 func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
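
For context on the concurrency bug named in the subject: the removed goroutine
shared the live CollectorResult map with the select below it, so on ctx.Done()
the caller got the map back and could iterate it while the goroutine was still
writing log entries into it. The standalone sketch below reproduces that race
class; the names (collect, result) are illustrative, not code from this repo.
Running it with "go run -race" reports the race, and without the race detector
the runtime can abort with "fatal error: concurrent map iteration and map
write".

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// collect mimics the removed pattern: it spawns a worker that fills a
	// shared map, then returns that same map early when the context times
	// out, leaving the worker goroutine alive and still writing.
	func collect(ctx context.Context) map[string]string {
		result := map[string]string{} // plain map: not safe for concurrent use

		done := make(chan struct{})
		go func() {
			defer close(done)
			for i := 0; i < 1000; i++ {
				result[fmt.Sprintf("pod-%d/logs.txt", i)] = "..." // writer
				time.Sleep(time.Millisecond)
			}
		}()

		select {
		case <-done:
			return result
		case <-ctx.Done():
			return result // the worker may still be writing into result
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
		defer cancel()

		// Iterating the returned map races with the leaked writer above.
		for key := range collect(ctx) {
			_ = key
		}
	}

Collecting synchronously, as this patch does, removes the leaked writer
entirely; the timeout is still honored because every savePodLogs call receives
the same ctx created by context.WithTimeout at the top of the function.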