Merge pull request #3013 from weaveworks/multi-container-log

Reading pod logs returns all container logs
This commit is contained in:
Roberto Bruggemann
2018-01-09 11:33:32 +00:00
committed by GitHub
6 changed files with 284 additions and 16 deletions

View File

@@ -39,7 +39,7 @@ type Client interface {
WatchPods(f func(Event, Pod))
GetLogs(namespaceID, podID string) (io.ReadCloser, error)
GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error)
DeletePod(namespaceID, podID string) error
ScaleUp(resource, namespaceID, id string) error
ScaleDown(resource, namespaceID, id string) error
@@ -328,15 +328,28 @@ func (c *client) WalkNamespaces(f func(NamespaceResource) error) error {
return nil
}
func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
},
)
return req.Stream()
func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) {
readClosersWithLabel := map[io.ReadCloser]string{}
for _, container := range containerNames {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
Container: container,
},
)
readCloser, err := req.Stream()
if err != nil {
for rc := range readClosersWithLabel {
rc.Close()
}
return nil, err
}
readClosersWithLabel[readCloser] = container
}
return NewLogReadCloser(readClosersWithLabel), nil
}
func (c *client) DeletePod(namespaceID, podID string) error {

View File

@@ -18,8 +18,8 @@ const (
)
// GetLogs is the control to get the logs for a kubernetes pod
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID)
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames)
if err != nil {
return xfer.ResponseError(err)
}
@@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res
}
}
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response {
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response {
if err := r.client.DeletePod(namespaceID, podID); err != nil {
return xfer.ResponseError(err)
}
@@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R
}
// CapturePod is exported for testing
func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response {
return func(req xfer.Request) xfer.Response {
uid, ok := report.ParsePodNodeID(req.NodeID)
if !ok {
@@ -70,7 +70,7 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response
if pod == nil {
return xfer.ResponseErrorf("Pod not found: %s", uid)
}
return f(req, pod.Namespace(), pod.Name())
return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames())
}
}

View File

@@ -0,0 +1,159 @@
package kubernetes
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"sync"
)
type logReadCloser struct {
labels []string
labelLength int
readClosers []io.ReadCloser
eof []bool
buffer bytes.Buffer
dataChannel chan []byte
eofChannel chan int
wg sync.WaitGroup
}
// NewLogReadCloser reads from multiple io.ReadCloser, where data is available,
// and annotates each line with the reader's label
func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser {
n := len(readClosersWithLabel)
labels := make([]string, n)
readClosers := make([]io.ReadCloser, n)
i := 0
labelLength := 0
for readCloser, label := range readClosersWithLabel {
readClosers[i] = readCloser
labels[i] = label
labelLength = int(math.Max(float64(labelLength), float64(len(label))))
i++
}
l := logReadCloser{
readClosers: readClosers,
labels: labels,
labelLength: labelLength,
dataChannel: make(chan []byte),
eofChannel: make(chan int),
eof: make([]bool, len(readClosers)),
}
l.wg.Add(n)
for idx := range l.readClosers {
go l.readInput(idx)
}
return &l
}
func (l *logReadCloser) Read(p []byte) (int, error) {
if len(p) <= l.buffer.Len() {
return l.readInternalBuffer(p)
}
// if there's data available to read, read it,
// otherwise block
byteCount := 0
if l.buffer.Len() > 0 {
n, err := l.readInternalBuffer(p)
if err != nil {
return n, err
}
byteCount += n
} else {
// block on read or EOF
received := false
for !received && !l.isEOF() {
select {
case data := <-l.dataChannel:
l.buffer.Write(data)
received = true
case idx := <-l.eofChannel:
l.eof[idx] = true
}
}
}
// check if there's more data to read, without blocking
empty := false
for !empty && l.buffer.Len() < len(p) {
select {
case data := <-l.dataChannel:
l.buffer.Write(data)
case idx := <-l.eofChannel:
l.eof[idx] = true
default:
empty = true
}
}
return l.readInternalBuffer(p[byteCount:])
}
func (l *logReadCloser) Close() error {
for _, rc := range l.readClosers {
err := rc.Close()
if err != nil {
return err
}
}
l.wg.Wait()
close(l.dataChannel)
close(l.eofChannel)
return nil
}
func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) {
n, err := l.buffer.Read(p)
if err == io.EOF && !l.isEOF() {
return n, nil
}
return n, err
}
func (l *logReadCloser) readInput(idx int) {
defer l.wg.Done()
reader := bufio.NewReader(l.readClosers[idx])
for {
line, err := reader.ReadBytes('\n')
if err == io.EOF {
if len(line) > 0 {
l.dataChannel <- l.annotateLine(idx, line)
}
l.eofChannel <- idx
break
}
if err != nil {
// error, exit
break
}
l.dataChannel <- l.annotateLine(idx, line)
}
}
func (l *logReadCloser) annotateLine(idx int, line []byte) []byte {
// do not annotate if it's the only reader
if len(l.labels) == 1 {
return line
}
return []byte(fmt.Sprintf("[%-*s] %v", l.labelLength, l.labels[idx], string(line)))
}
func (l *logReadCloser) isEOF() bool {
for _, e := range l.eof {
if !e {
return false
}
}
return true
}

