13 Commits

Author SHA1 Message Date
ChristopherHX
49e6324e5c remove weird symbol from comment
All checks were successful
checks / check and test (push) Successful in 2m29s
2026-02-20 17:05:29 +00:00
Christopher Homberger
9ab05e3d6a retry log upload and result independently 2026-02-20 15:26:11 +01:00
Christopher Homberger
6d2ce82645 fix not all logs are submitted error check 2026-02-20 15:22:00 +01:00
Christopher Homberger
f4b8e7605c update modules
All checks were successful
checks / check and test (push) Successful in 1m51s
2026-02-20 13:10:32 +01:00
Christopher Homberger
b0726c4423 Wait for RunDaemon Acknowledge
* Always skip reporting job result from RunDaemon
2026-02-20 13:01:12 +01:00
silverwind
97c1320234 reduce diff 2026-02-19 04:18:37 +01:00
silverwind
c1c36d1c2d undo whitespace change, add comments 2026-02-19 04:14:40 +01:00
silverwind
3e139b7f09 fix: prevent RunDaemon from sending completed state before Close sends final logs
Split ReportState into a public method that skips when closed (used by
RunDaemon) and a private reportState that always sends (used by Close).
This prevents the server from deleting ephemeral runners before final
logs are uploaded. Add test reproducing the exact interleaving from #793.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 04:05:54 +01:00
silverwind
5e139031c6 fmt 2026-02-19 03:25:58 +01:00
silverwind
64ab5fdd51 fix: use stateMu instead of closedM to protect r.closed
Remove the dedicated closedM mutex and use stateMu instead, since
closed is part of the reporter state. RunDaemon reads r.closed under
stateMu.RLock, Close sets it under the existing stateMu.Lock block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 03:20:13 +01:00
silverwind
39cef65b52 fix: hold closedM for both ReportLog and ReportState in RunDaemon
After Close() sets r.closed, the daemon should make zero network calls.
Move ReportLog(false) under the same closedM guard as ReportState() so
the daemon skips both when closed. This also simplifies RunDaemon by
using a single lock/unlock pair.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 03:13:55 +01:00
silverwind
a4a6e291d5 fix: protect all r.closed accesses, add race test, enable -race in CI
The initial `if r.closed` guard in RunDaemon() was still reading
r.closed without holding closedM, causing a data race with Close().
Wrap it with closedM to match the other access sites.

Add TestReporter_RunDaemonClose_Race which exercises RunDaemon() and
Close() concurrently — the exact scenario from #793. Without the fix,
`go test -race` reliably detects the data race.

Enable the -race flag in `make test` so CI catches data races.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 02:54:45 +01:00
rmawatson
5591f0a546 fixes #793 2026-02-19 01:00:53 +00:00
5 changed files with 183 additions and 35 deletions

View File

@@ -112,7 +112,7 @@ security-check: deps-tools
GOEXPERIMENT= $(GO) run $(GOVULNCHECK_PACKAGE) -show color ./...
test: fmt-check security-check
@$(GO) test -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
@$(GO) test -race -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
.PHONY: vet
vet:

