fix: race condition in reporter between RunDaemon and Close (#796)

## Summary

- Fix the data race on `r.closed` between `RunDaemon()` and `Close()` by protecting it with the existing `stateMu`, since `closed` is part of the reporter state: `RunDaemon()` now reads it under `stateMu.RLock()`, and `Close()` sets it inside the existing `stateMu.Lock()` block
- `ReportState` now takes a `reportResult` parameter: `RunDaemon()` passes `false`, so it never reports the job result even when one is already set; only `Close` reports the result
- `Close` waits for `RunDaemon()` to acknowledge shutdown (by closing the `daemon` channel) before reporting the final logs and state with the result; a 60-second timeout keeps `Close` from hanging if the daemon never responds, though it should not fire in practice (see the sketch after this list)
- Add `TestReporter_EphemeralRunnerDeletion` which reproduces the exact scenario from #793: RunDaemon's `ReportState` racing with `Close`, causing the ephemeral runner to be deleted before final logs are sent
- Add `TestReporter_RunDaemonClose_Race` which exercises `RunDaemon()` and `Close()` concurrently to verify no data race on `r.closed` under `go test -race`
- Enable the `-race` flag in `make test` so CI catches data races going forward (a standalone illustration of what `-race` reports follows the Makefile hunk below)
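
Taken together, these pieces form a small shutdown handshake between the daemon loop and `Close`. Below is a minimal, self-contained Go sketch of that pattern; the field names mirror the reporter (`closed`, `daemon`, `stateMu`), but `runDaemon`/`closeReporter` are illustrative stand-ins rather than the real methods, and the actual `ReportLog`/`ReportState` calls and retry logic are stubbed out as comments, so it shows only the synchronization:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type reporter struct {
	stateMu sync.RWMutex
	closed  bool
	daemon  chan struct{} // closed by the daemon loop to acknowledge shutdown
}

func (r *reporter) runDaemon() {
	r.stateMu.RLock()
	closed := r.closed // read under the lock: no data race with closeReporter
	r.stateMu.RUnlock()
	if closed {
		close(r.daemon) // acknowledge; the closer may now send the final report
		return
	}
	// ... ReportLog(false) / ReportState(false) would go here; no result is reported ...
	time.AfterFunc(time.Second, r.runDaemon)
}

func (r *reporter) closeReporter() {
	r.stateMu.Lock()
	r.closed = true // write under the same lock the daemon reads with
	r.stateMu.Unlock()

	select {
	case <-r.daemon: // normal path: the daemon saw closed and acknowledged
	case <-time.After(60 * time.Second):
		fmt.Println("no response from the daemon, continuing best effort")
	}
	// ... ReportLog(true) / ReportState(true) would go here: final logs, then the result ...
}

func main() {
	r := &reporter{daemon: make(chan struct{})}
	r.runDaemon()     // the loop reschedules itself once per second
	r.closeReporter() // returns once the daemon acknowledges (about one second here)
}
```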

Based on #794, with fixes for the remaining unprotected `r.closed` reads that the race detector catches.

Fixes #793

---------

Co-authored-by: Christopher Homberger <christopher.homberger@web.de>
Co-authored-by: ChristopherHX <christopher.homberger@web.de>
Co-authored-by: rmawatson <rmawatson@hotmail.com>
Reviewed-on: https://gitea.com/gitea/act_runner/pulls/796
Reviewed-by: ChristopherHX <christopherhx@noreply.gitea.com>
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
Co-authored-by: silverwind <me@silverwind.io>
Co-committed-by: silverwind <me@silverwind.io>
silverwind authored on 2026-02-22 14:52:49 +00:00, committed by ChristopherHX
parent f4418eff18, commit ae6e5dfcf7
3 changed files with 166 additions and 16 deletions

@@ -125,7 +125,7 @@ tidy-check: tidy
 	fi
 
 test: fmt-check security-check
-	@$(GO) test -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
+	@$(GO) test -race -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
 
 .PHONY: vet
 vet:
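
For illustration only (this snippet is not act_runner code), the kind of bug the `-race` flag reports is an unsynchronized read and write of a shared variable, the same shape as the race on `r.closed`; running it with `go run -race` prints a data race report:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var closed bool
	go func() {
		closed = true // write from one goroutine, as Close() used to do
	}()
	fmt.Println(closed)                // unsynchronized read from another, as RunDaemon() did
	time.Sleep(100 * time.Millisecond) // keep main alive so the detector sees both accesses
}
```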

@@ -5,6 +5,7 @@ package report
 import (
 	"context"
+	"errors"
 	"fmt"
 	"regexp"
 	"strings"
@@ -37,6 +38,7 @@ type Reporter struct {
 	state   *runnerv1.TaskState
 	stateMu sync.RWMutex
 	outputs sync.Map
+	daemon  chan struct{}
 
 	debugOutputEnabled  bool
 	stopCommandEndToken string
@@ -63,6 +65,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
 		state: &runnerv1.TaskState{
 			Id: task.Id,
 		},
+		daemon: make(chan struct{}),
 	}
 
 	if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
@@ -109,6 +112,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
 	if stage != "Main" {
 		if v, ok := entry.Data["jobResult"]; ok {
 			if jobResult, ok := r.parseResult(v); ok {
+				// We need to ensure log upload before this upload
 				r.state.Result = jobResult
 				r.state.StoppedAt = timestamppb.New(timestamp)
 				for _, s := range r.state.Steps {
@@ -176,15 +180,17 @@ func (r *Reporter) Fire(entry *log.Entry) error {
 }
 
 func (r *Reporter) RunDaemon() {
-	if r.closed {
-		return
-	}
-	if r.ctx.Err() != nil {
+	r.stateMu.RLock()
+	closed := r.closed
+	r.stateMu.RUnlock()
+	if closed || r.ctx.Err() != nil {
+		// Acknowledge close
+		close(r.daemon)
 		return
 	}
 
 	_ = r.ReportLog(false)
-	_ = r.ReportState()
+	_ = r.ReportState(false)
 
 	time.AfterFunc(time.Second, r.RunDaemon)
 }
@@ -226,9 +232,8 @@ func (r *Reporter) SetOutputs(outputs map[string]string) {
 }
 
 func (r *Reporter) Close(lastWords string) error {
-	r.closed = true
-
 	r.stateMu.Lock()
+	r.closed = true
 	if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
 		if lastWords == "" {
 			lastWords = "Early termination"
@@ -251,13 +256,23 @@ func (r *Reporter) Close(lastWords string) error {
})
}
r.stateMu.Unlock()
// Wait for Acknowledge
select {
case <-r.daemon:
case <-time.After(60 * time.Second):
close(r.daemon)
log.Error("No Response from RunDaemon for 60s, continue best effort")
}
return retry.Do(func() error {
if err := r.ReportLog(true); err != nil {
return err
}
return r.ReportState()
}, retry.Context(r.ctx))
// Report the job outcome even when all log upload retry attempts have been exhausted
return errors.Join(
retry.Do(func() error {
return r.ReportLog(true)
}, retry.Context(r.ctx)),
retry.Do(func() error {
return r.ReportState(true)
}, retry.Context(r.ctx)),
)
}
func (r *Reporter) ReportLog(noMore bool) error {
@@ -285,17 +300,20 @@ func (r *Reporter) ReportLog(noMore bool) error {
 	r.stateMu.Lock()
 	r.logRows = r.logRows[ack-r.logOffset:]
+	submitted := r.logOffset + len(rows)
 	r.logOffset = ack
 	r.stateMu.Unlock()
 
-	if noMore && ack < r.logOffset+len(rows) {
+	if noMore && ack < submitted {
 		return fmt.Errorf("not all logs are submitted")
 	}
 
 	return nil
 }
 
-func (r *Reporter) ReportState() error {
+// ReportState only reports the job result if reportResult is true
+// RunDaemon never reports results even if result is set
+func (r *Reporter) ReportState(reportResult bool) error {
 	r.clientM.Lock()
 	defer r.clientM.Unlock()
@@ -303,6 +321,11 @@
 	state := proto.Clone(r.state).(*runnerv1.TaskState)
 	r.stateMu.RUnlock()
 
+	// Only report result from Close to reliable sent logs
+	if !reportResult {
+		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
+	}
+
 	outputs := make(map[string]string)
 	r.outputs.Range(func(k, v interface{}) bool {
 		if val, ok := v.(string); ok {

@@ -5,8 +5,11 @@ package report
 import (
 	"context"
+	"fmt"
 	"strings"
+	"sync"
 	"testing"
+	"time"
 
 	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 	connect_go "connectrpc.com/connect"
@@ -15,6 +18,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/types/known/structpb"
+	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
 )
@@ -174,6 +178,7 @@ func TestReporter_Fire(t *testing.T) {
 		reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 			Context: taskCtx,
 		})
+		reporter.RunDaemon()
 		defer func() {
 			assert.NoError(t, reporter.Close(""))
 		}()
@@ -195,3 +200,125 @@
 		assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
 	})
 }
+
+// TestReporter_EphemeralRunnerDeletion reproduces the exact scenario from
+// https://gitea.com/gitea/act_runner/issues/793:
+//
+// 1. RunDaemon calls ReportLog(false) — runner is still alive
+// 2. Close() updates state to Result=FAILURE (between RunDaemon's ReportLog and ReportState)
+// 3. RunDaemon's ReportState() would clone the completed state and send it,
+//    but the fix makes ReportState return early when closed, preventing this
+// 4. Close's ReportLog(true) succeeds because the runner was not deleted
+func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
+	runnerDeleted := false
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			if runnerDeleted {
+				return nil, fmt.Errorf("runner has been deleted")
+			}
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			// Server deletes ephemeral runner when it receives a completed state
+			if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
+				runnerDeleted = true
+			}
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	taskCtx, err := structpb.NewStruct(map[string]interface{}{})
+	require.NoError(t, err)
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
+	reporter.ResetSteps(1)
+
+	// Fire a log entry to create pending data
+	assert.NoError(t, reporter.Fire(&log.Entry{
+		Message: "build output",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+	}))
+
+	// Step 1: RunDaemon calls ReportLog(false) — runner is still alive
+	assert.NoError(t, reporter.ReportLog(false))
+
+	// Step 2: Close() updates state — sets Result=FAILURE and marks steps cancelled.
+	// In the real race, this happens while RunDaemon is between ReportLog and ReportState.
+	reporter.stateMu.Lock()
+	reporter.closed = true
+	for _, v := range reporter.state.Steps {
+		if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
+			v.Result = runnerv1.Result_RESULT_CANCELLED
+		}
+	}
+	reporter.state.Result = runnerv1.Result_RESULT_FAILURE
+	reporter.logRows = append(reporter.logRows, &runnerv1.LogRow{
+		Time:    timestamppb.Now(),
+		Content: "Early termination",
+	})
+	reporter.state.StoppedAt = timestamppb.Now()
+	reporter.stateMu.Unlock()
+
+	// Step 3: RunDaemon's ReportState() — with the fix, this returns early
+	// because closed=true, preventing the server from deleting the runner.
+	assert.NoError(t, reporter.ReportState(false))
+	assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState")
+
+	// Step 4: Close's final log upload succeeds because the runner is still alive.
+	// Flush pending rows first, then send the noMore signal (matching Close's retry behavior).
+	assert.NoError(t, reporter.ReportLog(false))
+
+	// Acknowledge Close as done in daemon
+	close(reporter.daemon)
+
+	err = reporter.ReportLog(true)
+	assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs")
+	err = reporter.ReportState(true)
+	assert.NoError(t, err, "final state update should work: runner should not be deleted before Close finishes sending logs")
+}
+
+func TestReporter_RunDaemonClose_Race(t *testing.T) {
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	taskCtx, err := structpb.NewStruct(map[string]interface{}{})
+	require.NoError(t, err)
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
+		Context: taskCtx,
+	})
+	reporter.ResetSteps(1)
+
+	// Start the daemon loop in a separate goroutine.
+	// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		reporter.RunDaemon()
+	}()
+
+	// Close concurrently — this races with RunDaemon on r.closed.
+	assert.NoError(t, reporter.Close(""))
+
+	// Cancel context so pending AfterFunc callbacks exit quickly.
+	cancel()
+	wg.Wait()
+	time.Sleep(2 * time.Second)
+}