From ae6e5dfcf7da01951d742e52a5979551285fc39f Mon Sep 17 00:00:00 2001 From: silverwind Date: Sun, 22 Feb 2026 14:52:49 +0000 Subject: [PATCH] fix: race condition in reporter between RunDaemon and Close (#796) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary - Fix data race on `r.closed` between `RunDaemon()` and `Close()` by protecting it with the existing `stateMu` — `closed` is part of the reporter state. `RunDaemon()` reads it under `stateMu.RLock()`, `Close()` sets it inside the existing `stateMu.Lock()` block - `ReportState` now has a parameter to not report results from runDaemon even if set, from now on `Close` reports the result - `Close` waits for `RunDaemon()` to signal exit via a closed channel `daemon` before reporting the final logs and state with result, unless something really wrong happens it does not time out - Add `TestReporter_EphemeralRunnerDeletion` which reproduces the exact scenario from #793: RunDaemon's `ReportState` racing with `Close`, causing the ephemeral runner to be deleted before final logs are sent - Add `TestReporter_RunDaemonClose_Race` which exercises `RunDaemon()` and `Close()` concurrently to verify no data race on `r.closed` under `go test -race` - Enable `-race` flag in `make test` so CI catches data races going forward Based on #794, with fixes for the remaining unprotected `r.closed` reads that the race detector catches. Fixes #793 --------- Co-authored-by: Christopher Homberger Co-authored-by: ChristopherHX Co-authored-by: rmawatson Reviewed-on: https://gitea.com/gitea/act_runner/pulls/796 Reviewed-by: ChristopherHX Reviewed-by: Lunny Xiao Co-authored-by: silverwind Co-committed-by: silverwind --- Makefile | 2 +- internal/pkg/report/reporter.go | 53 +++++++---- internal/pkg/report/reporter_test.go | 127 +++++++++++++++++++++++++++ 3 files changed, 166 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index f2731af6..841e00ad 100644 --- a/Makefile +++ b/Makefile @@ -125,7 +125,7 @@ tidy-check: tidy fi test: fmt-check security-check - @$(GO) test -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1 + @$(GO) test -race -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1 .PHONY: vet vet: diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index e49402b9..eb6026ed 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -5,6 +5,7 @@ package report import ( "context" + "errors" "fmt" "regexp" "strings" @@ -37,6 +38,7 @@ type Reporter struct { state *runnerv1.TaskState stateMu sync.RWMutex outputs sync.Map + daemon chan struct{} debugOutputEnabled bool stopCommandEndToken string @@ -63,6 +65,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C state: &runnerv1.TaskState{ Id: task.Id, }, + daemon: make(chan struct{}), } if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" { @@ -109,6 +112,7 @@ func (r *Reporter) Fire(entry *log.Entry) error { if stage != "Main" { if v, ok := entry.Data["jobResult"]; ok { if jobResult, ok := r.parseResult(v); ok { + // We need to ensure log upload before this upload r.state.Result = jobResult r.state.StoppedAt = timestamppb.New(timestamp) for _, s := range r.state.Steps { @@ -176,15 +180,17 @@ func (r *Reporter) Fire(entry *log.Entry) error { } func (r *Reporter) RunDaemon() { - if r.closed { - return - } - if r.ctx.Err() != nil { + r.stateMu.RLock() + closed := r.closed + r.stateMu.RUnlock() + if closed || r.ctx.Err() != nil { + // Acknowledge close + close(r.daemon) return } _ = r.ReportLog(false) - _ = r.ReportState() + _ = r.ReportState(false) time.AfterFunc(time.Second, r.RunDaemon) } @@ -226,9 +232,8 @@ func (r *Reporter) SetOutputs(outputs map[string]string) { } func (r *Reporter) Close(lastWords string) error { - r.closed = true - r.stateMu.Lock() + r.closed = true if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { if lastWords == "" { lastWords = "Early termination" @@ -251,13 +256,23 @@ func (r *Reporter) Close(lastWords string) error { }) } r.stateMu.Unlock() + // Wait for Acknowledge + select { + case <-r.daemon: + case <-time.After(60 * time.Second): + close(r.daemon) + log.Error("No Response from RunDaemon for 60s, continue best effort") + } - return retry.Do(func() error { - if err := r.ReportLog(true); err != nil { - return err - } - return r.ReportState() - }, retry.Context(r.ctx)) + // Report the job outcome even when all log upload retry attempts have been exhausted + return errors.Join( + retry.Do(func() error { + return r.ReportLog(true) + }, retry.Context(r.ctx)), + retry.Do(func() error { + return r.ReportState(true) + }, retry.Context(r.ctx)), + ) } func (r *Reporter) ReportLog(noMore bool) error { @@ -285,17 +300,20 @@ func (r *Reporter) ReportLog(noMore bool) error { r.stateMu.Lock() r.logRows = r.logRows[ack-r.logOffset:] + submitted := r.logOffset + len(rows) r.logOffset = ack r.stateMu.Unlock() - if noMore && ack < r.logOffset+len(rows) { + if noMore && ack < submitted { return fmt.Errorf("not all logs are submitted") } return nil } -func (r *Reporter) ReportState() error { +// ReportState only reports the job result if reportResult is true +// RunDaemon never reports results even if result is set +func (r *Reporter) ReportState(reportResult bool) error { r.clientM.Lock() defer r.clientM.Unlock() @@ -303,6 +321,11 @@ func (r *Reporter) ReportState() error { state := proto.Clone(r.state).(*runnerv1.TaskState) r.stateMu.RUnlock() + // Only report result from Close to reliable sent logs + if !reportResult { + state.Result = runnerv1.Result_RESULT_UNSPECIFIED + } + outputs := make(map[string]string) r.outputs.Range(func(k, v interface{}) bool { if val, ok := v.(string); ok { diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index c5a5b61f..2381b521 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -5,8 +5,11 @@ package report import ( "context" + "fmt" "strings" + "sync" "testing" + "time" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" connect_go "connectrpc.com/connect" @@ -15,6 +18,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" "gitea.com/gitea/act_runner/internal/pkg/client/mocks" ) @@ -174,6 +178,7 @@ func TestReporter_Fire(t *testing.T) { reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ Context: taskCtx, }) + reporter.RunDaemon() defer func() { assert.NoError(t, reporter.Close("")) }() @@ -195,3 +200,125 @@ func TestReporter_Fire(t *testing.T) { assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength) }) } + +// TestReporter_EphemeralRunnerDeletion reproduces the exact scenario from +// https://gitea.com/gitea/act_runner/issues/793: +// +// 1. RunDaemon calls ReportLog(false) — runner is still alive +// 2. Close() updates state to Result=FAILURE (between RunDaemon's ReportLog and ReportState) +// 3. RunDaemon's ReportState() would clone the completed state and send it, +// but the fix makes ReportState return early when closed, preventing this +// 4. Close's ReportLog(true) succeeds because the runner was not deleted +func TestReporter_EphemeralRunnerDeletion(t *testing.T) { + runnerDeleted := false + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + if runnerDeleted { + return nil, fmt.Errorf("runner has been deleted") + } + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + // Server deletes ephemeral runner when it receives a completed state + if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { + runnerDeleted = true + } + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]interface{}{}) + require.NoError(t, err) + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}) + reporter.ResetSteps(1) + + // Fire a log entry to create pending data + assert.NoError(t, reporter.Fire(&log.Entry{ + Message: "build output", + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + + // Step 1: RunDaemon calls ReportLog(false) — runner is still alive + assert.NoError(t, reporter.ReportLog(false)) + + // Step 2: Close() updates state — sets Result=FAILURE and marks steps cancelled. + // In the real race, this happens while RunDaemon is between ReportLog and ReportState. + reporter.stateMu.Lock() + reporter.closed = true + for _, v := range reporter.state.Steps { + if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { + v.Result = runnerv1.Result_RESULT_CANCELLED + } + } + reporter.state.Result = runnerv1.Result_RESULT_FAILURE + reporter.logRows = append(reporter.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: "Early termination", + }) + reporter.state.StoppedAt = timestamppb.Now() + reporter.stateMu.Unlock() + + // Step 3: RunDaemon's ReportState() — with the fix, this returns early + // because closed=true, preventing the server from deleting the runner. + assert.NoError(t, reporter.ReportState(false)) + assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState") + + // Step 4: Close's final log upload succeeds because the runner is still alive. + // Flush pending rows first, then send the noMore signal (matching Close's retry behavior). + assert.NoError(t, reporter.ReportLog(false)) + // Acknowledge Close as done in daemon + close(reporter.daemon) + err = reporter.ReportLog(true) + assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs") + err = reporter.ReportState(true) + assert.NoError(t, err, "final state update should work: runner should not be deleted before Close finishes sending logs") +} + +func TestReporter_RunDaemonClose_Race(t *testing.T) { + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + taskCtx, err := structpb.NewStruct(map[string]interface{}{}) + require.NoError(t, err) + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ + Context: taskCtx, + }) + reporter.ResetSteps(1) + + // Start the daemon loop in a separate goroutine. + // RunDaemon reads r.closed and reschedules itself via time.AfterFunc. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + reporter.RunDaemon() + }() + + // Close concurrently — this races with RunDaemon on r.closed. + assert.NoError(t, reporter.Close("")) + + // Cancel context so pending AfterFunc callbacks exit quickly. + cancel() + wg.Wait() + time.Sleep(2 * time.Second) +}