From b8efdfafa4163d816271ff47f1808ab71e9352bd Mon Sep 17 00:00:00 2001 From: Henrik Huitti Date: Sat, 6 Dec 2025 10:18:58 +0200 Subject: [PATCH] fix(queue): force agent cancellation on lease expiration (#5823) --- server/queue/fifo.go | 2 ++ server/queue/fifo_test.go | 35 +++++++++++++++++++++++++++++++++++ server/queue/queue.go | 3 +++ 3 files changed, 40 insertions(+) diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 5b951b288..c4bd11abf 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -342,6 +342,8 @@ func (q *fifo) assignToWorker() (*list.Element, *worker) { func (q *fifo) resubmitExpiredPipelines() { for taskID, taskState := range q.running { if time.Now().After(taskState.deadline) { + log.Info().Msgf("queue: resubmitting expired task %s", taskID) + taskState.error = ErrTaskExpired q.pending.PushFront(taskState.item) delete(q.running, taskID) close(taskState.done) diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 4bfceb038..6270a0337 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -91,6 +91,41 @@ func TestFifoExpire(t *testing.T) { assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") } +func TestFifoWaitOnExpireReturnsError(t *testing.T) { + ctx, cancel := context.WithCancelCause(t.Context()) + t.Cleanup(func() { cancel(nil) }) + + q, _ := NewMemoryQueue(ctx).(*fifo) + assert.NotNil(t, q) + + q.extension = 0 + + dummyTask := genDummyTask() + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) + + waitForProcess() + got, err := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, err) + assert.Equal(t, dummyTask, got) + + errCh := make(chan error, 1) + go func() { + errCh <- q.Wait(ctx, got.ID) + }() + + waitForProcess() + + select { + case werr := <-errCh: + assert.Error(t, werr, "expected Wait to return error when lease expires") + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for Wait to return after lease expiration") + } + + info := q.Info(ctx) + assert.Len(t, info.Pending, 1, "expect task re-added to pending queue after expiration") +} + func TestFifoWait(t *testing.T) { ctx, cancel := context.WithCancelCause(t.Context()) t.Cleanup(func() { cancel(nil) }) diff --git a/server/queue/queue.go b/server/queue/queue.go index 18dcbbe1e..e40588711 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -33,6 +33,9 @@ var ( // ErrAgentMissMatch indicates a task is assigned to a different agent. ErrAgentMissMatch = errors.New("task assigned to different agent") + + // ErrTaskExpired indicates a running task exceeded its lease/deadline and was resubmitted. + ErrTaskExpired = errors.New("queue: task expired") ) // InfoT provides runtime information.