diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go
index 6f2d31f94..da340bb19 100644
--- a/probe/kubernetes/client.go
+++ b/probe/kubernetes/client.go
@@ -39,7 +39,7 @@ type Client interface {
 
 	WatchPods(f func(Event, Pod))
 
-	GetLogs(namespaceID, podID string) (io.ReadCloser, error)
+	GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error)
 	DeletePod(namespaceID, podID string) error
 	ScaleUp(resource, namespaceID, id string) error
 	ScaleDown(resource, namespaceID, id string) error
@@ -328,15 +328,28 @@ func (c *client) WalkNamespaces(f func(NamespaceResource) error) error {
 	return nil
 }
 
-func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
-	req := c.client.CoreV1().Pods(namespaceID).GetLogs(
-		podID,
-		&apiv1.PodLogOptions{
-			Follow:     true,
-			Timestamps: true,
-		},
-	)
-	return req.Stream()
+func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) {
+	readClosersWithLabel := map[io.ReadCloser]string{}
+	for _, container := range containerNames {
+		req := c.client.CoreV1().Pods(namespaceID).GetLogs(
+			podID,
+			&apiv1.PodLogOptions{
+				Follow:     true,
+				Timestamps: true,
+				Container:  container,
+			},
+		)
+		readCloser, err := req.Stream()
+		if err != nil {
+			for rc := range readClosersWithLabel {
+				rc.Close()
+			}
+			return nil, err
+		}
+		readClosersWithLabel[readCloser] = container
+	}
+
+	return NewLogReadCloser(readClosersWithLabel), nil
 }
 
 func (c *client) DeletePod(namespaceID, podID string) error {
diff --git a/probe/kubernetes/controls.go b/probe/kubernetes/controls.go
index 62ddb806e..55f31e004 100644
--- a/probe/kubernetes/controls.go
+++ b/probe/kubernetes/controls.go
@@ -18,8 +18,8 @@ const (
 )
 
 // GetLogs is the control to get the logs for a kubernetes pod
-func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response {
-	readCloser, err := r.client.GetLogs(namespaceID, podID)
+func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response {
+	readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames)
 	if err != nil {
 		return xfer.ResponseError(err)
 	}
@@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res
 	}
 }
 