12
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.11.1
golang.org/x/term v0.36.0
golang.org/x/term v0.37.0
golang.org/x/time v0.12.0
google.golang.org/protobuf v1.35.2
gopkg.in/yaml.v3 v3.0.1
@@ -47,7 +47,7 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.6.2 // indirect
github.com/go-git/go-git/v5 v5.12.0 // indirect
github.com/go-git/go-git/v5 v5.16.5 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/glob v0.2.3 // indirect
@@ -93,10 +93,10 @@ require (
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/tools v0.23.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -104,7 +104,5 @@ require (
replace github.com/nektos/act => gitea.com/gitea/act v0.261.8
replace github.com/go-git/go-git/v5 => github.com/go-git/go-git/v5 v5.16.2
// Remove after
replace github.com/distribution/reference v0.6.0 => github.com/distribution/reference v0.5.0

24
go.sum
View File

@@ -75,8 +75,8 @@ github.com/go-git/go-billy/v5 v5.6.2 h1:6Q86EsPXMa7c3YZ3aLAQsMA0VlWmy43r6FHqa/UN
github.com/go-git/go-billy/v5 v5.6.2/go.mod h1:rcFC2rAsp/erv7CMz9GczHcuD0D32fWzH+MJAU+jaUU=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/go-git/go-git/v5 v5.16.2 h1:fT6ZIOjE5iEnkzKyxTHK1W4HGAsPhqEqiSAssSO77hM=
github.com/go-git/go-git/v5 v5.16.2/go.mod h1:4Ge4alE/5gPs30F2H1esi2gPd69R0C39lolkucHBOp8=
github.com/go-git/go-git/v5 v5.16.5 h1:mdkuqblwr57kVfXri5TTH+nMFLNUxIj9Z7F5ykFbw5s=
github.com/go-git/go-git/v5 v5.16.5/go.mod h1:QOMLpNf1qxuSY4StA/ArOdfFR2TrKEjJiye2kel2m+M=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -233,8 +233,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -246,8 +246,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -267,16 +267,16 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -5,6 +5,7 @@ package report
import (
"context"
"errors"
"fmt"
"regexp"
"strings"
@@ -37,6 +38,7 @@ type Reporter struct {
state *runnerv1.TaskState
stateMu sync.RWMutex
outputs sync.Map
daemon chan struct{}
debugOutputEnabled bool
stopCommandEndToken string
@@ -63,6 +65,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
state: &runnerv1.TaskState{
Id: task.Id,
},
daemon: make(chan struct{}),
}
if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
@@ -109,6 +112,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
if stage != "Main" {
if v, ok := entry.Data["jobResult"]; ok {
if jobResult, ok := r.parseResult(v); ok {
// We need to ensure log upload before this upload
r.state.Result = jobResult
r.state.StoppedAt = timestamppb.New(timestamp)
for _, s := range r.state.Steps {
@@ -176,15 +180,17 @@ func (r *Reporter) Fire(entry *log.Entry) error {
}
func (r *Reporter) RunDaemon() {
if r.closed {
return
}
if r.ctx.Err() != nil {
r.stateMu.RLock()
closed := r.closed
r.stateMu.RUnlock()
if closed || r.ctx.Err() != nil {
// Acknowledge close
close(r.daemon)
return
}
_ = r.ReportLog(false)
_ = r.ReportState()
_ = r.ReportState(false)
time.AfterFunc(time.Second, r.RunDaemon)
}
@@ -226,9 +232,8 @@ func (r *Reporter) SetOutputs(outputs map[string]string) {
}
func (r *Reporter) Close(lastWords string) error {
r.closed = true
r.stateMu.Lock()
r.closed = true
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
if lastWords == "" {
lastWords = "Early termination"
@@ -251,13 +256,23 @@ func (r *Reporter) Close(lastWords string) error {
})
}
r.stateMu.Unlock()
// Wait for Acknowledge
select {
case <-r.daemon:
case <-time.After(60 * time.Second):
close(r.daemon)
log.Error("No Response from RunDaemon for 60s, continue best effort")
}
return retry.Do(func() error {
if err := r.ReportLog(true); err != nil {
return err
}
return r.ReportState()
}, retry.Context(r.ctx))
// Report the job outcome even when all log upload retry attempts have been exhausted
return errors.Join(
retry.Do(func() error {
return r.ReportLog(true)
}, retry.Context(r.ctx)),
retry.Do(func() error {
return r.ReportState(true)
}, retry.Context(r.ctx)),
)
}
func (r *Reporter) ReportLog(noMore bool) error {
@@ -285,17 +300,20 @@ func (r *Reporter) ReportLog(noMore bool) error {
r.stateMu.Lock()
r.logRows = r.logRows[ack-r.logOffset:]
submitted := r.logOffset + len(rows)
r.logOffset = ack
r.stateMu.Unlock()
if noMore && ack < r.logOffset+len(rows) {
if noMore && ack < submitted {
return fmt.Errorf("not all logs are submitted")
}
return nil
}
func (r *Reporter) ReportState() error {
// ReportState only reports the job result if reportResult is true
// RunDaemon never reports results even if result is set
func (r *Reporter) ReportState(reportResult bool) error {
r.clientM.Lock()
defer r.clientM.Unlock()
@@ -303,6 +321,11 @@ func (r *Reporter) ReportState() error {
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateMu.RUnlock()
// Only report result from Close to reliable sent logs
if !reportResult {
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
}
outputs := make(map[string]string)
r.outputs.Range(func(k, v interface{}) bool {
if val, ok := v.(string); ok {

View File

@@ -5,8 +5,11 @@ package report
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
connect_go "connectrpc.com/connect"
@@ -15,6 +18,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
)
@@ -174,6 +178,7 @@ func TestReporter_Fire(t *testing.T) {
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
Context: taskCtx,
})
reporter.RunDaemon()
defer func() {
assert.NoError(t, reporter.Close(""))
}()
@@ -195,3 +200,125 @@ func TestReporter_Fire(t *testing.T) {
assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
})
}
// TestReporter_EphemeralRunnerDeletion reproduces the exact scenario from
// https://gitea.com/gitea/act_runner/issues/793:
//
// 1. RunDaemon calls ReportLog(false) — runner is still alive
// 2. Close() updates state to Result=FAILURE (between RunDaemon's ReportLog and ReportState)
// 3. RunDaemon's ReportState() would clone the completed state and send it,
// but the fix makes ReportState return early when closed, preventing this
// 4. Close's ReportLog(true) succeeds because the runner was not deleted
func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
runnerDeleted := false
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
if runnerDeleted {
return nil, fmt.Errorf("runner has been deleted")
}
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
// Server deletes ephemeral runner when it receives a completed state
if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
runnerDeleted = true
}
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]interface{}{})
require.NoError(t, err)
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
reporter.ResetSteps(1)
// Fire a log entry to create pending data
assert.NoError(t, reporter.Fire(&log.Entry{
Message: "build output",
Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
}))
// Step 1: RunDaemon calls ReportLog(false) — runner is still alive
assert.NoError(t, reporter.ReportLog(false))
// Step 2: Close() updates state — sets Result=FAILURE and marks steps cancelled.
// In the real race, this happens while RunDaemon is between ReportLog and ReportState.
reporter.stateMu.Lock()
reporter.closed = true
for _, v := range reporter.state.Steps {
if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
v.Result = runnerv1.Result_RESULT_CANCELLED
}
}
reporter.state.Result = runnerv1.Result_RESULT_FAILURE
reporter.logRows = append(reporter.logRows, &runnerv1.LogRow{
Time: timestamppb.Now(),
Content: "Early termination",
})
reporter.state.StoppedAt = timestamppb.Now()
reporter.stateMu.Unlock()
// Step 3: RunDaemon's ReportState() — with the fix, this returns early
// because closed=true, preventing the server from deleting the runner.
assert.NoError(t, reporter.ReportState(false))
assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState")
// Step 4: Close's final log upload succeeds because the runner is still alive.
// Flush pending rows first, then send the noMore signal (matching Close's retry behavior).
assert.NoError(t, reporter.ReportLog(false))
// Acknowledge Close as done in daemon
close(reporter.daemon)
err = reporter.ReportLog(true)
assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs")
err = reporter.ReportState(true)
assert.NoError(t, err, "final state update should work: runner should not be deleted before Close finishes sending logs")
}
func TestReporter_RunDaemonClose_Race(t *testing.T) {
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
taskCtx, err := structpb.NewStruct(map[string]interface{}{})
require.NoError(t, err)
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
Context: taskCtx,
})
reporter.ResetSteps(1)
// Start the daemon loop in a separate goroutine.
// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
reporter.RunDaemon()
}()
// Close concurrently — this races with RunDaemon on r.closed.
assert.NoError(t, reporter.Close(""))
// Cancel context so pending AfterFunc callbacks exit quickly.
cancel()
wg.Wait()
time.Sleep(2 * time.Second)
}