From 0b86c65e668328efd6611ca612abd1cf1139112e Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Tue, 2 Jan 2018 17:48:01 +0000 Subject: [PATCH 1/3] Reading pod logs returns all container logs This is achieved by issuing an HTTP request for each container to kubernetes' API, which yields one Reader for the corresponding container. `logReadCloser` then reads from the above readers in parallel as data is available, buffering when necessary, forwarding it to clients by implementing the io.ReadCloser interface. --- probe/kubernetes/client.go | 32 ++++-- probe/kubernetes/controls.go | 10 +- probe/kubernetes/logreadcloser.go | 148 +++++++++++++++++++++++++ probe/kubernetes/logreadcloser_test.go | 77 +++++++++++++ probe/kubernetes/pod.go | 9 ++ probe/kubernetes/reporter_test.go | 2 +- 6 files changed, 262 insertions(+), 16 deletions(-) create mode 100644 probe/kubernetes/logreadcloser.go create mode 100644 probe/kubernetes/logreadcloser_test.go diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index c629ae0e0..7e3f2f156 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -39,7 +39,7 @@ type Client interface { WatchPods(f func(Event, Pod)) - GetLogs(namespaceID, podID string) (io.ReadCloser, error) + GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) DeletePod(namespaceID, podID string) error ScaleUp(resource, namespaceID, id string) error ScaleDown(resource, namespaceID, id string) error @@ -326,15 +326,27 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error { return nil } -func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) { - req := c.client.CoreV1().Pods(namespaceID).GetLogs( - podID, - &apiv1.PodLogOptions{ - Follow: true, - Timestamps: true, - }, - ) - return req.Stream() +func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) { + readClosers := make([]io.ReadCloser, len(containerNames)) + for i, container := 
range containerNames { + req := c.client.CoreV1().Pods(namespaceID).GetLogs( + podID, + &apiv1.PodLogOptions{ + Follow: true, + Timestamps: true, + Container: container, + }, + ) + readCloser, err := req.Stream() + if err != nil { + for _, rc := range readClosers { + rc.Close() + } + return nil, err + } + readClosers[i] = readCloser + } + return NewLogReadCloser(readClosers...), nil } func (c *client) DeletePod(namespaceID, podID string) error { diff --git a/probe/kubernetes/controls.go b/probe/kubernetes/controls.go index 62ddb806e..55f31e004 100644 --- a/probe/kubernetes/controls.go +++ b/probe/kubernetes/controls.go @@ -18,8 +18,8 @@ const ( ) // GetLogs is the control to get the logs for a kubernetes pod -func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response { - readCloser, err := r.client.GetLogs(namespaceID, podID) +func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response { + readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames) if err != nil { return xfer.ResponseError(err) } @@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res } } -func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response { +func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response { if err := r.client.DeletePod(namespaceID, podID); err != nil { return xfer.ResponseError(err) } @@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R } // CapturePod is exported for testing -func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response { +func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response { return func(req xfer.Request) xfer.Response { uid, ok := report.ParsePodNodeID(req.NodeID) if !ok { @@ -70,7 +70,7 @@ 
func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response if pod == nil { return xfer.ResponseErrorf("Pod not found: %s", uid) } - return f(req, pod.Namespace(), pod.Name()) + return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames()) } } diff --git a/probe/kubernetes/logreadcloser.go b/probe/kubernetes/logreadcloser.go new file mode 100644 index 000000000..865ce73f4 --- /dev/null +++ b/probe/kubernetes/logreadcloser.go @@ -0,0 +1,148 @@ +package kubernetes + +import ( + "bytes" + "io" + + log "github.com/Sirupsen/logrus" +) + +const ( + internalBufferSize = 1024 +) + +type logReadCloser struct { + readClosers []io.ReadCloser + eof []bool + buffer bytes.Buffer + dataChannel chan []byte + stopChannels []chan struct{} + eofChannel chan int +} + +// NewLogReadCloser takes multiple io.ReadCloser and reads where data is available. +func NewLogReadCloser(readClosers ...io.ReadCloser) io.ReadCloser { + stopChannels := make([]chan struct{}, len(readClosers)) + for i := range readClosers { + stopChannels[i] = make(chan struct{}) + } + + l := logReadCloser{ + readClosers: readClosers, + dataChannel: make(chan []byte), + stopChannels: stopChannels, + eofChannel: make(chan int), + eof: make([]bool, len(readClosers)), + } + + for idx := range l.readClosers { + go l.readInput(idx) + } + + return &l +} + +func (l *logReadCloser) Read(p []byte) (int, error) { + if len(p) <= l.buffer.Len() { + return l.readInternalBuffer(p) + } + + // if there's data available to read, read it, + // otherwise block + byteCount := 0 + if l.buffer.Len() > 0 { + n, err := l.readInternalBuffer(p) + if err != nil { + return n, err + } + byteCount += n + } else { + // block on read or EOF + received := false + for !received && !l.isEOF() { + select { + case data := <-l.dataChannel: + l.buffer.Write(data) + received = true + case idx := <-l.eofChannel: + l.eof[idx] = true + } + } + } + + // check if there's more data to read, without blocking + empty := false + for !empty && 
l.buffer.Len() < len(p) { + select { + case data := <-l.dataChannel: + l.buffer.Write(data) + case idx := <-l.eofChannel: + l.eof[idx] = true + default: + empty = true + } + } + + return l.readInternalBuffer(p[byteCount:]) +} + +func (l *logReadCloser) Close() error { + for i, rc := range l.readClosers { + err := rc.Close() + if err != nil { + return err + } + + // synchronous stop: + // the routines write to dataChannel which will be closed by this thread + select { + case <-l.stopChannels[i]: + break + } + close(l.stopChannels[i]) + } + + close(l.dataChannel) + close(l.eofChannel) + return nil +} + +func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) { + n, err := l.buffer.Read(p) + if err == io.EOF && !l.isEOF() { + return n, nil + } + + return n, err +} + +func (l *logReadCloser) readInput(idx int) { + tmpBuffer := make([]byte, internalBufferSize) + for { + n, err := l.readClosers[idx].Read(tmpBuffer) + if err == io.EOF { + if n > 0 { + l.dataChannel <- tmpBuffer[:n] + } + l.eofChannel <- idx + break + } + if err != nil { + log.Errorf("Failed to read: %v", err) + break + } + l.dataChannel <- tmpBuffer[:n] + } + + // signal the routine won't write to dataChannel + l.stopChannels[idx] <- struct{}{} +} + +func (l *logReadCloser) isEOF() bool { + for _, e := range l.eof { + if !e { + return false + } + } + return true +} diff --git a/probe/kubernetes/logreadcloser_test.go b/probe/kubernetes/logreadcloser_test.go new file mode 100644 index 000000000..55801376b --- /dev/null +++ b/probe/kubernetes/logreadcloser_test.go @@ -0,0 +1,77 @@ +package kubernetes_test + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/weaveworks/scope/probe/kubernetes" +) + +func TestLogReadCloser(t *testing.T) { + s0 := []byte("abcdefghijklmnopqrstuvwxyz") + s1 := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ") + s2 := []byte("0123456789012345") + + r0 := ioutil.NopCloser(bytes.NewReader(s0)) + r1 := ioutil.NopCloser(bytes.NewReader(s1)) + r2 := 
ioutil.NopCloser(bytes.NewReader(s2)) + + l := kubernetes.NewLogReadCloser(r0, r1, r2) + + buf := make([]byte, 3000) + count := 0 + for { + n, err := l.Read(buf[count:]) + if err == io.EOF { + break + } + if err != nil { + t.Error(err) + } + count += n + } + + total := len(s0) + len(s1) + len(s2) + if count != total { + t.Errorf("Must read %v characters, but got %v", total, count) + } + + // check every byte + byteCounter := map[byte]int{} + byteCount(byteCounter, s0) + byteCount(byteCounter, s1) + byteCount(byteCounter, s2) + + for i := 0; i < count; i++ { + b := buf[i] + v, ok := byteCounter[b] + if ok { + v-- + byteCounter[b] = v + } + } + + for b, c := range byteCounter { + if c != 0 { + t.Errorf("%v should be 0 instead of %v", b, c) + } + } + + err := l.Close() + if err != nil { + t.Errorf("Close must not return an error: %v", err) + } +} + +func byteCount(accumulator map[byte]int, s []byte) { + for _, b := range s { + v, ok := accumulator[b] + if !ok { + v = 0 + } + v++ + accumulator[b] = v + } +} diff --git a/probe/kubernetes/pod.go b/probe/kubernetes/pod.go index d96336d90..789e8ecea 100644 --- a/probe/kubernetes/pod.go +++ b/probe/kubernetes/pod.go @@ -24,6 +24,7 @@ type Pod interface { NodeName() string GetNode(probeID string) report.Node RestartCount() uint + ContainerNames() []string } type pod struct { @@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node { WithParents(p.parents). 
WithLatestActiveControls(GetLogs, DeletePod) } + +func (p *pod) ContainerNames() []string { + containerNames := make([]string, 0, len(p.Pod.Spec.Containers)) + for _, c := range p.Pod.Spec.Containers { + containerNames = append(containerNames, c.Name) + } + return containerNames +} diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index f1bd1d6e5..bd37cbcc7 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -155,7 +155,7 @@ func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error { return nil } func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {} -func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) { +func (c *mockClient) GetLogs(namespaceID, podName string, _ []string) (io.ReadCloser, error) { r, ok := c.logs[namespaceID+";"+podName] if !ok { return nil, fmt.Errorf("Not found") From b4e3f85e89cf18324d0f1d7fd9dfaca1ae5f1c74 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Fri, 5 Jan 2018 14:40:52 +0000 Subject: [PATCH 2/3] LogReadCloser interleaves 'by line' for each container Also, prepend each line with '[ContainerName]'. 
--- probe/kubernetes/client.go | 11 +++-- probe/kubernetes/logreadcloser.go | 52 +++++++++++++------- probe/kubernetes/logreadcloser_test.go | 68 +++++++++++++++----------- 3 files changed, 80 insertions(+), 51 deletions(-) diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 7e3f2f156..da53ceccf 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -327,8 +327,8 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error { } func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) { - readClosers := make([]io.ReadCloser, len(containerNames)) - for i, container := range containerNames { + readClosersWithLabel := map[io.ReadCloser]string{} + for _, container := range containerNames { req := c.client.CoreV1().Pods(namespaceID).GetLogs( podID, &apiv1.PodLogOptions{ @@ -339,14 +339,15 @@ func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io ) readCloser, err := req.Stream() if err != nil { - for _, rc := range readClosers { + for rc := range readClosersWithLabel { rc.Close() } return nil, err } - readClosers[i] = readCloser + readClosersWithLabel[readCloser] = container } - return NewLogReadCloser(readClosers...), nil + + return NewLogReadCloser(readClosersWithLabel), nil } func (c *client) DeletePod(namespaceID, podID string) error { diff --git a/probe/kubernetes/logreadcloser.go b/probe/kubernetes/logreadcloser.go index 865ce73f4..d2e780bd4 100644 --- a/probe/kubernetes/logreadcloser.go +++ b/probe/kubernetes/logreadcloser.go @@ -1,17 +1,16 @@ package kubernetes import ( + "bufio" "bytes" + "fmt" "io" - - log "github.com/Sirupsen/logrus" -) - -const ( - internalBufferSize = 1024 + "math" ) type logReadCloser struct { + labels []string + labelLength int readClosers []io.ReadCloser eof []bool buffer bytes.Buffer @@ -20,15 +19,27 @@ type logReadCloser struct { eofChannel chan int } -// NewLogReadCloser takes multiple io.ReadCloser and reads where data is 
available. -func NewLogReadCloser(readClosers ...io.ReadCloser) io.ReadCloser { - stopChannels := make([]chan struct{}, len(readClosers)) - for i := range readClosers { +// NewLogReadCloser reads from multiple io.ReadCloser, where data is available, +// and annotates each line with the reader's label +func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser { + stopChannels := make([]chan struct{}, len(readClosersWithLabel)) + labels := make([]string, len(readClosersWithLabel)) + readClosers := make([]io.ReadCloser, len(readClosersWithLabel)) + + i := 0 + labelLength := 0 + for readCloser, label := range readClosersWithLabel { stopChannels[i] = make(chan struct{}) + readClosers[i] = readCloser + labels[i] = label + labelLength = int(math.Max(float64(labelLength), float64(len(label)))) + i++ } l := logReadCloser{ readClosers: readClosers, + labels: labels, + labelLength: labelLength, dataChannel: make(chan []byte), stopChannels: stopChannels, eofChannel: make(chan int), @@ -112,32 +123,39 @@ func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) { if err == io.EOF && !l.isEOF() { return n, nil } - return n, err } func (l *logReadCloser) readInput(idx int) { - tmpBuffer := make([]byte, internalBufferSize) + reader := bufio.NewReader(l.readClosers[idx]) for { - n, err := l.readClosers[idx].Read(tmpBuffer) + line, err := reader.ReadBytes('\n') if err == io.EOF { - if n > 0 { - l.dataChannel <- tmpBuffer[:n] + if len(line) > 0 { + l.dataChannel <- l.annotateLine(idx, line) } l.eofChannel <- idx break } if err != nil { - log.Errorf("Failed to read: %v", err) + // error, exit break } - l.dataChannel <- tmpBuffer[:n] + l.dataChannel <- l.annotateLine(idx, line) } // signal the routine won't write to dataChannel l.stopChannels[idx] <- struct{}{} } +func (l *logReadCloser) annotateLine(idx int, line []byte) []byte { + // do not annotate if it's the only reader + if len(l.labels) == 1 { + return line + } + return []byte(fmt.Sprintf("[%-*s] 
%v", l.labelLength, l.labels[idx], string(line))) +} + func (l *logReadCloser) isEOF() bool { for _, e := range l.eof { if !e { diff --git a/probe/kubernetes/logreadcloser_test.go b/probe/kubernetes/logreadcloser_test.go index 55801376b..66806cfe8 100644 --- a/probe/kubernetes/logreadcloser_test.go +++ b/probe/kubernetes/logreadcloser_test.go @@ -2,23 +2,34 @@ package kubernetes_test import ( "bytes" + "fmt" "io" "io/ioutil" + "strings" "testing" "github.com/weaveworks/scope/probe/kubernetes" ) func TestLogReadCloser(t *testing.T) { - s0 := []byte("abcdefghijklmnopqrstuvwxyz") - s1 := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ") - s2 := []byte("0123456789012345") + data0 := []byte("abcdefghijklmno\npqrstuvwxyz\n") + data1 := []byte("ABCDEFGHI\nJKLMNOPQRSTUVWXYZ\n") + data2 := []byte("012345678901\n2345\n\n678\n") - r0 := ioutil.NopCloser(bytes.NewReader(s0)) - r1 := ioutil.NopCloser(bytes.NewReader(s1)) - r2 := ioutil.NopCloser(bytes.NewReader(s2)) + label0 := "zero" + label1 := "one" + label2 := "two" + longestlabelLength := len(label0) - l := kubernetes.NewLogReadCloser(r0, r1, r2) + readClosersWithLabel := map[io.ReadCloser]string{} + r0 := ioutil.NopCloser(bytes.NewReader(data0)) + readClosersWithLabel[r0] = label0 + r1 := ioutil.NopCloser(bytes.NewReader(data1)) + readClosersWithLabel[r1] = label1 + r2 := ioutil.NopCloser(bytes.NewReader(data2)) + readClosersWithLabel[r2] = label2 + + l := kubernetes.NewLogReadCloser(readClosersWithLabel) buf := make([]byte, 3000) count := 0 @@ -33,29 +44,23 @@ func TestLogReadCloser(t *testing.T) { count += n } - total := len(s0) + len(s1) + len(s2) - if count != total { - t.Errorf("Must read %v characters, but got %v", total, count) - } + // convert to string for easier comparison + result := map[string]int{} + lineCounter(result, longestlabelLength, label0, data0) + lineCounter(result, longestlabelLength, label1, data1) + lineCounter(result, longestlabelLength, label2, data2) - // check every byte - byteCounter := map[byte]int{} - 
byteCount(byteCounter, s0) - byteCount(byteCounter, s1) - byteCount(byteCounter, s2) - - for i := 0; i < count; i++ { - b := buf[i] - v, ok := byteCounter[b] + str := string(buf[:count]) + for _, line := range strings.SplitAfter(str, "\n") { + v, ok := result[line] if ok { - v-- - byteCounter[b] = v + result[line] = v - 1 } } - for b, c := range byteCounter { - if c != 0 { - t.Errorf("%v should be 0 instead of %v", b, c) + for line, v := range result { + if v != 0 { + t.Errorf("Line %v has not be read from reader", line) } } @@ -65,13 +70,18 @@ func TestLogReadCloser(t *testing.T) { } } -func byteCount(accumulator map[byte]int, s []byte) { - for _, b := range s { - v, ok := accumulator[b] +func lineCounter(counter map[string]int, pad int, label string, data []byte) { + for _, str := range strings.SplitAfter(string(data), "\n") { + if len(str) == 0 { + // SplitAfter ends with an empty string if the last character is '\n' + continue + } + line := fmt.Sprintf("[%-*s] %v", pad, label, str) + v, ok := counter[line] if !ok { v = 0 } v++ - accumulator[b] = v + counter[line] = v } } From 00639d9476117d7c5012032f3f207f86e3746535 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Mon, 8 Jan 2018 18:11:44 +0000 Subject: [PATCH 3/3] logReadCloser: remove `stopChannels` Replace them with sync.WaitGroup. 
--- probe/kubernetes/logreadcloser.go | 55 ++++++++++++++----------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/probe/kubernetes/logreadcloser.go b/probe/kubernetes/logreadcloser.go index d2e780bd4..740bd1b61 100644 --- a/probe/kubernetes/logreadcloser.go +++ b/probe/kubernetes/logreadcloser.go @@ -6,30 +6,30 @@ import ( "fmt" "io" "math" + "sync" ) type logReadCloser struct { - labels []string - labelLength int - readClosers []io.ReadCloser - eof []bool - buffer bytes.Buffer - dataChannel chan []byte - stopChannels []chan struct{} - eofChannel chan int + labels []string + labelLength int + readClosers []io.ReadCloser + eof []bool + buffer bytes.Buffer + dataChannel chan []byte + eofChannel chan int + wg sync.WaitGroup } // NewLogReadCloser reads from multiple io.ReadCloser, where data is available, // and annotates each line with the reader's label func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser { - stopChannels := make([]chan struct{}, len(readClosersWithLabel)) - labels := make([]string, len(readClosersWithLabel)) - readClosers := make([]io.ReadCloser, len(readClosersWithLabel)) + n := len(readClosersWithLabel) + labels := make([]string, n) + readClosers := make([]io.ReadCloser, n) i := 0 labelLength := 0 for readCloser, label := range readClosersWithLabel { - stopChannels[i] = make(chan struct{}) readClosers[i] = readCloser labels[i] = label labelLength = int(math.Max(float64(labelLength), float64(len(label)))) @@ -37,15 +37,16 @@ func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadClos } l := logReadCloser{ - readClosers: readClosers, - labels: labels, - labelLength: labelLength, - dataChannel: make(chan []byte), - stopChannels: stopChannels, - eofChannel: make(chan int), - eof: make([]bool, len(readClosers)), + readClosers: readClosers, + labels: labels, + labelLength: labelLength, + dataChannel: make(chan []byte), + eofChannel: make(chan int), + eof: make([]bool, 
len(readClosers)), } + l.wg.Add(n) + for idx := range l.readClosers { go l.readInput(idx) } @@ -98,21 +99,15 @@ func (l *logReadCloser) Read(p []byte) (int, error) { } func (l *logReadCloser) Close() error { - for i, rc := range l.readClosers { + for _, rc := range l.readClosers { err := rc.Close() if err != nil { return err } - - // synchronous stop: - // the routines write to dataChannel which will be closed by this thread - select { - case <-l.stopChannels[i]: - break - } - close(l.stopChannels[i]) } + l.wg.Wait() + close(l.dataChannel) close(l.eofChannel) return nil @@ -127,6 +122,7 @@ func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) { } func (l *logReadCloser) readInput(idx int) { + defer l.wg.Done() reader := bufio.NewReader(l.readClosers[idx]) for { line, err := reader.ReadBytes('\n') @@ -143,9 +139,6 @@ func (l *logReadCloser) readInput(idx int) { } l.dataChannel <- l.annotateLine(idx, line) } - - // signal the routine won't write to dataChannel - l.stopChannels[idx] <- struct{}{} } func (l *logReadCloser) annotateLine(idx int, line []byte) []byte {