Mirror of https://github.com/replicatedhq/troubleshoot.git, synced 2026-02-14 18:29:53 +00:00
fix: simplify logs collector by removing goroutine which had a concurrency bug (#1155)
* fix: prevent concurrent map iteration and map write on logs collector timeout
* remove goroutine
* log pod logs timeout
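The "concurrency bug" named in the commit title is the classic shared-map race: CollectorResult is a map, and the removed goroutine kept writing into it after a timeout caused the `select` to hand that same map back to the caller, which then iterates it. The Go runtime can abort with a fatal "concurrent map iteration and map write" error in that situation. Below is a minimal, self-contained sketch of that race class; the names (`result`, the writer loop) are illustrative only and are not the collector's actual code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	result := map[string][]byte{} // stands in for CollectorResult, which is a map

	// Same shape as the collector's timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	go func() {
		// The writer keeps adding entries past the deadline, just as the
		// removed goroutine kept calling SaveResult/AddResult after the
		// select had already returned the same map to the caller.
		for i := 0; ; i++ {
			result[fmt.Sprintf("pod-%d/logs.txt", i)] = []byte("...")
			time.Sleep(time.Millisecond)
		}
	}()

	<-ctx.Done()
	// The "caller" now iterates the partially filled map while the goroutine
	// may still be writing: concurrent map iteration and map write.
	// Run with `go run -race .` to see the race detector fire.
	for key := range result {
		_ = key
	}
}
```

Removing the goroutine makes the writes and the final read strictly sequential, which is the simplest fix available here.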
@@ -52,90 +52,72 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult, error) {
 // client for collectors or leave the implementation as is.
 // Ref: https://github.com/replicatedhq/troubleshoot/pull/821#discussion_r1026258904
 func (c *CollectLogs) CollectWithClient(progressChan chan<- interface{}, client kubernetes.Interface) (CollectorResult, error) {
-	out := NewResult()
+	output := NewResult()
 
 	ctx, cancel := context.WithTimeout(c.Context, constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT)
 	defer cancel()
 
-	errCh := make(chan error, 1)
-	done := make(chan struct{}, 1)
-
-	// Collect logs in a go routine to allow timing out of long running operations
-	// If a timeout occurs, the passed in collector result will contain logs collected
-	// prior. We want this to be the case so as to have some logs in the support bundle
-	// even if not from all expected pods.
-	// TODO: In future all collectors will have a timeout. This will be implemented in the
-	// framework level (caller of Collect function). Remove this code when we get there.
-	go func(output CollectorResult) {
-		if c.SinceTime != nil {
-			if c.Collector.Limits == nil {
-				c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
-			}
-			c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
-		}
-
-		pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
-		if len(podsErrors) > 0 {
-			output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
-		}
-
-		if len(pods) > 0 {
-			for _, pod := range pods {
-				if len(c.Collector.ContainerNames) == 0 {
-					// make a list of all the containers in the pod, so that we can get logs from all of them
-					containerNames := []string{}
-					for _, container := range pod.Spec.Containers {
-						containerNames = append(containerNames, container.Name)
-					}
-					for _, container := range pod.Spec.InitContainers {
-						containerNames = append(containerNames, container.Name)
-					}
-
-					for _, containerName := range containerNames {
-						podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
-						if err != nil {
-							key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
-							if containerName != "" {
-								key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
-							}
-							err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
-							if err != nil {
-								errCh <- err
-							}
-							continue
-						}
-						output.AddResult(podLogs)
-					}
-				} else {
-					for _, container := range c.Collector.ContainerNames {
-						containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, container, c.Collector.Limits, false, true)
-						if err != nil {
-							key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, container)
-							err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
-							if err != nil {
-								errCh <- err
-							}
-							continue
-						}
-						output.AddResult(containerLogs)
-					}
-				}
-			}
-		}
-
-		// Send a signal to indicate that we are done collecting logs
-		done <- struct{}{}
-	}(out)
-
-	select {
-	case <-ctx.Done():
-		// When we timeout, return the logs we have collected so far
-		return out, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
-	case <-done:
-		return out, nil
-	case err := <-errCh:
-		return nil, err
-	}
+	if c.SinceTime != nil {
+		if c.Collector.Limits == nil {
+			c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
+		}
+		c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
+	}
+
+	pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
+	if len(podsErrors) > 0 {
+		output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
+	}
+
+	for _, pod := range pods {
+		if len(c.Collector.ContainerNames) == 0 {
+			// make a list of all the containers in the pod, so that we can get logs from all of them
+			containerNames := []string{}
+			for _, container := range pod.Spec.Containers {
+				containerNames = append(containerNames, container.Name)
+			}
+			for _, container := range pod.Spec.InitContainers {
+				containerNames = append(containerNames, container.Name)
+			}
+
+			for _, containerName := range containerNames {
+				podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
+				if err != nil {
+					if errors.Is(err, context.DeadlineExceeded) {
+						klog.Errorf("Pod logs timed out for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
+					if containerName != "" {
+						key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
+					}
+					err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
+					if err != nil {
+						klog.Errorf("Failed to save pod logs result for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					continue
+				}
+				output.AddResult(podLogs)
+			}
+		} else {
+			for _, containerName := range c.Collector.ContainerNames {
+				containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
+				if err != nil {
+					if errors.Is(err, context.DeadlineExceeded) {
+						klog.Errorf("Pod logs timed out for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
+					err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
+					if err != nil {
+						klog.Errorf("Failed to save pod logs result for pod %s and container %s: %v", pod.Name, containerName, err)
+					}
+					continue
+				}
+				output.AddResult(containerLogs)
+			}
+		}
+	}
+
+	return output, nil
 }
 
 func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
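With the goroutine gone, the timeout is enforced by the single `context.WithTimeout` that every `savePodLogs` call receives: a lapsed deadline surfaces as an ordinary error that is logged via klog and recorded in the bundle, and the loop continues to the next container, so results gathered before the timeout are still returned. A minimal sketch of that synchronous pattern, where `fetchLogs` is a hypothetical stand-in for `savePodLogs`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchLogs is a hypothetical stand-in for savePodLogs: it respects the
// caller's context and returns the context's error when the deadline lapses.
func fetchLogs(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulate a slow log stream
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	if err := fetchLogs(ctx); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("pod logs timed out:", err)
		}
		// The collector records the error in the bundle and continues,
		// rather than racing a select against a background goroutine.
	}
}
```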
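The trailing context line of the diff shows only the signature of `listPodsInSelectors`. For readers unfamiliar with client-go, a helper with this signature would typically join the selector terms into a label selector and list the matching pods, roughly as sketched below. This body is an assumption for illustration, not the repository's implementation.

```go
package collect

import (
	"context"
	"strings"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Hedged sketch: list pods matching the given label selector terms, returning
// any error messages as strings (the shape marshalErrors consumes above).
func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
	listOptions := metav1.ListOptions{
		LabelSelector: strings.Join(selector, ","),
	}
	pods, err := client.CoreV1().Pods(namespace).List(ctx, listOptions)
	if err != nil {
		return nil, []string{err.Error()}
	}
	return pods.Items, nil
}
```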