diff --git a/pipeline/log/utils.go b/pipeline/log/utils.go index 6c3e2dfe7..600a6271e 100644 --- a/pipeline/log/utils.go +++ b/pipeline/log/utils.go @@ -47,15 +47,16 @@ func CopyLineByLine(dst io.Writer, src io.Reader, maxSize int) error { for { // TODO: read til newline or maxSize directly line, err := r.ReadBytes('\n') + if len(line) > 0 { + if err := writeChunks(dst, line, maxSize); err != nil { + return err + } + } if errors.Is(err, io.EOF) { - return writeChunks(dst, line, maxSize) + break } else if err != nil { return err } - - err = writeChunks(dst, line, maxSize) - if err != nil { - return err - } } + return nil } diff --git a/pipeline/log/utils_test.go b/pipeline/log/utils_test.go index 088656b93..c5d6effb3 100644 --- a/pipeline/log/utils_test.go +++ b/pipeline/log/utils_test.go @@ -58,41 +58,47 @@ func TestCopyLineByLine(t *testing.T) { writes: make([]string, 0), } + done := make(chan struct{}) + go func() { err := log.CopyLineByLine(testWriter, r, 1024) assert.NoError(t, err) + close(done) }() - // wait for the goroutine to start - time.Sleep(time.Second / 10) - // write 4 bytes without newline if _, err := w.Write([]byte("1234")); err != nil { t.Fatalf("unexpected error: %v", err) } - writes := testWriter.GetWrites() - assert.Lenf(t, writes, 0, "expected 0 writes, got: %v", writes) + // Wait until no writes have occurred (should be immediate) + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == 0 + }, time.Second, 5*time.Millisecond, "expected 0 writes after first write") // write more bytes with newlines if _, err := w.Write([]byte("5\n678\n90")); err != nil { t.Fatalf("unexpected error: %v", err) } - writes = testWriter.GetWrites() - assert.Lenf(t, writes, 2, "expected 2 writes, got: %v", writes) - - // wait for the goroutine to write the data - time.Sleep(10 * time.Millisecond) + // Wait until two writes have occurred + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == 2 + }, time.Second, 5*time.Millisecond, "expected 2 writes after second write") + writes := testWriter.GetWrites() writtenData := strings.Join(writes, "-") assert.Equal(t, "12345\n-678\n", writtenData, "unexpected writtenData: %s", writtenData) // closing the writer should flush the remaining data w.Close() - // wait for the goroutine to finish - time.Sleep(10 * time.Millisecond) + // Wait for the goroutine to finish + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for goroutine to finish") + } // the written data contains all the data we wrote writtenData = strings.Join(testWriter.GetWrites(), "-")