Reading pod logs returns all container logs

This is achieved by issuing an http request for each container to kubernetes' API, which yields one Reader for the corresponding container.
`logReadCloser' then reads from the above readers in parallel as data is available, buffering when necessary, forwarding it to clients by implementing the io.ReadCloser interface.
This commit is contained in:
Roberto Bruggemann
2018-01-02 17:48:01 +00:00
parent 90cbd8defe
commit 0b86c65e66
6 changed files with 262 additions and 16 deletions

View File

@@ -39,7 +39,7 @@ type Client interface {
WatchPods(f func(Event, Pod))
GetLogs(namespaceID, podID string) (io.ReadCloser, error)
GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error)
DeletePod(namespaceID, podID string) error
ScaleUp(resource, namespaceID, id string) error
ScaleDown(resource, namespaceID, id string) error
@@ -326,15 +326,27 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error {
return nil
}
func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
},
)
return req.Stream()
func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) {
readClosers := make([]io.ReadCloser, len(containerNames))
for i, container := range containerNames {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
Container: container,
},
)
readCloser, err := req.Stream()
if err != nil {
for _, rc := range readClosers {
rc.Close()
}
return nil, err
}
readClosers[i] = readCloser
}
return NewLogReadCloser(readClosers...), nil
}
func (c *client) DeletePod(namespaceID, podID string) error {

View File

@@ -18,8 +18,8 @@ const (
)
// GetLogs is the control to get the logs for a kubernetes pod
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID)
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames)
if err != nil {
return xfer.ResponseError(err)
}
@@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res
}
}
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response {
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response {
if err := r.client.DeletePod(namespaceID, podID); err != nil {
return xfer.ResponseError(err)
}
@@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R
}
// CapturePod is exported for testing
func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response {
return func(req xfer.Request) xfer.Response {
uid, ok := report.ParsePodNodeID(req.NodeID)
if !ok {
@@ -70,7 +70,7 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response
if pod == nil {
return xfer.ResponseErrorf("Pod not found: %s", uid)
}
return f(req, pod.Namespace(), pod.Name())
return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames())
}
}

View File

@@ -0,0 +1,148 @@
package kubernetes
import (
"bytes"
"io"
log "github.com/Sirupsen/logrus"
)
const (
internalBufferSize = 1024
)
type logReadCloser struct {
readClosers []io.ReadCloser
eof []bool
buffer bytes.Buffer
dataChannel chan []byte
stopChannels []chan struct{}
eofChannel chan int
}
// NewLogReadCloser takes multiple io.ReadCloser and reads where data is available.
func NewLogReadCloser(readClosers ...io.ReadCloser) io.ReadCloser {
stopChannels := make([]chan struct{}, len(readClosers))
for i := range readClosers {
stopChannels[i] = make(chan struct{})
}
l := logReadCloser{
readClosers: readClosers,
dataChannel: make(chan []byte),
stopChannels: stopChannels,
eofChannel: make(chan int),
eof: make([]bool, len(readClosers)),
}
for idx := range l.readClosers {
go l.readInput(idx)
}
return &l
}
func (l *logReadCloser) Read(p []byte) (int, error) {
if len(p) <= l.buffer.Len() {
return l.readInternalBuffer(p)
}
// if there's data available to read, read it,
// otherwise block
byteCount := 0
if l.buffer.Len() > 0 {
n, err := l.readInternalBuffer(p)
if err != nil {
return n, err
}
byteCount += n
} else {
// block on read or EOF
received := false
for !received && !l.isEOF() {
select {
case data := <-l.dataChannel:
l.buffer.Write(data)
received = true
case idx := <-l.eofChannel:
l.eof[idx] = true
}
}
}
// check if there's more data to read, without blocking
empty := false
for !empty && l.buffer.Len() < len(p) {
select {
case data := <-l.dataChannel:
l.buffer.Write(data)
case idx := <-l.eofChannel:
l.eof[idx] = true
default:
empty = true
}
}
return l.readInternalBuffer(p[byteCount:])
}
func (l *logReadCloser) Close() error {
for i, rc := range l.readClosers {
err := rc.Close()
if err != nil {
return err
}
// synchronous stop:
// the routines write to dataChannel which will be closed by this thread
select {
case <-l.stopChannels[i]:
break
}
close(l.stopChannels[i])
}
close(l.dataChannel)
close(l.eofChannel)
return nil
}
func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) {
n, err := l.buffer.Read(p)
if err == io.EOF && !l.isEOF() {
return n, nil
}
return n, err
}
func (l *logReadCloser) readInput(idx int) {
tmpBuffer := make([]byte, internalBufferSize)
for {
n, err := l.readClosers[idx].Read(tmpBuffer)
if err == io.EOF {
if n > 0 {
l.dataChannel <- tmpBuffer[:n]
}
l.eofChannel <- idx
break
}
if err != nil {
log.Errorf("Failed to read: %v", err)
break
}
l.dataChannel <- tmpBuffer[:n]
}
// signal the routine won't write to dataChannel
l.stopChannels[idx] <- struct{}{}
}
func (l *logReadCloser) isEOF() bool {
for _, e := range l.eof {
if !e {
return false
}
}
return true
}

View File

@@ -0,0 +1,77 @@
package kubernetes_test
import (
"bytes"
"io"
"io/ioutil"
"testing"
"github.com/weaveworks/scope/probe/kubernetes"
)
func TestLogReadCloser(t *testing.T) {
s0 := []byte("abcdefghijklmnopqrstuvwxyz")
s1 := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
s2 := []byte("0123456789012345")
r0 := ioutil.NopCloser(bytes.NewReader(s0))
r1 := ioutil.NopCloser(bytes.NewReader(s1))
r2 := ioutil.NopCloser(bytes.NewReader(s2))
l := kubernetes.NewLogReadCloser(r0, r1, r2)
buf := make([]byte, 3000)
count := 0
for {
n, err := l.Read(buf[count:])
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
}
count += n
}
total := len(s0) + len(s1) + len(s2)
if count != total {
t.Errorf("Must read %v characters, but got %v", total, count)
}
// check every byte
byteCounter := map[byte]int{}
byteCount(byteCounter, s0)
byteCount(byteCounter, s1)
byteCount(byteCounter, s2)
for i := 0; i < count; i++ {
b := buf[i]
v, ok := byteCounter[b]
if ok {
v--
byteCounter[b] = v
}
}
for b, c := range byteCounter {
if c != 0 {
t.Errorf("%v should be 0 instead of %v", b, c)
}
}
err := l.Close()
if err != nil {
t.Errorf("Close must not return an error: %v", err)
}
}
func byteCount(accumulator map[byte]int, s []byte) {
for _, b := range s {
v, ok := accumulator[b]
if !ok {
v = 0
}
v++
accumulator[b] = v
}
}

View File

@@ -24,6 +24,7 @@ type Pod interface {
NodeName() string
GetNode(probeID string) report.Node
RestartCount() uint
ContainerNames() []string
}
type pod struct {
@@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node {
WithParents(p.parents).
WithLatestActiveControls(GetLogs, DeletePod)
}
func (p *pod) ContainerNames() []string {
containerNames := make([]string, 0, len(p.Pod.Spec.Containers))
for _, c := range p.Pod.Spec.Containers {
containerNames = append(containerNames, c.Name)
}
return containerNames
}

View File

@@ -155,7 +155,7 @@ func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error {
return nil
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) {
func (c *mockClient) GetLogs(namespaceID, podName string, _ []string) (io.ReadCloser, error) {
r, ok := c.logs[namespaceID+";"+podName]
if !ok {
return nil, fmt.Errorf("Not found")