8 Commits

Author SHA1 Message Date
silverwind
97c1320234 reduce diff 2026-02-19 04:18:37 +01:00
silverwind
c1c36d1c2d undo whitespace change, add comments 2026-02-19 04:14:40 +01:00
silverwind
3e139b7f09 fix: prevent RunDaemon from sending completed state before Close sends final logs
Split ReportState into a public method that skips when closed (used by
RunDaemon) and a private reportState that always sends (used by Close).
This prevents the server from deleting ephemeral runners before final
logs are uploaded. Add test reproducing the exact interleaving from #793.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 04:05:54 +01:00
silverwind
5e139031c6 fmt 2026-02-19 03:25:58 +01:00
silverwind
64ab5fdd51 fix: use stateMu instead of closedM to protect r.closed
Remove the dedicated closedM mutex and use stateMu instead, since
closed is part of the reporter state. RunDaemon reads r.closed under
stateMu.RLock, Close sets it under the existing stateMu.Lock block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 03:20:13 +01:00
silverwind
39cef65b52 fix: hold closedM for both ReportLog and ReportState in RunDaemon
After Close() sets r.closed, the daemon should make zero network calls.
Move ReportLog(false) under the same closedM guard as ReportState() so
the daemon skips both when closed. This also simplifies RunDaemon by
using a single lock/unlock pair.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 03:13:55 +01:00
silverwind
a4a6e291d5 fix: protect all r.closed accesses, add race test, enable -race in CI
The initial `if r.closed` guard in RunDaemon() was still reading
r.closed without holding closedM, causing a data race with Close().
Wrap it with closedM to match the other access sites.

Add TestReporter_RunDaemonClose_Race which exercises RunDaemon() and
Close() concurrently — the exact scenario from #793. Without the fix,
`go test -race` reliably detects the data race.

Enable the -race flag in `make test` so CI catches data races.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 02:54:45 +01:00
rmawatson
5591f0a546 fixes #793 2026-02-19 01:00:53 +00:00
3 changed files with 142 additions and 5 deletions

View File

@@ -112,7 +112,7 @@ security-check: deps-tools
GOEXPERIMENT= $(GO) run $(GOVULNCHECK_PACKAGE) -show color ./...
test: fmt-check security-check
@$(GO) test -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
@$(GO) test -race -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
.PHONY: vet
vet:

View File

@@ -176,7 +176,10 @@ func (r *Reporter) Fire(entry *log.Entry) error {
}
func (r *Reporter) RunDaemon() {
if r.closed {
r.stateMu.RLock()
closed := r.closed
r.stateMu.RUnlock()
if closed {
return
}
if r.ctx.Err() != nil {
@@ -226,9 +229,8 @@ func (r *Reporter) SetOutputs(outputs map[string]string) {
}
func (r *Reporter) Close(lastWords string) error {
r.closed = true
r.stateMu.Lock()
r.closed = true
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
if lastWords == "" {
lastWords = "Early termination"
@@ -256,7 +258,7 @@ func (r *Reporter) Close(lastWords string) error {
if err := r.ReportLog(true); err != nil {
return err
}
return r.ReportState()
return r.reportState()
}, retry.Context(r.ctx))
}
@@ -295,7 +297,20 @@ func (r *Reporter) ReportLog(noMore bool) error {
return nil
}
// ReportState skips when closed so RunDaemon cannot send a completed state
// before Close sends the final logs.
func (r *Reporter) ReportState() error {
r.stateMu.RLock()
closed := r.closed
r.stateMu.RUnlock()
if closed {
return nil
}
return r.reportState()
}
// reportState sends unconditionally. Used by Close after final logs are uploaded.
func (r *Reporter) reportState() error {
r.clientM.Lock()
defer r.clientM.Unlock()

View File

@@ -5,8 +5,11 @@ package report
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
connect_go "connectrpc.com/connect"
@@ -15,6 +18,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
)
@@ -195,3 +199,121 @@ func TestReporter_Fire(t *testing.T) {
assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
})
}
// TestReporter_EphemeralRunnerDeletion reproduces the exact scenario from
// https://gitea.com/gitea/act_runner/issues/793:
//
// 1. RunDaemon calls ReportLog(false) — runner is still alive
// 2. Close() updates state to Result=FAILURE (between RunDaemon's ReportLog and ReportState)
// 3. RunDaemon's ReportState() would clone the completed state and send it,
// but the fix makes ReportState return early when closed, preventing this
// 4. Close's ReportLog(true) succeeds because the runner was not deleted
func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
runnerDeleted := false
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
if runnerDeleted {
return nil, fmt.Errorf("runner has been deleted")
}
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
// Server deletes ephemeral runner when it receives a completed state
if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
runnerDeleted = true
}
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]interface{}{})
require.NoError(t, err)
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
reporter.ResetSteps(1)
// Fire a log entry to create pending data
assert.NoError(t, reporter.Fire(&log.Entry{
Message: "build output",
Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
}))
// Step 1: RunDaemon calls ReportLog(false) — runner is still alive
assert.NoError(t, reporter.ReportLog(false))
// Step 2: Close() updates state — sets Result=FAILURE and marks steps cancelled.
// In the real race, this happens while RunDaemon is between ReportLog and ReportState.
reporter.stateMu.Lock()
reporter.closed = true
for _, v := range reporter.state.Steps {
if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
v.Result = runnerv1.Result_RESULT_CANCELLED
}
}
reporter.state.Result = runnerv1.Result_RESULT_FAILURE
reporter.logRows = append(reporter.logRows, &runnerv1.LogRow{
Time: timestamppb.Now(),
Content: "Early termination",
})
reporter.state.StoppedAt = timestamppb.Now()
reporter.stateMu.Unlock()
// Step 3: RunDaemon's ReportState() — with the fix, this returns early
// because closed=true, preventing the server from deleting the runner.
assert.NoError(t, reporter.ReportState())
assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState")
// Step 4: Close's final log upload succeeds because the runner is still alive.
// Flush pending rows first, then send the noMore signal (matching Close's retry behavior).
assert.NoError(t, reporter.ReportLog(false))
err = reporter.ReportLog(true)
assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs")
}
func TestReporter_RunDaemonClose_Race(t *testing.T) {
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
taskCtx, err := structpb.NewStruct(map[string]interface{}{})
require.NoError(t, err)
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
Context: taskCtx,
})
reporter.ResetSteps(1)
// Start the daemon loop in a separate goroutine.
// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
reporter.RunDaemon()
}()
// Close concurrently — this races with RunDaemon on r.closed.
assert.NoError(t, reporter.Close(""))
// Cancel context so pending AfterFunc callbacks exit quickly.
cancel()
wg.Wait()
time.Sleep(2 * time.Second)
}