fix: Collect logs from all pods specified in logs collector spec (#952)

Fixes: #945
Evans Mungai
2023-01-09 16:04:01 +00:00
committed by GitHub
parent 4601db58f9
commit 70af0ff3d0
3 changed files with 105 additions and 50 deletions
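For context, the "logs collector spec" in the title is the troubleshootv1beta2.Logs value that drives this collector. Below is a minimal sketch of a spec that asks for logs from every pod matching a selector; Name, Namespace and Limits appear in the diff that follows, while Selector being a []string of label selectors is an assumption about the v1beta2 API rather than something shown here.

package main

import (
	"fmt"

	troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
)

// allNginxLogs is a hypothetical example spec: a single logs collector that,
// after this fix, should capture logs from *all* pods matching the selector,
// not just the first one found.
func allNginxLogs() *troubleshootv1beta2.Logs {
	return &troubleshootv1beta2.Logs{
		Name:      "all-logs",
		Namespace: "my-namespace",
		Selector:  []string{"app=nginx"}, // assumption: label selectors as []string
		Limits: &troubleshootv1beta2.LogLimits{
			MaxLines: 500,
			MaxBytes: 10000000,
		},
	}
}

func main() {
	fmt.Println(allNginxLogs().Name) // all-logs
}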


@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
"github.com/replicatedhq/troubleshoot/pkg/constants"
"github.com/replicatedhq/troubleshoot/pkg/logger"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,7 +21,7 @@ import (
type CollectLogs struct {
Collector *troubleshootv1beta2.Logs
BundlePath string
- Namespace string // There is a Namespace parameter in troubleshootv1beta2.Logs. Should we remove this?
+ Namespace string // TODO: There is a Namespace parameter in troubleshootv1beta2.Logs. Should we remove this?
ClientConfig *rest.Config
Client kubernetes.Interface
Context context.Context
@@ -42,24 +43,30 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
return nil, err
}
- output := NewResult()
+ return c.CollectWithClient(progressChan, client)
- ctx := context.Background()
+ }
- const timeout = 60 //timeout in seconds used for context timeout value
+ // CollectWithClient is a helper function that allows passing in a kubernetes client
+ // It's a stopgap implementation before it's decided whether to either always use a single
+ // client for collectors or leave the implementation as is.
+ // Ref: https://github.com/replicatedhq/troubleshoot/pull/821#discussion_r1026258904
+ func (c *CollectLogs) CollectWithClient(progressChan chan<- interface{}, client kubernetes.Interface) (CollectorResult, error) {
+ out := NewResult()
- // timeout context
- ctxTimeout, cancel := context.WithTimeout(context.Background(), timeout*time.Second)
+ ctx, cancel := context.WithTimeout(c.Context, constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT)
defer cancel()
errCh := make(chan error, 1)
- resultCh := make(chan CollectorResult, 1)
- //wrapped code go func for context timeout solution
- go func() {
- output := NewResult()
+ done := make(chan struct{}, 1)
+ // Collect logs in a go routine to allow timing out of long running operations
+ // If a timeout occurs, the passed in collector result will contain logs collected
+ // prior. We want this to be the case so as to have some logs in the support bundle
+ // even if not from all expected pods.
+ // TODO: In future all collectors will have a timeout. This will be implemented in the
+ // framework level (caller of Collect function). Remove this code when we get there.
+ go func(output CollectorResult) {
if c.SinceTime != nil {
if c.Collector.Limits == nil {
c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
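The comment block added in the hunk above describes the approach this fix takes: run the per-pod collection in a goroutine, and if the context deadline fires, return whatever has already been written into the shared result so the bundle still contains some logs. A self-contained sketch of that shape follows, with illustrative names only; the mutex and the copy on timeout are there purely to keep the sketch race-free, whereas the real code shares a CollectorResult map directly.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// collectWithTimeout mirrors the shape of the new CollectWithClient: slow
// per-pod work runs in a goroutine, a buffered done channel signals normal
// completion, and on timeout the caller returns the partial result it has.
func collectWithTimeout(parent context.Context, pods []string) (map[string][]byte, error) {
	var mu sync.Mutex
	out := map[string][]byte{}

	ctx, cancel := context.WithTimeout(parent, 60*time.Second)
	defer cancel()

	done := make(chan struct{}, 1) // buffered so the worker never blocks if we already timed out

	go func() {
		for _, pod := range pods {
			logs := []byte("...") // stand-in for savePodLogs hitting the API server
			mu.Lock()
			out[pod+".log"] = logs
			mu.Unlock()
		}
		done <- struct{}{} // signal that every pod was processed
	}()

	select {
	case <-ctx.Done():
		// Timed out: hand back whatever was collected so far, plus an error,
		// like the "collector timeout exceeded" path in this diff.
		mu.Lock()
		defer mu.Unlock()
		partial := make(map[string][]byte, len(out))
		for k, v := range out {
			partial[k] = v
		}
		return partial, fmt.Errorf("logs collector timeout exceeded")
	case <-done:
		return out, nil
	}
}

func main() {
	logs, err := collectWithTimeout(context.Background(), []string{"firstPod", "secondPod"})
	fmt.Println(len(logs), err)
}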
@@ -98,8 +105,6 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
continue
}
output.AddResult(podLogs)
- resultCh <- output
}
} else {
for _, container := range c.Collector.ContainerNames {
@@ -113,24 +118,24 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
continue
}
output.AddResult(containerLogs)
- resultCh <- output
}
}
}
- } else {
- resultCh <- output
}
- }()
+ // Send a signal to indicate that we are done collecting logs
+ done <- struct{}{}
+ }(out)
select {
- case <-ctxTimeout.Done():
- return nil, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
- case o := <-resultCh:
- output = o
+ case <-ctx.Done():
+ // When we timeout, return the logs we have collected so far
+ return out, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
+ case <-done:
+ return out, nil
case err := <-errCh:
return nil, err
}
- return output, nil
}
func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
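The lines removed in the two hunks above are the actual cause of #945: resultCh <- output sat inside the per-pod and per-container loops, and because the select in Collect returns on the first value it receives, only the first pod's logs reliably made it into the bundle. Below is a stripped-down illustration of that early-return behaviour, with illustrative names; the real code used a shared CollectorResult map, which made the outcome racy rather than cleanly truncated.

package main

import "fmt"

func main() {
	pods := []string{"firstPod", "secondPod", "thirdPod"}

	resultCh := make(chan []string, 1)
	out := []string{}

	go func() {
		for _, pod := range pods {
			out = append(out, pod+".log")
			resultCh <- out // old pattern: send a result on every iteration
		}
	}()

	// The receiver fires on the very first send, so this "result" holds one
	// pod's logs; later sends are never read and the goroutine is abandoned,
	// much like the behaviour the done channel above replaces.
	got := <-resultCh
	fmt.Println(got) // [firstPod.log]
}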
@@ -149,19 +154,6 @@ func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, names
}
func savePodLogs(
ctx context.Context,
bundlePath string,
- client *kubernetes.Clientset,
- pod *corev1.Pod,
- collectorName, container string,
- limits *troubleshootv1beta2.LogLimits,
- follow bool,
- createSymLinks bool,
- ) (CollectorResult, error) {
- return savePodLogsWithInterface(ctx, bundlePath, client, pod, collectorName, container, limits, follow, createSymLinks)
- }
- func savePodLogsWithInterface(
- ctx context.Context,
- bundlePath string,
client kubernetes.Interface,
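Dropping the *kubernetes.Clientset wrapper in favour of a single savePodLogs that accepts kubernetes.Interface is what lets the tests below hand in a fake clientset. A small sketch of that injection pattern follows, using a hypothetical helper rather than the collector itself.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// listPodNames accepts any kubernetes.Interface, so production code can pass
// a clientset built from a rest.Config while tests pass the fake one.
func listPodNames(ctx context.Context, client kubernetes.Interface, ns string) ([]string, error) {
	pods, err := client.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	names := []string{}
	for _, p := range pods.Items {
		names = append(names, p.Name)
	}
	return names, nil
}

func main() {
	client := fake.NewSimpleClientset(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "firstPod", Namespace: "my-namespace"},
	})
	names, err := listPodNames(context.Background(), client, "my-namespace")
	fmt.Println(names, err) // [firstPod] <nil>
}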


@@ -7,8 +7,10 @@ import (
troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
testclient "k8s.io/client-go/kubernetes/fake"
)
@@ -158,25 +160,82 @@ func Test_savePodLogs(t *testing.T) {
MaxLines: 500,
MaxBytes: 10000000,
}
- pod, err := client.CoreV1().Pods("my-namespace").Create(ctx, &corev1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-pod",
- },
- Spec: corev1.PodSpec{
- Containers: []corev1.Container{
- {
- Name: containerName,
- },
- },
- },
- }, metav1.CreateOptions{})
- assert.NoError(t, err)
+ pod, err := createPod(client, containerName, "test-pod", "my-namespace")
+ require.NoError(t, err)
if !tt.withContainerName {
containerName = ""
}
- got, err := savePodLogsWithInterface(ctx, "", client, pod, tt.collectorName, containerName, limits, false, tt.createSymLinks)
+ got, err := savePodLogs(ctx, "", client, pod, tt.collectorName, containerName, limits, false, tt.createSymLinks)
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}
+ func Test_CollectLogs(t *testing.T) {
+ tests := []struct {
+ name string
+ collectorName string
+ podNames []string
+ want CollectorResult
+ }{
+ {
+ name: "from multiple pods",
+ collectorName: "all-logs",
+ podNames: []string{
+ "firstPod",
+ "secondPod",
+ },
+ want: CollectorResult{
+ "all-logs/firstPod/nginx.log": []byte("fake logs"),
+ "all-logs/firstPod/nginx-previous.log": []byte("fake logs"),
+ "all-logs/secondPod/nginx.log": []byte("fake logs"),
+ "all-logs/secondPod/nginx-previous.log": []byte("fake logs"),
+ "cluster-resources/pods/logs/my-namespace/firstPod/nginx.log": []byte("fake logs"),
+ "cluster-resources/pods/logs/my-namespace/firstPod/nginx-previous.log": []byte("fake logs"),
+ "cluster-resources/pods/logs/my-namespace/secondPod/nginx.log": []byte("fake logs"),
+ "cluster-resources/pods/logs/my-namespace/secondPod/nginx-previous.log": []byte("fake logs"),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.TODO()
+ ns := "my-namespace"
+ client := testclient.NewSimpleClientset()
+ for _, podName := range tt.podNames {
+ _, err := createPod(client, "nginx", podName, ns)
+ require.NoError(t, err)
+ }
+ progresChan := make(chan any)
+ c := &CollectLogs{
+ Context: ctx,
+ Namespace: ns,
+ Collector: &troubleshootv1beta2.Logs{
+ Name: tt.collectorName,
+ },
+ }
+ got, err := c.CollectWithClient(progresChan, client)
+ assert.NoError(t, err)
+ assert.Equal(t, tt.want, got)
+ })
+ }
+ }
+ func createPod(client kubernetes.Interface, containerName, podName, ns string) (*corev1.Pod, error) {
+ return client.CoreV1().Pods(ns).Create(context.TODO(), &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: podName,
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Name: containerName,
+ },
+ },
+ },
+ }, metav1.CreateOptions{})
+ }
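Every expected entry in Test_CollectLogs is []byte("fake logs") because, to the best of my knowledge, the fake clientset's GetLogs stub answers with that fixed string for any pod or container, so the test is really asserting file layout (including the cluster-resources copies), not log content. A tiny sketch of that behaviour, hedged on current client-go:

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	testclient "k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := testclient.NewSimpleClientset()
	// Assumption: the fake PodExpansion's GetLogs does not consult the object
	// tracker and always returns the literal body "fake logs".
	raw, err := client.CoreV1().Pods("my-namespace").
		GetLogs("firstPod", &corev1.PodLogOptions{}).
		DoRaw(context.Background())
	fmt.Println(string(raw), err) // fake logs <nil>
}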


@@ -1,5 +1,7 @@
package constants
import "time"
const (
// DEFAULT_CLIENT_QPS indicates the maximum QPS from troubleshoot client.
DEFAULT_CLIENT_QPS = 100
@@ -9,4 +11,6 @@ const (
DEFAULT_CLIENT_USER_AGENT = "ReplicatedTroubleshoot"
// VersionFilename is the name of the file that contains the support bundle version.
VersionFilename = "version.yaml"
+ // DEFAULT_LOGS_COLLECTOR_TIMEOUT is the default timeout for logs collector.
+ DEFAULT_LOGS_COLLECTOR_TIMEOUT = 60 * time.Second
)