logReaderCloser: remove stopChannels

Replace them with sync.WaitGroup.
This commit is contained in:
Roberto Bruggemann
2018-01-08 18:11:44 +00:00
parent b4e3f85e89
commit 00639d9476

View File

@@ -6,30 +6,30 @@ import (
"fmt"
"io"
"math"
"sync"
)
type logReadCloser struct {
labels []string
labelLength int
readClosers []io.ReadCloser
eof []bool
buffer bytes.Buffer
dataChannel chan []byte
stopChannels []chan struct{}
eofChannel chan int
labels []string
labelLength int
readClosers []io.ReadCloser
eof []bool
buffer bytes.Buffer
dataChannel chan []byte
eofChannel chan int
wg sync.WaitGroup
}
// NewLogReadCloser reads from multiple io.ReadCloser, where data is available,
// and annotates each line with the reader's label
func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser {
stopChannels := make([]chan struct{}, len(readClosersWithLabel))
labels := make([]string, len(readClosersWithLabel))
readClosers := make([]io.ReadCloser, len(readClosersWithLabel))
n := len(readClosersWithLabel)
labels := make([]string, n)
readClosers := make([]io.ReadCloser, n)
i := 0
labelLength := 0
for readCloser, label := range readClosersWithLabel {
stopChannels[i] = make(chan struct{})
readClosers[i] = readCloser
labels[i] = label
labelLength = int(math.Max(float64(labelLength), float64(len(label))))
@@ -37,15 +37,16 @@ func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadClos
}
l := logReadCloser{
readClosers: readClosers,
labels: labels,
labelLength: labelLength,
dataChannel: make(chan []byte),
stopChannels: stopChannels,
eofChannel: make(chan int),
eof: make([]bool, len(readClosers)),
readClosers: readClosers,
labels: labels,
labelLength: labelLength,
dataChannel: make(chan []byte),
eofChannel: make(chan int),
eof: make([]bool, len(readClosers)),
}
l.wg.Add(n)
for idx := range l.readClosers {
go l.readInput(idx)
}
@@ -98,21 +99,15 @@ func (l *logReadCloser) Read(p []byte) (int, error) {
}
func (l *logReadCloser) Close() error {
for i, rc := range l.readClosers {
for _, rc := range l.readClosers {
err := rc.Close()
if err != nil {
return err
}
// synchronous stop:
// the routines write to dataChannel which will be closed by this thread
select {
case <-l.stopChannels[i]:
break
}
close(l.stopChannels[i])
}
l.wg.Wait()
close(l.dataChannel)
close(l.eofChannel)
return nil
@@ -127,6 +122,7 @@ func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) {
}
func (l *logReadCloser) readInput(idx int) {
defer l.wg.Done()
reader := bufio.NewReader(l.readClosers[idx])
for {
line, err := reader.ReadBytes('\n')
@@ -143,9 +139,6 @@ func (l *logReadCloser) readInput(idx int) {
}
l.dataChannel <- l.annotateLine(idx, line)
}
// signal the routine won't write to dataChannel
l.stopChannels[idx] <- struct{}{}
}
func (l *logReadCloser) annotateLine(idx int, line []byte) []byte {