From 4edfefe5d69a2adb9c5be4a0446d4f05a65f9ac0 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 27 Jan 2026 14:27:03 +0100 Subject: [PATCH] Simplify and Fix server task queue (#6017) --- cmd/agent/core/agent.go | 2 +- server/pipeline/cancel.go | 25 +- server/queue/LICENSE | 29 - server/queue/fifo.go | 115 ++- server/queue/fifo_test.go | 1482 ++++++++++++++++++------------ server/queue/mocks/mock_Queue.go | 57 -- server/queue/persistent.go | 16 +- server/queue/queue.go | 9 +- 8 files changed, 961 insertions(+), 774 deletions(-) delete mode 100644 server/queue/LICENSE diff --git a/cmd/agent/core/agent.go b/cmd/agent/core/agent.go index 0ac9e6914..bf9adec76 100644 --- a/cmd/agent/core/agent.go +++ b/cmd/agent/core/agent.go @@ -296,7 +296,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error { return nil } - log.Debug().Msg("polling new steps") + log.Debug().Msg("polling new workflow") if err := runner.Run(agentCtx, shutdownCtx); err != nil { log.Error().Err(err).Msg("runner error, retrying...") // Check if context is canceled diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index 78612562d..d351d5c90 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -39,31 +39,28 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo return &ErrNotFound{Msg: err.Error()} } - // First cancel/evict steps in the queue in one go + // First cancel/evict workflows in the queue in one go var ( - stepsToCancel []string - stepsToEvict []string + workflowsToCancel []string + workflowsToEvict []string ) for _, workflow := range workflows { if workflow.State == model.StatusRunning { - stepsToCancel = append(stepsToCancel, fmt.Sprint(workflow.ID)) + workflowsToCancel = append(workflowsToCancel, fmt.Sprint(workflow.ID)) } if workflow.State == model.StatusPending { - stepsToEvict = append(stepsToEvict, fmt.Sprint(workflow.ID)) + workflowsToEvict = append(workflowsToEvict, 
fmt.Sprint(workflow.ID)) } } - if len(stepsToEvict) != 0 { - if err := server.Config.Services.Queue.EvictAtOnce(ctx, stepsToEvict); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToEvict) - } - if err := server.Config.Services.Queue.ErrorAtOnce(ctx, stepsToEvict, queue.ErrCancel); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToEvict) + if len(workflowsToEvict) != 0 { + if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToEvict, queue.ErrCancel); err != nil { + log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToEvict) } } - if len(stepsToCancel) != 0 { - if err := server.Config.Services.Queue.ErrorAtOnce(ctx, stepsToCancel, queue.ErrCancel); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToCancel) + if len(workflowsToCancel) != 0 { + if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil { + log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel) } } diff --git a/server/queue/LICENSE b/server/queue/LICENSE deleted file mode 100644 index 64e202179..000000000 --- a/server/queue/LICENSE +++ /dev/null @@ -1,29 +0,0 @@ -BSD 3-Clause License - -Copyright (c) 2017, Brad Rydzewski -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. 
- -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/server/queue/fifo.go b/server/queue/fifo.go index c4bd11abf..25acb25bf 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -17,7 +17,7 @@ package queue import ( "container/list" "context" - "fmt" + "errors" "slices" "sync" "time" @@ -58,8 +58,6 @@ type fifo struct { // as the agent pull in 10 milliseconds we should also give them work asap. const processTimeInterval = 100 * time.Millisecond -var ErrWorkerKicked = fmt.Errorf("worker was kicked") - // NewMemoryQueue returns a new fifo queue. 
func NewMemoryQueue(ctx context.Context) Queue { q := &fifo{ @@ -90,23 +88,23 @@ func (q *fifo) Poll(c context.Context, agentID int64, filter FilterFn) (*model.T q.Lock() ctx, stop := context.WithCancelCause(c) - _worker := &worker{ + w := &worker{ agentID: agentID, channel: make(chan *model.Task, 1), filter: filter, stop: stop, } - q.workers[_worker] = struct{}{} + q.workers[w] = struct{}{} q.Unlock() for { select { case <-ctx.Done(): q.Lock() - delete(q.workers, _worker) + delete(q.workers, w) q.Unlock() return nil, ctx.Err() - case t := <-_worker.channel: + case t := <-w.channel: return t, nil } } @@ -122,47 +120,40 @@ func (q *fifo) Error(_ context.Context, id string, err error) error { return q.finished([]string{id}, model.StatusFailure, err) } -// ErrorAtOnce signals multiple done are complete with an error. +// ErrorAtOnce signals multiple tasks are done and complete with an error. +// If still pending they will just get removed from the queue. func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error { + if errors.Is(err, ErrCancel) { + return q.finished(ids, model.StatusKilled, err) + } return q.finished(ids, model.StatusFailure, err) } +// locks the queue itself! func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error { q.Lock() + defer q.Unlock() + var errs []error + // we first process the tasks itself for _, id := range ids { - taskEntry, ok := q.running[id] - if ok { + if taskEntry, ok := q.running[id]; ok { taskEntry.error = err close(taskEntry.done) delete(q.running, id) } else { - q.removeFromPending(id) + errs = append(errs, q.removeFromPendingAndWaiting(id)) } + } + + // next we aim for there dependencies + // we do this because in our ids list there could be tasks and its dependencies + // so not to mess things up + for _, id := range ids { q.updateDepStatusInQueue(id, exitStatus) } - q.Unlock() - return nil -} - -// EvictAtOnce removes multiple pending tasks from the queue. 
-func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error { - q.Lock() - defer q.Unlock() - - for _, id := range taskIDs { - var next *list.Element - for element := q.pending.Front(); element != nil; element = next { - next = element.Next() - task, ok := element.Value.(*model.Task) - if ok && task.ID == id { - q.pending.Remove(element) - return nil - } - } - } - return ErrNotFound + return errors.Join(errs...) } // Wait waits until the item is done executing. @@ -286,19 +277,15 @@ func (q *fifo) process() { func (q *fifo) filterWaiting() { // resubmits all waiting tasks to pending, deps may have cleared - var nextWaiting *list.Element - for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting { - nextWaiting = e.Next() - task, _ := e.Value.(*model.Task) + for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() { + task, _ := element.Value.(*model.Task) q.pending.PushBack(task) } // rebuild waitingDeps q.waitingOnDeps = list.New() var filtered []*list.Element - var nextPending *list.Element - for element := q.pending.Front(); element != nil; element = nextPending { - nextPending = element.Next() + for element := q.pending.Front(); element != nil; element = element.Next() { task, _ := element.Value.(*model.Task) if q.depsInQueue(task) { log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID) @@ -314,12 +301,10 @@ func (q *fifo) filterWaiting() { } func (q *fifo) assignToWorker() (*list.Element, *worker) { - var next *list.Element var bestWorker *worker var bestScore int - for element := q.pending.Front(); element != nil; element = next { - next = element.Next() + for element := q.pending.Front(); element != nil; element = element.Next() { task, _ := element.Value.(*model.Task) log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) @@ -352,9 +337,7 @@ func (q *fifo) resubmitExpiredPipelines() { } func (q *fifo) depsInQueue(task *model.Task) bool { - var next *list.Element 
- for element := q.pending.Front(); element != nil; element = next { - next = element.Next() + for element := q.pending.Front(); element != nil; element = element.Next() { possibleDep, ok := element.Value.(*model.Task) log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID) for _, dep := range task.Dependencies { @@ -372,13 +355,12 @@ func (q *fifo) depsInQueue(task *model.Task) bool { return false } +// expects the q to be currently owned e.g. locked by caller! func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { - var next *list.Element - for element := q.pending.Front(); element != nil; element = next { - next = element.Next() - pending, ok := element.Value.(*model.Task) + for element := q.pending.Front(); element != nil; element = element.Next() { + pending, _ := element.Value.(*model.Task) for _, dep := range pending.Dependencies { - if ok && taskID == dep { + if taskID == dep { pending.DepStatus[dep] = status } } @@ -392,27 +374,40 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { } } - for element := q.waitingOnDeps.Front(); element != nil; element = next { - next = element.Next() - waiting, ok := element.Value.(*model.Task) + for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() { + waiting, _ := element.Value.(*model.Task) for _, dep := range waiting.Dependencies { - if ok && taskID == dep { + if taskID == dep { waiting.DepStatus[dep] = status } } } } -func (q *fifo) removeFromPending(taskID string) { +// expects the q to be currently owned e.g. locked by caller! 
+func (q *fifo) removeFromPendingAndWaiting(taskID string) error { log.Debug().Msgf("queue: trying to remove %s", taskID) - var next *list.Element - for element := q.pending.Front(); element != nil; element = next { - next = element.Next() + + // we assume pending first + for element := q.pending.Front(); element != nil; element = element.Next() { task, _ := element.Value.(*model.Task) if task.ID == taskID { log.Debug().Msgf("queue: %s is removed from pending", taskID) - q.pending.Remove(element) - return + _ = q.pending.Remove(element) + return nil } } + + // well looks like it's waiting + for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() { + task, _ := element.Value.(*model.Task) + if task.ID == taskID { + log.Debug().Msgf("queue: %s is removed from waitingOnDeps", taskID) + _ = q.waitingOnDeps.Remove(element) + return nil + } + } + + // well it could not be found + return ErrNotFound } diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 6270a0337..b8f05f625 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -38,634 +38,863 @@ var ( waitForProcess = func() { time.Sleep(processTimeInterval + 50*time.Millisecond) } ) -func TestFifo(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - q := NewMemoryQueue(ctx) - dummyTask := genDummyTask() - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - waitForProcess() - info := q.Info(ctx) - assert.Len(t, info.Pending, 1, "expect task in pending queue") - - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, dummyTask, got) - - waitForProcess() - info = q.Info(ctx) - assert.Len(t, info.Pending, 0, "expect task removed from pending queue") - assert.Len(t, info.Running, 1, "expect task in running queue") - - assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) - - waitForProcess() - info = q.Info(ctx) - assert.Len(t, info.Pending, 0, "expect 
task removed from pending queue") - assert.Len(t, info.Running, 0, "expect task removed from running queue") -} - -func TestFifoExpire(t *testing.T) { +func setupTestQueue(t *testing.T) (context.Context, context.CancelCauseFunc, *fifo) { ctx, cancel := context.WithCancelCause(t.Context()) t.Cleanup(func() { cancel(nil) }) q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - dummyTask := genDummyTask() - - q.extension = 0 - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - waitForProcess() - info := q.Info(ctx) - assert.Len(t, info.Pending, 1, "expect task in pending queue") - - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, dummyTask, got) - - waitForProcess() - info = q.Info(ctx) - assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") -} - -func TestFifoWaitOnExpireReturnsError(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - q.extension = 0 - - dummyTask := genDummyTask() - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - - waitForProcess() - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, dummyTask, got) - - errCh := make(chan error, 1) - go func() { - errCh <- q.Wait(ctx, got.ID) - }() - - waitForProcess() - - select { - case werr := <-errCh: - assert.Error(t, werr, "expected Wait to return error when lease expires") - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for Wait to return after lease expiration") + if q == nil { + t.Fatal("Failed to create queue") } - info := q.Info(ctx) - assert.Len(t, info.Pending, 1, "expect task re-added to pending queue after expiration") + return ctx, cancel, q } -func TestFifoWait(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) +func TestFifoBasicOperations(t *testing.T) { + ctx, cancel, q := 
setupTestQueue(t) + defer cancel(nil) - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) + t.Run("push poll done lifecycle", func(t *testing.T) { + dummyTask := genDummyTask() - dummyTask := genDummyTask() + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) + waitForProcess() - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) + info := q.Info(ctx) + assert.Len(t, info.Pending, 1) - waitForProcess() - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, dummyTask, got) + got, err := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, err) + assert.Equal(t, dummyTask, got) - var wg sync.WaitGroup - wg.Add(1) - go func() { - assert.NoError(t, q.Wait(ctx, got.ID)) - wg.Done() - }() + waitForProcess() + info = q.Info(ctx) + assert.Len(t, info.Pending, 0) + assert.Len(t, info.Running, 1) - <-time.After(time.Millisecond) - assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) - wg.Wait() + // Edge case: verify task can't be polled again while running + pollCtx, pollCancel := context.WithTimeout(ctx, 100*time.Millisecond) + _, err = q.Poll(pollCtx, 2, filterFnTrue) + pollCancel() + assert.Error(t, err) // Should timeout/cancel, not return the same task + + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + + waitForProcess() + info = q.Info(ctx) + assert.Len(t, info.Running, 0) + + // Edge case: Done on already completed task should handle gracefully + err = q.Done(ctx, got.ID, model.StatusSuccess) + // Document current behavior - should either error or be idempotent + if err != nil { + assert.Error(t, err) + } + }) + + t.Run("error handling", func(t *testing.T) { + task1 := &model.Task{ID: "task-error-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1})) + + waitForProcess() + got, _ := q.Poll(ctx, 1, filterFnTrue) + + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("test error"))) + waitForProcess() + info := q.Info(ctx) + assert.Len(t, info.Running, 0) + + assert.Error(t, 
q.Error(ctx, "totally-fake-id", fmt.Errorf("test error"))) + + // Edge case: Error on task that's already errored + err := q.Error(ctx, got.ID, fmt.Errorf("double error")) + // Should either error or be idempotent + if err != nil { + assert.Error(t, err) + } + }) + + t.Run("error at once", func(t *testing.T) { + task1 := &model.Task{ID: "batch-1"} + task2 := &model.Task{ID: "batch-2"} + task3 := &model.Task{ID: "batch-3"} + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2, task3})) + waitForProcess() + + got1, _ := q.Poll(ctx, 1, filterFnTrue) + got2, _ := q.Poll(ctx, 2, filterFnTrue) + + assert.NoError(t, q.ErrorAtOnce(ctx, []string{got1.ID, got2.ID}, fmt.Errorf("batch error"))) + waitForProcess() + info := q.Info(ctx) + assert.Len(t, info.Running, 0) + assert.Len(t, info.Pending, 1) + + got3, _ := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSuccess)) + waitForProcess() + + task4 := &model.Task{ID: "batch-4"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task4})) + waitForProcess() + got4, _ := q.Poll(ctx, 1, filterFnTrue) + + err := q.ErrorAtOnce(ctx, []string{got4.ID, "fake-1", "fake-2"}, fmt.Errorf("test")) + assert.Error(t, err) + assert.ErrorIs(t, err, ErrNotFound) + + waitForProcess() + info = q.Info(ctx) + assert.Len(t, info.Running, 0) + + // Edge case: ErrorAtOnce with empty slice + err = q.ErrorAtOnce(ctx, []string{}, fmt.Errorf("no tasks")) + assert.NoError(t, err) + // Should handle gracefully, potentially no-op + + // Edge case: ErrorAtOnce with nil error + task5 := &model.Task{ID: "batch-5"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task5})) + waitForProcess() + got5, _ := q.Poll(ctx, 3, filterFnTrue) + err = q.ErrorAtOnce(ctx, []string{got5.ID}, nil) + assert.NoError(t, err) + // Should handle nil error gracefully + waitForProcess() + }) + + t.Run("error at once with waiting deps", func(t *testing.T) { + task5 := &model.Task{ID: "deps-cancel-5"} + task6 := &model.Task{ + ID: 
"deps-cancel-6", + Dependencies: []string{"deps-cancel-5"}, + DepStatus: make(map[string]model.StatusValue), + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task5, task6})) + waitForProcess() + + info := q.Info(ctx) + assert.Equal(t, 1, info.Stats.WaitingOnDeps) + + assert.NoError(t, q.ErrorAtOnce(ctx, []string{"deps-cancel-5", "deps-cancel-6"}, fmt.Errorf("canceled"))) + + waitForProcess() + info = q.Info(ctx) + assert.Equal(t, 0, info.Stats.WaitingOnDeps) + assert.Len(t, info.Pending, 0) + + // Edge case: verify both tasks are actually gone, not stuck somewhere + assert.Len(t, info.Running, 0) + assert.Len(t, info.WaitingOnDeps, 0) + }) + + t.Run("error at once cancellation", func(t *testing.T) { + task1 := &model.Task{ID: "cancel-prop-1"} + task2 := &model.Task{ + ID: "cancel-prop-2", + Dependencies: []string{"cancel-prop-1"}, + DepStatus: make(map[string]model.StatusValue), + RunOn: []string{"success", "failure"}, + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2})) + waitForProcess() + got1, _ := q.Poll(ctx, 1, filterFnTrue) + + assert.NoError(t, q.ErrorAtOnce(ctx, []string{got1.ID}, ErrCancel)) + + waitForProcess() + waitForProcess() + + got2, _ := q.Poll(ctx, 2, filterFnTrue) + assert.Equal(t, model.StatusKilled, got2.DepStatus["cancel-prop-1"]) + + // Edge case: verify ErrCancel results in StatusKilled not StatusFailure + assert.NotEqual(t, model.StatusFailure, got2.DepStatus["cancel-prop-1"]) + assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) + waitForProcess() + }) + + t.Run("pause resume", func(t *testing.T) { + dummyTask := &model.Task{ID: "pause-1"} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + _, _ = q.Poll(ctx, 99, filterFnTrue) + wg.Done() + }() + + q.Pause() + t0 := time.Now() + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) + waitForProcess() + + // Edge case: verify queue is actually paused + info := q.Info(ctx) + assert.True(t, info.Paused) + assert.Len(t, info.Pending, 1) + 
assert.Len(t, info.Running, 0) + + q.Resume() + + wg.Wait() + assert.Greater(t, time.Since(t0), 20*time.Millisecond) + + // Edge case: verify queue is unpaused + info = q.Info(ctx) + assert.False(t, info.Paused) + + // Edge case: multiple pause/resume cycles + task2 := &model.Task{ID: "pause-2"} + q.Pause() + q.Pause() // Double pause + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2})) + waitForProcess() + q.Resume() + q.Resume() // Double resume + waitForProcess() + got, _ := q.Poll(ctx, 99, filterFnTrue) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + waitForProcess() + }) } func TestFifoDependencies(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) + ctx, cancel, q := setupTestQueue(t) + defer cancel(nil) - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - } + t.Run("basic dependency handling", func(t *testing.T) { + task1 := &model.Task{ID: "dep-basic-1"} + task2 := &model.Task{ + ID: "dep-basic-2", + Dependencies: []string{"dep-basic-1"}, + DepStatus: make(map[string]model.StatusValue), + } + task3 := &model.Task{ + ID: "dep-basic-3", + Dependencies: []string{"dep-basic-1"}, + DepStatus: make(map[string]model.StatusValue), + RunOn: []string{"success", "failure"}, + } - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) + waitForProcess() - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1})) + info := q.Info(ctx) + assert.Equal(t, 2, info.Stats.WaitingOnDeps) - waitForProcess() - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task1, got) + got, err := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, err) + assert.Equal(t, task1, got) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1"))) - waitForProcess() - assert.NoError(t, q.Done(ctx, got.ID, 
model.StatusSuccess)) + waitForProcess() + got, err = q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, err) + assert.Equal(t, task2, got) + assert.False(t, got.ShouldRun()) + assert.Equal(t, model.StatusFailure, got.DepStatus["dep-basic-1"]) - waitForProcess() - got, err = q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task2, got) + waitForProcess() + got, err = q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, err) + assert.Equal(t, task3, got) + assert.True(t, got.ShouldRun()) + assert.Equal(t, model.StatusFailure, got.DepStatus["dep-basic-1"]) + + waitForProcess() + info = q.Info(ctx) + assert.Equal(t, 0, info.Stats.WaitingOnDeps) + + // Edge case: verify DepStatus is correctly set before polling + assert.NotEmpty(t, task2.DepStatus) + assert.NotEmpty(t, task3.DepStatus) + }) + + t.Run("multiple dependencies", func(t *testing.T) { + task1 := &model.Task{ID: "multi-dep-1"} + task2 := &model.Task{ID: "multi-dep-2"} + task3 := &model.Task{ + ID: "multi-dep-3", + Dependencies: []string{"multi-dep-1", "multi-dep-2"}, + DepStatus: make(map[string]model.StatusValue), + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) + waitForProcess() + + got1, _ := q.Poll(ctx, 1, filterFnTrue) + got2, _ := q.Poll(ctx, 2, filterFnTrue) + + gotIDs := map[string]bool{got1.ID: true, got2.ID: true} + assert.True(t, gotIDs["multi-dep-1"] && gotIDs["multi-dep-2"]) + + if got1.ID == "multi-dep-1" { + assert.NoError(t, q.Done(ctx, got1.ID, model.StatusSuccess)) + assert.NoError(t, q.Error(ctx, got2.ID, fmt.Errorf("failed"))) + } else { + assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) + assert.NoError(t, q.Error(ctx, got1.ID, fmt.Errorf("failed"))) + } + + waitForProcess() + got3, err := q.Poll(ctx, 3, filterFnTrue) + assert.NoError(t, err) + + assert.Contains(t, got3.DepStatus, "multi-dep-1") + assert.Contains(t, got3.DepStatus, "multi-dep-2") + assert.True(t, + (got3.DepStatus["multi-dep-1"] == model.StatusSuccess && 
got3.DepStatus["multi-dep-2"] == model.StatusFailure) || + (got3.DepStatus["multi-dep-1"] == model.StatusFailure && got3.DepStatus["multi-dep-2"] == model.StatusSuccess)) + assert.False(t, got3.ShouldRun()) + + // Edge case: verify both deps are tracked + assert.Len(t, got3.DepStatus, 2) + assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSkipped)) + waitForProcess() + }) + + t.Run("transitive dependencies", func(t *testing.T) { + task1 := &model.Task{ID: "trans-1"} + task2 := &model.Task{ + ID: "trans-2", + Dependencies: []string{"trans-1"}, + DepStatus: make(map[string]model.StatusValue), + } + task3 := &model.Task{ + ID: "trans-3", + Dependencies: []string{"trans-2"}, + DepStatus: make(map[string]model.StatusValue), + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) + waitForProcess() + + got, _ := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1"))) + + waitForProcess() + got, _ = q.Poll(ctx, 2, filterFnTrue) + assert.False(t, got.ShouldRun()) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) + + waitForProcess() + got, _ = q.Poll(ctx, 3, filterFnTrue) + assert.Equal(t, model.StatusSkipped, got.DepStatus["trans-2"]) + assert.False(t, got.ShouldRun()) + + // Edge case: verify transitive failure propagates correctly + // task3 should see trans-2 as skipped, not trans-1's status + assert.NotContains(t, got.DepStatus, "trans-1") + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) + waitForProcess() + }) + + t.Run("dependency status propagation", func(t *testing.T) { + task1 := &model.Task{ID: "prop-1"} + task2 := &model.Task{ + ID: "prop-2", + Dependencies: []string{"prop-1"}, + DepStatus: make(map[string]model.StatusValue), + } + task3 := &model.Task{ + ID: "prop-3", + Dependencies: []string{"prop-1"}, + DepStatus: make(map[string]model.StatusValue), + RunOn: []string{"success", "failure"}, + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2, 
task3})) + waitForProcess() + + info := q.Info(ctx) + assert.Equal(t, 2, info.Stats.WaitingOnDeps) + + got1, _ := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, q.Done(ctx, got1.ID, model.StatusSuccess)) + + waitForProcess() + + got2, _ := q.Poll(ctx, 2, filterFnTrue) + got3, _ := q.Poll(ctx, 3, filterFnTrue) + + assert.Equal(t, model.StatusSuccess, got2.DepStatus["prop-1"]) + assert.Equal(t, model.StatusSuccess, got3.DepStatus["prop-1"]) + + // Edge case: verify both tasks can be polled concurrently + assert.NotEqual(t, got2.ID, got3.ID) + assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) + assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSuccess)) + waitForProcess() + + task4 := &model.Task{ID: "prop-4"} + task5 := &model.Task{ + ID: "prop-5", + Dependencies: []string{"prop-4"}, + DepStatus: make(map[string]model.StatusValue), + } + task6 := &model.Task{ + ID: "prop-6", + Dependencies: []string{"prop-4"}, + DepStatus: make(map[string]model.StatusValue), + RunOn: []string{"success", "failure"}, + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task4, task5, task6})) + waitForProcess() + + got4, _ := q.Poll(ctx, 4, filterFnTrue) + assert.NoError(t, q.Error(ctx, got4.ID, fmt.Errorf("failed"))) + + waitForProcess() + + got5, _ := q.Poll(ctx, 5, filterFnTrue) + assert.Equal(t, model.StatusFailure, got5.DepStatus["prop-4"]) + assert.False(t, got5.ShouldRun()) + + got6, _ := q.Poll(ctx, 6, filterFnTrue) + assert.Equal(t, model.StatusFailure, got6.DepStatus["prop-4"]) + assert.True(t, got6.ShouldRun()) + + // Edge case: complete dependent tasks + assert.NoError(t, q.Done(ctx, got5.ID, model.StatusSkipped)) + assert.NoError(t, q.Done(ctx, got6.ID, model.StatusSuccess)) + waitForProcess() + }) + + // Edge case: circular dependency detection (should be handled or cause issue) + t.Run("circular dependencies", func(t *testing.T) { + task1 := &model.Task{ + ID: "circ-1", + Dependencies: []string{"circ-2"}, + DepStatus: make(map[string]model.StatusValue), 
+ } + task2 := &model.Task{ + ID: "circ-2", + Dependencies: []string{"circ-1"}, + DepStatus: make(map[string]model.StatusValue), + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2})) + waitForProcess() + + info := q.Info(ctx) + // Both should be waiting on deps - this is a deadlock scenario + assert.Equal(t, 2, info.Stats.WaitingOnDeps) + assert.Len(t, info.Pending, 0) + + // Verify they never become available for polling + pollCtx, pollCancel := context.WithTimeout(ctx, 200*time.Millisecond) + _, err := q.Poll(pollCtx, 99, filterFnTrue) + pollCancel() + assert.Error(t, err) // Should timeout + + // Clean up the deadlocked tasks + assert.NoError(t, q.ErrorAtOnce(ctx, []string{"circ-1", "circ-2"}, fmt.Errorf("circular dep"))) + waitForProcess() + }) + + // Edge case: dependency on non-existent task + // NOTE: This reveals a potential issue - the queue doesn't validate dependencies exist. + // If a dependency was never added to the queue, the task will run immediately since + // depsInQueue() only checks currently pending/running tasks, not if deps will arrive. + t.Run("non-existent dependency", func(t *testing.T) { + task1 := &model.Task{ + ID: "orphan-1", + Dependencies: []string{"does-not-exist"}, + DepStatus: make(map[string]model.StatusValue), + } + + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1})) + waitForProcess() + + info := q.Info(ctx) + // Current implementation: task doesn't wait if dependency not in queue + // This means tasks with typos in dependency names will run immediately! 
+ assert.Equal(t, 0, info.Stats.WaitingOnDeps) + assert.Len(t, info.Pending, 1) + + // Task will be available for polling even though dependency doesn't exist + got, err := q.Poll(ctx, 99, filterFnTrue) + assert.NoError(t, err) + assert.Equal(t, "orphan-1", got.ID) + + // DepStatus will be empty since dependency never completed + assert.Empty(t, got.DepStatus) + + // Clean up + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + waitForProcess() + }) + + // Edge case: dependency added AFTER dependent task (race condition) + t.Run("dependency added after dependent", func(t *testing.T) { + // Push dependent task first + dependent := &model.Task{ + ID: "late-dep-child", + Dependencies: []string{"late-dep-parent"}, + DepStatus: make(map[string]model.StatusValue), + } + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dependent})) + waitForProcess() + + // At this point, dependent doesn't see parent in queue, so it won't wait + info := q.Info(ctx) + // Dependent should NOT be waiting since parent doesn't exist yet + initialWaiting := info.Stats.WaitingOnDeps + + // Now add the parent task + parent := &model.Task{ID: "late-dep-parent"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{parent})) + waitForProcess() + + // After filterWaiting runs, dependent SHOULD now see parent and wait + info = q.Info(ctx) + // The implementation calls filterWaiting() which rechecks dependencies + // So dependent should now be waiting + assert.Greater(t, info.Stats.WaitingOnDeps, initialWaiting, + "dependent should start waiting once parent is added") + + // Complete parent first + gotParent, _ := q.Poll(ctx, 1, filterFnTrue) + assert.Equal(t, "late-dep-parent", gotParent.ID, "parent should be polled first") + assert.NoError(t, q.Done(ctx, gotParent.ID, model.StatusSuccess)) + waitForProcess() + + // Now child should be unblocked with parent's status + gotChild, _ := q.Poll(ctx, 2, filterFnTrue) + assert.Equal(t, "late-dep-child", gotChild.ID) + assert.Equal(t, 
model.StatusSuccess, gotChild.DepStatus["late-dep-parent"]) + + assert.NoError(t, q.Done(ctx, gotChild.ID, model.StatusSuccess)) + waitForProcess() + }) } -func TestFifoErrors(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) +func TestFifoLeaseManagement(t *testing.T) { + ctx, cancel, q := setupTestQueue(t) + defer cancel(nil) - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - } - task3 := &model.Task{ - ID: "3", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - RunOn: []string{"success", "failure"}, - } + t.Run("lease expiration", func(t *testing.T) { + q.extension = 0 + dummyTask := &model.Task{ID: "lease-exp-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - - waitForProcess() - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task1, got) - - assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - - waitForProcess() - got, err = q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task2, got) - assert.False(t, got.ShouldRun()) - - waitForProcess() - got, err = q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task3, got) - assert.True(t, got.ShouldRun()) -} - -func TestFifoErrors2(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - } - task3 := &model.Task{ - ID: "3", - Dependencies: []string{"1", "2"}, - DepStatus: make(map[string]model.StatusValue), - } - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - - for i := 0; 
i < 2; i++ { waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) - assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them") - if got != task1 { - assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) - } - if got != task2 { - assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - } - } + errCh := make(chan error, 1) + go func() { errCh <- q.Wait(ctx, got.ID) }() - waitForProcess() - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task3, got) - assert.False(t, got.ShouldRun()) -} - -func TestFifoErrorsMultiThread(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - } - task3 := &model.Task{ - ID: "3", - Dependencies: []string{"1", "2"}, - DepStatus: make(map[string]model.StatusValue), - } - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - - obtainedWorkCh := make(chan *model.Task) - defer func() { close(obtainedWorkCh) }() - - for i := 0; i < 10; i++ { - go func(i int) { - for { - fmt.Printf("Worker %d started\n", i) - got, err := q.Poll(ctx, 1, filterFnTrue) - if err != nil && errors.Is(err, context.Canceled) { - return - } - assert.NoError(t, err) - obtainedWorkCh <- got - } - }(i) - } - - task1Processed := false - task2Processed := false - - for { + waitForProcess() select { - case got := <-obtainedWorkCh: - fmt.Println(got.ID) - - switch { - case !task1Processed: - assert.Equal(t, task1, got) - task1Processed = true - assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - go func() { - for { - fmt.Printf("Worker spawned\n") - got, err := q.Poll(ctx, 1, filterFnTrue) - if err != nil && 
errors.Is(err, context.Canceled) { - return - } - assert.NoError(t, err) - obtainedWorkCh <- got - } - }() - case !task2Processed: - assert.Equal(t, task2, got) - task2Processed = true - assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) - go func() { - for { - fmt.Printf("Worker spawned\n") - got, err := q.Poll(ctx, 1, filterFnTrue) - if err != nil && errors.Is(err, context.Canceled) { - return - } - assert.NoError(t, err) - obtainedWorkCh <- got - } - }() - default: - assert.Equal(t, task3, got) - assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 succeeded but task2 failed") - return - } - - case <-time.After(5 * time.Second): - info := q.Info(ctx) - fmt.Println(info.String()) - t.Errorf("test timed out") - return + case werr := <-errCh: + assert.Error(t, werr) + // Edge case: verify error is ErrTaskExpired + assert.ErrorIs(t, werr, ErrTaskExpired) + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for Wait to return") } - } + + info := q.Info(ctx) + assert.Len(t, info.Pending, 1) + + // Edge case: verify task was resubmitted to front of queue + got2, _ := q.Poll(ctx, 1, filterFnTrue) + assert.Equal(t, got.ID, got2.ID) // Same task resubmitted + + assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) + waitForProcess() + + // Verify cleanup + info = q.Info(ctx) + assert.Len(t, info.Pending, 0) + assert.Len(t, info.Running, 0) + }) + + t.Run("extend lease", func(t *testing.T) { + q.extension = 50 * time.Millisecond + dummyTask := &model.Task{ID: "extend-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) + + waitForProcess() + got, _ := q.Poll(ctx, 5, filterFnTrue) + + assert.NoError(t, q.Extend(ctx, 5, got.ID)) + assert.ErrorIs(t, q.Extend(ctx, 999, got.ID), ErrAgentMissMatch) + assert.ErrorIs(t, q.Extend(ctx, 1, got.ID), ErrAgentMissMatch) + assert.ErrorIs(t, q.Extend(ctx, 1, "non-existent"), ErrNotFound) + + // Edge case: extend multiple times rapidly + for i := 0; i < 3; i++ { + time.Sleep(30 * 
time.Millisecond) + assert.NoError(t, q.Extend(ctx, 5, got.ID)) + } + + info := q.Info(ctx) + assert.Len(t, info.Running, 1) + assert.Len(t, info.Pending, 0) + + // Edge case: extend after Done should error + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + waitForProcess() + assert.ErrorIs(t, q.Extend(ctx, 5, got.ID), ErrNotFound) + + // Verify cleanup + info = q.Info(ctx) + assert.Len(t, info.Pending, 0) + assert.Len(t, info.Running, 0) + }) + + t.Run("wait operations", func(t *testing.T) { + // Verify queue is clean before starting + info := q.Info(ctx) + assert.Len(t, info.Pending, 0, "queue should be empty at start of wait operations") + assert.Len(t, info.Running, 0, "queue should be empty at start of wait operations") + + dummyTask := &model.Task{ID: "wait-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) + + waitForProcess() + got, _ := q.Poll(ctx, 1, filterFnTrue) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + assert.NoError(t, q.Wait(ctx, got.ID)) + wg.Done() + }() + + time.Sleep(time.Millisecond) + assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) + wg.Wait() + + // Edge case: Wait on non-existent task should return immediately + assert.NoError(t, q.Wait(ctx, "non-existent")) + + dummyTask2 := &model.Task{ID: "wait-2"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask2})) + waitForProcess() + got2, _ := q.Poll(ctx, 1, filterFnTrue) + + waitCtx, waitCancel := context.WithCancelCause(ctx) + errCh := make(chan error, 1) + go func() { errCh <- q.Wait(waitCtx, got2.ID) }() + + time.Sleep(50 * time.Millisecond) + waitCancel(nil) + + select { + case err := <-errCh: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("Wait should return when context is canceled") + } + + // Clean up - complete the second wait task + assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) + waitForProcess() + + // Edge case: multiple concurrent waits on same task + dummyTask3 := &model.Task{ID: 
"wait-3"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask3})) + waitForProcess() + got3, _ := q.Poll(ctx, 1, filterFnTrue) + + var wg2 sync.WaitGroup + wg2.Add(3) + for i := 0; i < 3; i++ { + go func() { + assert.NoError(t, q.Wait(ctx, got3.ID)) + wg2.Done() + }() + } + + time.Sleep(10 * time.Millisecond) + assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSuccess)) + wg2.Wait() + + // Verify cleanup + info = q.Info(ctx) + assert.Len(t, info.Pending, 0) + assert.Len(t, info.Running, 0) + }) } -func TestFifoTransitiveErrors(t *testing.T) { +func TestFifoWorkerManagement(t *testing.T) { + ctx, cancel, q := setupTestQueue(t) + defer cancel(nil) + + t.Run("poll with context cancellation", func(t *testing.T) { + pollCtx, pollCancel := context.WithCancelCause(ctx) + errCh := make(chan error, 1) + go func() { + _, err := q.Poll(pollCtx, 1, filterFnTrue) + errCh <- err + }() + + time.Sleep(50 * time.Millisecond) + pollCancel(nil) + + select { + case err := <-errCh: + assert.ErrorIs(t, err, context.Canceled) + case <-time.After(time.Second): + t.Fatal("Poll should return when context is canceled") + } + + // Edge case: verify worker is cleaned up + info := q.Info(ctx) + assert.Equal(t, 0, info.Stats.Workers) + }) + + t.Run("kick agent workers", func(t *testing.T) { + pollResults := make(chan error, 5) + for i := 0; i < 5; i++ { + go func() { + _, err := q.Poll(ctx, 42, filterFnTrue) + pollResults <- err + }() + } + + time.Sleep(50 * time.Millisecond) + + // Edge case: verify workers are registered before kicking + info := q.Info(ctx) + assert.Equal(t, 5, info.Stats.Workers) + + q.KickAgentWorkers(42) + + kickedCount := 0 + for i := 0; i < 5; i++ { + select { + case err := <-pollResults: + if errors.Is(err, context.Canceled) { + kickedCount++ + } + case <-time.After(time.Second): + t.Fatal("expected all workers to be kicked") + } + } + assert.Equal(t, 5, kickedCount) + + // Edge case: verify workers are removed after kicking + waitForProcess() + info = 
q.Info(ctx) + assert.Equal(t, 0, info.Stats.Workers) + + // Edge case: kick non-existent agent should be no-op + q.KickAgentWorkers(999) + }) + + // Edge case: mixed agent workers + t.Run("kick specific agent among multiple", func(t *testing.T) { + pollResults := make(chan struct { + agentID int64 + err error + }, 10) + + // Start workers for agent 1 + for i := 0; i < 3; i++ { + go func() { + _, err := q.Poll(ctx, 1, filterFnTrue) + pollResults <- struct { + agentID int64 + err error + }{1, err} + }() + } + + // Start workers for agent 2 + for i := 0; i < 3; i++ { + go func() { + _, err := q.Poll(ctx, 2, filterFnTrue) + pollResults <- struct { + agentID int64 + err error + }{2, err} + }() + } + + time.Sleep(50 * time.Millisecond) + info := q.Info(ctx) + assert.Equal(t, 6, info.Stats.Workers) + + // Kick only agent 1 + q.KickAgentWorkers(1) + + kickedAgent1 := 0 + kickedAgent2 := 0 + for i := 0; i < 3; i++ { + select { + case result := <-pollResults: + if errors.Is(result.err, context.Canceled) { + if result.agentID == 1 { + kickedAgent1++ + } else { + kickedAgent2++ + } + } + case <-time.After(time.Second): + t.Fatal("expected kicked workers to return") + } + } + + assert.Equal(t, 3, kickedAgent1) + assert.Equal(t, 0, kickedAgent2) + + // Clean up agent 2 workers + q.KickAgentWorkers(2) + for i := 0; i < 3; i++ { + <-pollResults + } + }) +} + +func TestFifoLabelBasedScoring(t *testing.T) { ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - } - task3 := &model.Task{ - ID: "3", - Dependencies: []string{"2"}, - DepStatus: make(map[string]model.StatusValue), - } - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - - waitForProcess() - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - 
assert.Equal(t, task1, got) - assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - - waitForProcess() - got, err = q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task2, got) - assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed") - assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) - - waitForProcess() - got, err = q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.Equal(t, task3, got) - assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too") -} - -func TestFifoCancel(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - } - task3 := &model.Task{ - ID: "3", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - RunOn: []string{"success", "failure"}, - } - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - - _, _ = q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, q.Error(ctx, task1.ID, fmt.Errorf("canceled"))) - assert.NoError(t, q.Error(ctx, task2.ID, fmt.Errorf("canceled"))) - assert.NoError(t, q.Error(ctx, task3.ID, fmt.Errorf("canceled"))) - info := q.Info(ctx) - assert.Len(t, info.Pending, 0, "all pipelines should be canceled") - - time.Sleep(processTimeInterval * 2) - info = q.Info(ctx) - assert.Len(t, info.Pending, 2, "canceled are rescheduled") - assert.Len(t, info.Running, 0, "canceled are rescheduled") - assert.Len(t, info.WaitingOnDeps, 0, "canceled are rescheduled") -} - -func TestFifoPause(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - 
dummyTask := genDummyTask() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - _, _ = q.Poll(ctx, 1, filterFnTrue) - wg.Done() - }() - - q.Pause() - t0 := time.Now() - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - waitForProcess() - q.Resume() - - wg.Wait() - t1 := time.Now() - - assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume") - - q.Pause() - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - q.Resume() - _, _ = q.Poll(ctx, 1, filterFnTrue) -} - -func TestFifoPauseResume(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - dummyTask := genDummyTask() - - q.Pause() - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) - q.Resume() - - _, _ = q.Poll(ctx, 1, filterFnTrue) -} - -func TestWaitingVsPending(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - task1 := genDummyTask() - task2 := &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - } - task3 := &model.Task{ - ID: "3", - Dependencies: []string{"1"}, - DepStatus: make(map[string]model.StatusValue), - RunOn: []string{"success", "failure"}, - } - - q, _ := NewMemoryQueue(ctx).(*fifo) - assert.NotNil(t, q) - - assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) - - got, _ := q.Poll(ctx, 1, filterFnTrue) - - waitForProcess() - info := q.Info(ctx) - assert.Equal(t, 2, info.Stats.WaitingOnDeps) - - assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1, there was an error"))) - got, err := q.Poll(ctx, 1, filterFnTrue) - assert.NoError(t, err) - assert.EqualValues(t, task2, got) - - waitForProcess() - info = q.Info(ctx) - assert.Equal(t, 0, info.Stats.WaitingOnDeps) - assert.Equal(t, 1, info.Stats.Pending) -} - -func TestShouldRun(t *testing.T) { - task := &model.Task{ - ID: "2", - 
Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusSuccess, - }, - RunOn: []string{"failure"}, - } - assert.False(t, task.ShouldRun(), "expect task to not run, it runs on failure only") - - task = &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusSuccess, - }, - RunOn: []string{"failure", "success"}, - } - assert.True(t, task.ShouldRun()) - - task = &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusFailure, - }, - } - assert.False(t, task.ShouldRun()) - - task = &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusSuccess, - }, - RunOn: []string{"success"}, - } - assert.True(t, task.ShouldRun()) - - task = &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusFailure, - }, - RunOn: []string{"failure"}, - } - assert.True(t, task.ShouldRun()) - - task = &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusSkipped, - }, - } - assert.False(t, task.ShouldRun(), "task should not run if dependency is skipped") - - task = &model.Task{ - ID: "2", - Dependencies: []string{"1"}, - DepStatus: map[string]model.StatusValue{ - "1": model.StatusSkipped, - }, - RunOn: []string{"failure"}, - } - assert.True(t, task.ShouldRun(), "on failure, tasks should run on skipped deps, something failed higher up the chain") -} - -func TestFifoWithScoring(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) + defer cancel(nil) q := NewMemoryQueue(ctx) - // Create tasks with different labels tasks := []*model.Task{ {ID: "1", Labels: map[string]string{"org-id": "123", "platform": "linux"}}, {ID: "2", Labels: map[string]string{"org-id": "456", "platform": "linux"}}, - {ID: "3", Labels: 
map[string]string{"org-id": "789", "platform": "windows"}}, - {ID: "4", Labels: map[string]string{"org-id": "123", "platform": "linux"}}, - {ID: "5", Labels: map[string]string{"org-id": "*", "platform": "linux"}}, + {ID: "3", Labels: map[string]string{"org-id": "123", "platform": "windows"}}, } assert.NoError(t, q.PushAtOnce(ctx, tasks)) - // Create filter functions for different workers - filters := map[int]FilterFn{ - 1: func(task *model.Task) (bool, int) { - if task.Labels["org-id"] == "123" { - return true, 20 - } - if task.Labels["platform"] == "linux" { - return true, 10 - } - return true, 1 - }, - 2: func(task *model.Task) (bool, int) { - if task.Labels["org-id"] == "456" { - return true, 20 - } - if task.Labels["platform"] == "linux" { - return true, 10 - } - return true, 1 - }, - 3: func(task *model.Task) (bool, int) { - if task.Labels["platform"] == "windows" { - return true, 20 - } - return true, 1 - }, - 4: func(task *model.Task) (bool, int) { - if task.Labels["org-id"] == "123" { - return true, 20 - } - if task.Labels["platform"] == "linux" { - return true, 10 - } - return true, 1 - }, - 5: func(task *model.Task) (bool, int) { - if task.Labels["org-id"] == "*" { - return true, 15 - } - return true, 1 - }, + filter123 := func(task *model.Task) (bool, int) { + if task.Labels["org-id"] == "123" { + return true, 20 + } + return true, 1 } - // Start polling in separate goroutines - results := make(chan *model.Task, 5) - for i := 1; i <= 5; i++ { - go func(n int) { - task, err := q.Poll(ctx, int64(n), filters[n]) - assert.NoError(t, err) - results <- task - }(i) + filter456 := func(task *model.Task) (bool, int) { + if task.Labels["org-id"] == "456" { + return true, 20 + } + return true, 1 } - // Collect results + results := make(chan *model.Task, 2) + go func() { + task, _ := q.Poll(ctx, 1, filter123) + results <- task + }() + go func() { + task, _ := q.Poll(ctx, 2, filter456) + results <- task + }() + receivedTasks := make(map[string]int64) - for i := 0; i 
< 5; i++ { + for i := 0; i < 2; i++ { select { case task := <-results: receivedTasks[task.ID] = task.AgentID @@ -674,26 +903,89 @@ func TestFifoWithScoring(t *testing.T) { } } - assert.Len(t, receivedTasks, 5, "All tasks should be assigned") + assert.Contains(t, []string{"1", "3"}, findTaskByAgent(receivedTasks, 1)) + assert.Equal(t, "2", findTaskByAgent(receivedTasks, 2)) - // Define expected agent assignments - // Map structure: {taskID: []possible agentIDs} - // - taskID "1" and "4" can be assigned to agents 1 or 4 (org-id "123") - // - taskID "2" should be assigned to agent 2 (org-id "456") - // - taskID "3" should be assigned to agent 3 (platform "windows") - // - taskID "5" should be assigned to agent 5 (org-id "*") - expectedAssignments := map[string][]int64{ - "1": {1, 4}, - "2": {2}, - "3": {3}, - "4": {1, 4}, - "5": {5}, + // Edge case: filter that rejects all tasks + filterRejectAll := func(task *model.Task) (bool, int) { + return false, 0 } - // Check if tasks are assigned as expected - for taskID, expectedAgents := range expectedAssignments { - agentID, ok := receivedTasks[taskID] - assert.True(t, ok, "Task %s should be assigned", taskID) - assert.Contains(t, expectedAgents, agentID, "Task %s should be assigned to one of the expected agents", taskID) - } + task4 := &model.Task{ID: "4", Labels: map[string]string{"org-id": "789"}} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task4})) + waitForProcess() + + pollCtx, pollCancel := context.WithTimeout(ctx, 200*time.Millisecond) + _, err := q.Poll(pollCtx, 99, filterRejectAll) + pollCancel() + assert.Error(t, err) // Should timeout as filter rejects task + + // Clean up remaining tasks + task3, _ := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, q.Done(ctx, task3.ID, model.StatusSuccess)) + task4Got, _ := q.Poll(ctx, 99, filterFnTrue) + assert.NoError(t, q.Done(ctx, task4Got.ID, model.StatusSuccess)) + waitForProcess() +} + +func TestShouldRunLogic(t *testing.T) { + tests := []struct { + name string 
+ depStatus model.StatusValue + runOn []string + expected bool + }{ + {"Success without RunOn", model.StatusSuccess, nil, true}, + {"Failure without RunOn", model.StatusFailure, nil, false}, + {"Success with failure RunOn", model.StatusSuccess, []string{"failure"}, false}, + {"Failure with failure RunOn", model.StatusFailure, []string{"failure"}, true}, + {"Success with both RunOn", model.StatusSuccess, []string{"success", "failure"}, true}, + {"Skipped without RunOn", model.StatusSkipped, nil, false}, + {"Skipped with failure RunOn", model.StatusSkipped, []string{"failure"}, true}, + // Edge cases + {"Killed without RunOn", model.StatusKilled, nil, false}, + {"Killed with failure RunOn", model.StatusKilled, []string{"failure"}, true}, + {"Success with success RunOn only", model.StatusSuccess, []string{"success"}, true}, + {"Failure with success RunOn only", model.StatusFailure, []string{"success"}, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := &model.Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: map[string]model.StatusValue{"1": tt.depStatus}, + RunOn: tt.runOn, + } + assert.Equal(t, tt.expected, task.ShouldRun()) + }) + } + + // Edge case: multiple dependencies with mixed statuses + t.Run("multiple deps mixed status", func(t *testing.T) { + task := &model.Task{ + ID: "3", + Dependencies: []string{"1", "2"}, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusSuccess, + "2": model.StatusFailure, + }, + RunOn: nil, + } + // With default RunOn (nil), needs all deps successful + assert.False(t, task.ShouldRun()) + + task.RunOn = []string{"success", "failure"} + // With both RunOn, should run regardless + assert.True(t, task.ShouldRun()) + }) +} + +func findTaskByAgent(tasks map[string]int64, agentID int64) string { + for taskID, aid := range tasks { + if aid == agentID { + return taskID + } + } + return "" } diff --git a/server/queue/mocks/mock_Queue.go b/server/queue/mocks/mock_Queue.go index 
97a215ed9..0ae4eb09d 100644 --- a/server/queue/mocks/mock_Queue.go +++ b/server/queue/mocks/mock_Queue.go @@ -228,63 +228,6 @@ func (_c *MockQueue_ErrorAtOnce_Call) RunAndReturn(run func(c context.Context, i return _c } -// EvictAtOnce provides a mock function for the type MockQueue -func (_mock *MockQueue) EvictAtOnce(c context.Context, ids []string) error { - ret := _mock.Called(c, ids) - - if len(ret) == 0 { - panic("no return value specified for EvictAtOnce") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, []string) error); ok { - r0 = returnFunc(c, ids) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// MockQueue_EvictAtOnce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EvictAtOnce' -type MockQueue_EvictAtOnce_Call struct { - *mock.Call -} - -// EvictAtOnce is a helper method to define mock.On call -// - c context.Context -// - ids []string -func (_e *MockQueue_Expecter) EvictAtOnce(c interface{}, ids interface{}) *MockQueue_EvictAtOnce_Call { - return &MockQueue_EvictAtOnce_Call{Call: _e.mock.On("EvictAtOnce", c, ids)} -} - -func (_c *MockQueue_EvictAtOnce_Call) Run(run func(c context.Context, ids []string)) *MockQueue_EvictAtOnce_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 []string - if args[1] != nil { - arg1 = args[1].([]string) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *MockQueue_EvictAtOnce_Call) Return(err error) *MockQueue_EvictAtOnce_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockQueue_EvictAtOnce_Call) RunAndReturn(run func(c context.Context, ids []string) error) *MockQueue_EvictAtOnce_Call { - _c.Call.Return(run) - return _c -} - // Extend provides a mock function for the type MockQueue func (_mock *MockQueue) Extend(c context.Context, agentID int64, workflowID string) error { ret := _mock.Called(c, agentID, workflowID) diff --git 
a/server/queue/persistent.go b/server/queue/persistent.go index b5a887ed5..2178389e7 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -72,19 +72,6 @@ func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*m return task, err } -// EvictAtOnce removes multiple pending tasks from the queue. -func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error { - if err := q.Queue.EvictAtOnce(c, ids); err != nil { - return err - } - for _, id := range ids { - if err := q.store.TaskDelete(id); err != nil { - return err - } - } - return nil -} - // Error signals the task is done with an error. func (q *persistentQueue) Error(c context.Context, id string, err error) error { if err := q.Queue.Error(c, id, err); err != nil { @@ -93,7 +80,8 @@ func (q *persistentQueue) Error(c context.Context, id string, err error) error { return q.store.TaskDelete(id) } -// ErrorAtOnce signals multiple tasks are done with an error. +// ErrorAtOnce signals that multiple tasks are done, completing them with an error. +// Tasks that are still pending are simply removed from the queue. func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error) error { if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil { return err } diff --git a/server/queue/queue.go b/server/queue/queue.go index e40588711..d924eddcf 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -36,6 +36,9 @@ var ( // ErrTaskExpired indicates a running task exceeded its lease/deadline and was resubmitted. ErrTaskExpired = errors.New("queue: task expired") + + // ErrWorkerKicked indicates that a worker of an agent got kicked. + ErrWorkerKicked = errors.New("worker was kicked") ) // InfoT provides runtime information. @@ -93,12 +96,10 @@ type Queue interface { // Error signals the task is done with an error. Error(c context.Context, id string, err error) error - // ErrorAtOnce signals multiple done are complete with an error. 
+ // ErrorAtOnce signals that multiple tasks are done, completing them with an error. + // Tasks that are still pending are simply removed from the queue. ErrorAtOnce(c context.Context, ids []string, err error) error - // EvictAtOnce removes multiple pending tasks from the queue. - EvictAtOnce(c context.Context, ids []string) error - // Wait waits until the task is complete. Wait(c context.Context, id string) error