View File

@@ -0,0 +1,87 @@
package kubernetes_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"strings"
"testing"
"github.com/weaveworks/scope/probe/kubernetes"
)
func TestLogReadCloser(t *testing.T) {
data0 := []byte("abcdefghijklmno\npqrstuvwxyz\n")
data1 := []byte("ABCDEFGHI\nJKLMNOPQRSTUVWXYZ\n")
data2 := []byte("012345678901\n2345\n\n678\n")
label0 := "zero"
label1 := "one"
label2 := "two"
longestlabelLength := len(label0)
readClosersWithLabel := map[io.ReadCloser]string{}
r0 := ioutil.NopCloser(bytes.NewReader(data0))
readClosersWithLabel[r0] = label0
r1 := ioutil.NopCloser(bytes.NewReader(data1))
readClosersWithLabel[r1] = label1
r2 := ioutil.NopCloser(bytes.NewReader(data2))
readClosersWithLabel[r2] = label2
l := kubernetes.NewLogReadCloser(readClosersWithLabel)
buf := make([]byte, 3000)
count := 0
for {
n, err := l.Read(buf[count:])
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
}
count += n
}
// convert to string for easier comparison
result := map[string]int{}
lineCounter(result, longestlabelLength, label0, data0)
lineCounter(result, longestlabelLength, label1, data1)
lineCounter(result, longestlabelLength, label2, data2)
str := string(buf[:count])
for _, line := range strings.SplitAfter(str, "\n") {
v, ok := result[line]
if ok {
result[line] = v - 1
}
}
for line, v := range result {
if v != 0 {
t.Errorf("Line %v has not be read from reader", line)
}
}
err := l.Close()
if err != nil {
t.Errorf("Close must not return an error: %v", err)
}
}
func lineCounter(counter map[string]int, pad int, label string, data []byte) {
for _, str := range strings.SplitAfter(string(data), "\n") {
if len(str) == 0 {
// SplitAfter ends with an empty string if the last character is '\n'
continue
}
line := fmt.Sprintf("[%-*s] %v", pad, label, str)
v, ok := counter[line]
if !ok {
v = 0
}
v++
counter[line] = v
}
}

View File

@@ -24,6 +24,7 @@ type Pod interface {
NodeName() string
GetNode(probeID string) report.Node
RestartCount() uint
ContainerNames() []string
}
type pod struct {
@@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node {
WithParents(p.parents).
WithLatestActiveControls(GetLogs, DeletePod)
}
func (p *pod) ContainerNames() []string {
containerNames := make([]string, 0, len(p.Pod.Spec.Containers))
for _, c := range p.Pod.Spec.Containers {
containerNames = append(containerNames, c.Name)
}
return containerNames
}

View File

@@ -155,7 +155,7 @@ func (c *mockClient) WalkNamespaces(f func(kubernetes.NamespaceResource) error)
return nil
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) {
func (c *mockClient) GetLogs(namespaceID, podName string, _ []string) (io.ReadCloser, error) {
r, ok := c.logs[namespaceID+";"+podName]
if !ok {
return nil, fmt.Errorf("Not found")