diff --git a/probe/kubernetes/logreadcloser.go b/probe/kubernetes/logreadcloser.go index d2e780bd4..740bd1b61 100644 --- a/probe/kubernetes/logreadcloser.go +++ b/probe/kubernetes/logreadcloser.go @@ -6,30 +6,30 @@ import ( "fmt" "io" "math" + "sync" ) type logReadCloser struct { - labels []string - labelLength int - readClosers []io.ReadCloser - eof []bool - buffer bytes.Buffer - dataChannel chan []byte - stopChannels []chan struct{} - eofChannel chan int + labels []string + labelLength int + readClosers []io.ReadCloser + eof []bool + buffer bytes.Buffer + dataChannel chan []byte + eofChannel chan int + wg sync.WaitGroup } // NewLogReadCloser reads from multiple io.ReadCloser, where data is available, // and annotates each line with the reader's label func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser { - stopChannels := make([]chan struct{}, len(readClosersWithLabel)) - labels := make([]string, len(readClosersWithLabel)) - readClosers := make([]io.ReadCloser, len(readClosersWithLabel)) + n := len(readClosersWithLabel) + labels := make([]string, n) + readClosers := make([]io.ReadCloser, n) i := 0 labelLength := 0 for readCloser, label := range readClosersWithLabel { - stopChannels[i] = make(chan struct{}) readClosers[i] = readCloser labels[i] = label labelLength = int(math.Max(float64(labelLength), float64(len(label)))) @@ -37,15 +37,16 @@ func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadClos } l := logReadCloser{ - readClosers: readClosers, - labels: labels, - labelLength: labelLength, - dataChannel: make(chan []byte), - stopChannels: stopChannels, - eofChannel: make(chan int), - eof: make([]bool, len(readClosers)), + readClosers: readClosers, + labels: labels, + labelLength: labelLength, + dataChannel: make(chan []byte), + eofChannel: make(chan int), + eof: make([]bool, len(readClosers)), } + l.wg.Add(n) + for idx := range l.readClosers { go l.readInput(idx) } @@ -98,21 +99,15 @@ func (l *logReadCloser) Read(p []byte) (int, error) { } func (l *logReadCloser) Close() error { - for i, rc := range l.readClosers { + for _, rc := range l.readClosers { err := rc.Close() if err != nil { return err } - - // synchronous stop: - // the routines write to dataChannel which will be closed by this thread - select { - case <-l.stopChannels[i]: - break - } - close(l.stopChannels[i]) } + l.wg.Wait() + close(l.dataChannel) close(l.eofChannel) return nil @@ -127,6 +122,7 @@ func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) { } func (l *logReadCloser) readInput(idx int) { + defer l.wg.Done() reader := bufio.NewReader(l.readClosers[idx]) for { line, err := reader.ReadBytes('\n') @@ -143,9 +139,6 @@ func (l *logReadCloser) readInput(idx int) { } l.dataChannel <- l.annotateLine(idx, line) } - - // signal the routine won't write to dataChannel - l.stopChannels[idx] <- struct{}{} } func (l *logReadCloser) annotateLine(idx int, line []byte) []byte {