fix(queue): force agent cancellation on lease expiration (#5823)

This commit is contained in:
Henrik Huitti
2025-12-06 10:18:58 +02:00
committed by GitHub
parent b872171e12
commit b8efdfafa4
3 changed files with 40 additions and 0 deletions

View File

@@ -342,6 +342,8 @@ func (q *fifo) assignToWorker() (*list.Element, *worker) {
func (q *fifo) resubmitExpiredPipelines() {
for taskID, taskState := range q.running {
if time.Now().After(taskState.deadline) {
log.Info().Msgf("queue: resubmitting expired task %s", taskID)
taskState.error = ErrTaskExpired
q.pending.PushFront(taskState.item)
delete(q.running, taskID)
close(taskState.done)

View File

@@ -91,6 +91,41 @@ func TestFifoExpire(t *testing.T) {
assert.Len(t, info.Pending, 1, "expect task re-added to pending queue")
}
func TestFifoWaitOnExpireReturnsError(t *testing.T) {
ctx, cancel := context.WithCancelCause(t.Context())
t.Cleanup(func() { cancel(nil) })
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NotNil(t, q)
q.extension = 0
dummyTask := genDummyTask()
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))
waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err)
assert.Equal(t, dummyTask, got)
errCh := make(chan error, 1)
go func() {
errCh <- q.Wait(ctx, got.ID)
}()
waitForProcess()
select {
case werr := <-errCh:
assert.Error(t, werr, "expected Wait to return error when lease expires")
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for Wait to return after lease expiration")
}
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task re-added to pending queue after expiration")
}
func TestFifoWait(t *testing.T) {
ctx, cancel := context.WithCancelCause(t.Context())
t.Cleanup(func() { cancel(nil) })

View File

@@ -33,6 +33,9 @@ var (
// ErrAgentMissMatch indicates a task is assigned to a different agent.
ErrAgentMissMatch = errors.New("task assigned to different agent")
// ErrTaskExpired indicates a running task exceeded its lease/deadline and was resubmitted.
ErrTaskExpired = errors.New("queue: task expired")
)
// InfoT provides runtime information.