-func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response {
+func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response {
 	if err := r.client.DeletePod(namespaceID, podID); err != nil {
 		return xfer.ResponseError(err)
 	}
@@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R
 }
 
 // CapturePod is exported for testing
-func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
+func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response {
 	return func(req xfer.Request) xfer.Response {
 		uid, ok := report.ParsePodNodeID(req.NodeID)
 		if !ok {
@@ -70,7 +70,7 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response
 		if pod == nil {
 			return xfer.ResponseErrorf("Pod not found: %s", uid)
 		}
-		return f(req, pod.Namespace(), pod.Name())
+		return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames())
 	}
 }
 
diff --git a/probe/kubernetes/logreadcloser.go b/probe/kubernetes/logreadcloser.go
new file mode 100644
index 000000000..740bd1b61
--- /dev/null
+++ b/probe/kubernetes/logreadcloser.go
@@ -0,0 +1,186 @@
+package kubernetes
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"sync"
+)
+
+// logReadCloser merges several labelled log streams into a single
+// io.ReadCloser, one goroutine per stream feeding lines to Read.
+type logReadCloser struct {
+	labels      []string
+	labelLength int
+	readClosers []io.ReadCloser
+	eof         []bool
+	buffer      bytes.Buffer
+	dataChannel chan []byte
+	eofChannel  chan int
+	wg          sync.WaitGroup
+
+	// done is closed by Close so that a goroutine blocked on a channel
+	// operation can give up instead of waiting for its peer forever.
+	done      chan struct{}
+	closeOnce sync.Once
+	closeErr  error
+}
+
+// NewLogReadCloser reads from multiple io.ReadCloser, where data is available,
+// and annotates each line with the reader's label
+func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser {
+	n := len(readClosersWithLabel)
+	labels := make([]string, 0, n)
+	readClosers := make([]io.ReadCloser, 0, n)
+
+	// labels are padded to the longest one so annotated lines align
+	labelLength := 0
+	for readCloser, label := range readClosersWithLabel {
+		readClosers = append(readClosers, readCloser)
+		labels = append(labels, label)
+		if len(label) > labelLength {
+			labelLength = len(label)
+		}
+	}
+
+	l := &logReadCloser{
+		readClosers: readClosers,
+		labels:      labels,
+		labelLength: labelLength,
+		dataChannel: make(chan []byte),
+		eofChannel:  make(chan int),
+		eof:         make([]bool, n),
+		done:        make(chan struct{}),
+	}
+
+	l.wg.Add(n)
+
+	for idx := range l.readClosers {
+		go l.readInput(idx)
+	}
+
+	return l
+}
+
+// Read copies buffered log data into p, blocking only while nothing is
+// buffered and at least one input is still open. It returns io.EOF once every
+// input has finished and the buffer is drained, and io.ErrClosedPipe when it
+// would otherwise have to block after Close.
+func (l *logReadCloser) Read(p []byte) (int, error) {
+	if len(p) <= l.buffer.Len() {
+		return l.readInternalBuffer(p)
+	}
+
+	// block on read or EOF, but only if there is nothing to hand out yet
+	for l.buffer.Len() == 0 && !l.isEOF() {
+		select {
+		case data := <-l.dataChannel:
+			l.buffer.Write(data)
+		case idx := <-l.eofChannel:
+			l.eof[idx] = true
+		case <-l.done:
+			return 0, io.ErrClosedPipe
+		}
+	}
+
+	// check if there's more data to read, without blocking
+	for pending := true; pending && l.buffer.Len() < len(p); {
+		select {
+		case data := <-l.dataChannel:
+			l.buffer.Write(data)
+		case idx := <-l.eofChannel:
+			l.eof[idx] = true
+		default:
+			pending = false
+		}
+	}
+
+	// everything gathered above goes out in one copy, so the returned count
+	// always matches the bytes written to p
+	return l.readInternalBuffer(p)
+}
+
+// Close stops the reader goroutines, closes every underlying reader and
+// returns the first close error, if any. It is safe to call more than once.
+func (l *logReadCloser) Close() error {
+	l.closeOnce.Do(func() {
+		// release goroutines blocked on a send before waiting for them
+		close(l.done)
+		for _, rc := range l.readClosers {
+			if err := rc.Close(); err != nil && l.closeErr == nil {
+				l.closeErr = err
+			}
+		}
+		l.wg.Wait()
+	})
+	return l.closeErr
+}
+
+// readInternalBuffer drains the internal buffer into p; an empty buffer is
+// only reported as io.EOF once all inputs have reached EOF.
+func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) {
+	n, err := l.buffer.Read(p)
+	if err == io.EOF && !l.isEOF() {
+		return n, nil
+	}
+	return n, err
+}
+
+// readInput forwards the lines of reader idx to Read until the reader is
+// exhausted, fails, or the logReadCloser is closed.
+func (l *logReadCloser) readInput(idx int) {
+	defer l.wg.Done()
+	reader := bufio.NewReader(l.readClosers[idx])
+	for {
+		line, err := reader.ReadBytes('\n')
+		if err != nil && err != io.EOF {
+			// a failed input is finished too; without this signal Read
+			// would wait for it forever
+			l.sendEOF(idx)
+			return
+		}
+		if len(line) > 0 && !l.sendData(l.annotateLine(idx, line)) {
+			return
+		}
+		if err == io.EOF {
+			l.sendEOF(idx)
+			return
+		}
+	}
+}
+
+// sendData hands data to Read; it reports false if Close happened first.
+func (l *logReadCloser) sendData(data []byte) bool {
+	select {
+	case l.dataChannel <- data:
+		return true
+	case <-l.done:
+		return false
+	}
+}
+
+// sendEOF tells Read that reader idx is finished, unless Close happened first.
+func (l *logReadCloser) sendEOF(idx int) {
+	select {
+	case l.eofChannel <- idx:
+	case <-l.done:
+	}
+}
+
+func (l *logReadCloser) annotateLine(idx int, line []byte) []byte {
+	// do not annotate if it's the only reader
+	if len(l.labels) == 1 {
+		return line
+	}
+	return []byte(fmt.Sprintf("[%-*s] %v", l.labelLength, l.labels[idx], string(line)))
+}
+
+func (l *logReadCloser) isEOF() bool {
+	for _, e := range l.eof {
+		if !e {
+			return false
+		}
+	}
+	return true
+}
diff --git a/probe/kubernetes/logreadcloser_test.go b/probe/kubernetes/logreadcloser_test.go
new file mode 100644
index 000000000..66806cfe8
--- /dev/null
+++ b/probe/kubernetes/logreadcloser_test.go
@@ -0,0 +1,87 @@
+package kubernetes_test
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"strings"
+	"testing"
+
+	"github.com/weaveworks/scope/probe/kubernetes"
+)
+
+func TestLogReadCloser(t *testing.T) {
+	data0 := []byte("abcdefghijklmno\npqrstuvwxyz\n")
+	data1 := []byte("ABCDEFGHI\nJKLMNOPQRSTUVWXYZ\n")
+	data2 := []byte("012345678901\n2345\n\n678\n")
+
+	label0 := "zero"
+	label1 := "one"
+	label2 := "two"
+	longestlabelLength := len(label0)
+
+	readClosersWithLabel := map[io.ReadCloser]string{}
+	r0 := ioutil.NopCloser(bytes.NewReader(data0))
+	readClosersWithLabel[r0] = label0
+	r1 := ioutil.NopCloser(bytes.NewReader(data1))
+	readClosersWithLabel[r1] = label1
+	r2 := ioutil.NopCloser(bytes.NewReader(data2))
+	readClosersWithLabel[r2] = label2
+
+	l := kubernetes.NewLogReadCloser(readClosersWithLabel)
+
+	buf := make([]byte, 3000)
+	count := 0
+	for {
+		n, err := l.Read(buf[count:])
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Error(err)
+		}
+		count += n
+	}
+
+	// convert to string for easier comparison
+	result := map[string]int{}
+	lineCounter(result, longestlabelLength, label0, data0)
+	lineCounter(result, longestlabelLength, label1, data1)
+	lineCounter(result, longestlabelLength, label2, data2)
+
+	str := string(buf[:count])
+	for _, line := range strings.SplitAfter(str, "\n") {
+		v, ok := result[line]
+		if ok {
+			result[line] = v - 1
+		}
+	}
+
+	for line, v := range result {
+		if v != 0 {
+			t.Errorf("Line %v has not be read from reader", line)
+		}
+	}
+
+	err := l.Close()
+	if err != nil {
+		t.Errorf("Close must not return an error: %v", err)
+	}
+}
+
+func lineCounter(counter map[string]int, pad int, label string, data []byte) {
+	for _, str := range strings.SplitAfter(string(data), "\n") {
+		if len(str) == 0 {
+			// SplitAfter ends with an empty string if the last character is '\n'
+			continue
+		}
+		line := fmt.Sprintf("[%-*s] %v", pad, label, str)
+		v, ok := counter[line]
+		if !ok {
+			v = 0
+		}
+		v++
+		counter[line] = v
+	}
+}
diff --git a/probe/kubernetes/pod.go b/probe/kubernetes/pod.go
index d96336d90..789e8ecea 100644
--- a/probe/kubernetes/pod.go
+++ b/probe/kubernetes/pod.go
@@ -24,6 +24,7 @@ type Pod interface {
 	NodeName() string
 	GetNode(probeID string) report.Node
 	RestartCount() uint
+	ContainerNames() []string
 }
 
 type pod struct {
@@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node {
 		WithParents(p.parents).
 		WithLatestActiveControls(GetLogs, DeletePod)
 }
+
+func (p *pod) ContainerNames() []string {
+	containerNames := make([]string, 0, len(p.Pod.Spec.Containers))
+	for _, c := range p.Pod.Spec.Containers {
+		containerNames = append(containerNames, c.Name)
+	}
+	return containerNames
+}
diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go
index b6bb7778c..aede98383 100644
--- a/probe/kubernetes/reporter_test.go
+++ b/probe/kubernetes/reporter_test.go
@@ -155,7 +155,7 @@ func (c *mockClient) WalkNamespaces(f func(kubernetes.NamespaceResource) error)
 	return nil
 }
 func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
-func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) {
+func (c *mockClient) GetLogs(namespaceID, podName string, _ []string) (io.ReadCloser, error) {
 	r, ok := c.logs[namespaceID+";"+podName]
 	if !ok {
 		return nil, fmt.Errorf("Not found")