diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index c629ae0e0..7e3f2f156 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -39,7 +39,7 @@ type Client interface { WatchPods(f func(Event, Pod)) - GetLogs(namespaceID, podID string) (io.ReadCloser, error) + GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) DeletePod(namespaceID, podID string) error ScaleUp(resource, namespaceID, id string) error ScaleDown(resource, namespaceID, id string) error @@ -326,15 +326,27 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error { return nil } -func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) { - req := c.client.CoreV1().Pods(namespaceID).GetLogs( - podID, - &apiv1.PodLogOptions{ - Follow: true, - Timestamps: true, - }, - ) - return req.Stream() +func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) { + readClosers := make([]io.ReadCloser, len(containerNames)) + for i, container := range containerNames { + req := c.client.CoreV1().Pods(namespaceID).GetLogs( + podID, + &apiv1.PodLogOptions{ + Follow: true, + Timestamps: true, + Container: container, + }, + ) + readCloser, err := req.Stream() + if err != nil { + for _, rc := range readClosers { + rc.Close() + } + return nil, err + } + readClosers[i] = readCloser + } + return NewLogReadCloser(readClosers...), nil } func (c *client) DeletePod(namespaceID, podID string) error { diff --git a/probe/kubernetes/controls.go b/probe/kubernetes/controls.go index 62ddb806e..55f31e004 100644 --- a/probe/kubernetes/controls.go +++ b/probe/kubernetes/controls.go @@ -18,8 +18,8 @@ const ( ) // GetLogs is the control to get the logs for a kubernetes pod -func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response { - readCloser, err := r.client.GetLogs(namespaceID, podID) +func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response { + readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames) if err != nil { return xfer.ResponseError(err) } @@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res } } -func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response { +func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response { if err := r.client.DeletePod(namespaceID, podID); err != nil { return xfer.ResponseError(err) } @@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R } // CapturePod is exported for testing -func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response { +func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response { return func(req xfer.Request) xfer.Response { uid, ok := report.ParsePodNodeID(req.NodeID) if !ok { @@ -70,7 +70,7 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response if pod == nil { return xfer.ResponseErrorf("Pod not found: %s", uid) } - return f(req, pod.Namespace(), pod.Name()) + return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames()) } } diff --git a/probe/kubernetes/logreadcloser.go b/probe/kubernetes/logreadcloser.go new file mode 100644 index 000000000..865ce73f4 --- /dev/null +++ b/probe/kubernetes/logreadcloser.go @@ -0,0 +1,148 @@ +package kubernetes + +import ( + "bytes" + "io" + + log "github.com/Sirupsen/logrus" +) + +const ( + internalBufferSize = 1024 +) + +type logReadCloser struct { + readClosers []io.ReadCloser + eof []bool + buffer bytes.Buffer + dataChannel chan []byte + stopChannels []chan struct{} + eofChannel chan int +} + +// NewLogReadCloser takes multiple io.ReadCloser and reads where data is available. +func NewLogReadCloser(readClosers ...io.ReadCloser) io.ReadCloser { + stopChannels := make([]chan struct{}, len(readClosers)) + for i := range readClosers { + stopChannels[i] = make(chan struct{}) + } + + l := logReadCloser{ + readClosers: readClosers, + dataChannel: make(chan []byte), + stopChannels: stopChannels, + eofChannel: make(chan int), + eof: make([]bool, len(readClosers)), + } + + for idx := range l.readClosers { + go l.readInput(idx) + } + + return &l +} + +func (l *logReadCloser) Read(p []byte) (int, error) { + if len(p) <= l.buffer.Len() { + return l.readInternalBuffer(p) + } + + // if there's data available to read, read it, + // otherwise block + byteCount := 0 + if l.buffer.Len() > 0 { + n, err := l.readInternalBuffer(p) + if err != nil { + return n, err + } + byteCount += n + } else { + // block on read or EOF + received := false + for !received && !l.isEOF() { + select { + case data := <-l.dataChannel: + l.buffer.Write(data) + received = true + case idx := <-l.eofChannel: + l.eof[idx] = true + } + } + } + + // check if there's more data to read, without blocking + empty := false + for !empty && l.buffer.Len() < len(p) { + select { + case data := <-l.dataChannel: + l.buffer.Write(data) + case idx := <-l.eofChannel: + l.eof[idx] = true + default: + empty = true + } + } + + return l.readInternalBuffer(p[byteCount:]) +} + +func (l *logReadCloser) Close() error { + for i, rc := range l.readClosers { + err := rc.Close() + if err != nil { + return err + } + + // synchronous stop: + // the routines write to dataChannel which will be closed by this thread + select { + case <-l.stopChannels[i]: + break + } + close(l.stopChannels[i]) + } + + close(l.dataChannel) + close(l.eofChannel) + return nil +} + +func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) { + n, err := l.buffer.Read(p) + if err == io.EOF && !l.isEOF() { + return n, nil + } + + return n, err +} + +func (l *logReadCloser) readInput(idx int) { + tmpBuffer := make([]byte, internalBufferSize) + for { + n, err := l.readClosers[idx].Read(tmpBuffer) + if err == io.EOF { + if n > 0 { + l.dataChannel <- tmpBuffer[:n] + } + l.eofChannel <- idx + break + } + if err != nil { + log.Errorf("Failed to read: %v", err) + break + } + l.dataChannel <- tmpBuffer[:n] + } + + // signal the routine won't write to dataChannel + l.stopChannels[idx] <- struct{}{} +} + +func (l *logReadCloser) isEOF() bool { + for _, e := range l.eof { + if !e { + return false + } + } + return true +} diff --git a/probe/kubernetes/logreadcloser_test.go b/probe/kubernetes/logreadcloser_test.go new file mode 100644 index 000000000..55801376b --- /dev/null +++ b/probe/kubernetes/logreadcloser_test.go @@ -0,0 +1,77 @@ +package kubernetes_test + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/weaveworks/scope/probe/kubernetes" +) + +func TestLogReadCloser(t *testing.T) { + s0 := []byte("abcdefghijklmnopqrstuvwxyz") + s1 := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ") + s2 := []byte("0123456789012345") + + r0 := ioutil.NopCloser(bytes.NewReader(s0)) + r1 := ioutil.NopCloser(bytes.NewReader(s1)) + r2 := ioutil.NopCloser(bytes.NewReader(s2)) + + l := kubernetes.NewLogReadCloser(r0, r1, r2) + + buf := make([]byte, 3000) + count := 0 + for { + n, err := l.Read(buf[count:]) + if err == io.EOF { + break + } + if err != nil { + t.Error(err) + } + count += n + } + + total := len(s0) + len(s1) + len(s2) + if count != total { + t.Errorf("Must read %v characters, but got %v", total, count) + } + + // check every byte + byteCounter := map[byte]int{} + byteCount(byteCounter, s0) + byteCount(byteCounter, s1) + byteCount(byteCounter, s2) + + for i := 0; i < count; i++ { + b := buf[i] + v, ok := byteCounter[b] + if ok { + v-- + byteCounter[b] = v + } + } + + for b, c := range byteCounter { + if c != 0 { + t.Errorf("%v should be 0 instead of %v", b, c) + } + } + + err := l.Close() + if err != nil { + t.Errorf("Close must not return an error: %v", err) + } +} + +func byteCount(accumulator map[byte]int, s []byte) { + for _, b := range s { + v, ok := accumulator[b] + if !ok { + v = 0 + } + v++ + accumulator[b] = v + } +} diff --git a/probe/kubernetes/pod.go b/probe/kubernetes/pod.go index d96336d90..789e8ecea 100644 --- a/probe/kubernetes/pod.go +++ b/probe/kubernetes/pod.go @@ -24,6 +24,7 @@ type Pod interface { NodeName() string GetNode(probeID string) report.Node RestartCount() uint + ContainerNames() []string } type pod struct { @@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node { WithParents(p.parents). WithLatestActiveControls(GetLogs, DeletePod) } + +func (p *pod) ContainerNames() []string { + containerNames := make([]string, 0, len(p.Pod.Spec.Containers)) + for _, c := range p.Pod.Spec.Containers { + containerNames = append(containerNames, c.Name) + } + return containerNames +} diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index f1bd1d6e5..bd37cbcc7 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -155,7 +155,7 @@ func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error { return nil } func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {} -func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) { +func (c *mockClient) GetLogs(namespaceID, podName string, _ []string) (io.ReadCloser, error) { r, ok := c.logs[namespaceID+";"+podName] if !ok { return nil, fmt.Errorf("Not found")