Simplify and Fix server task queue (#6017)

This commit is contained in:
6543
2026-01-27 14:27:03 +01:00
committed by GitHub
parent f202470c7d
commit 4edfefe5d6
8 changed files with 961 additions and 774 deletions

View File

@@ -296,7 +296,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
return nil return nil
} }
log.Debug().Msg("polling new steps") log.Debug().Msg("polling new workflow")
if err := runner.Run(agentCtx, shutdownCtx); err != nil { if err := runner.Run(agentCtx, shutdownCtx); err != nil {
log.Error().Err(err).Msg("runner error, retrying...") log.Error().Err(err).Msg("runner error, retrying...")
// Check if context is canceled // Check if context is canceled

View File

@@ -39,31 +39,28 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
return &ErrNotFound{Msg: err.Error()} return &ErrNotFound{Msg: err.Error()}
} }
// First cancel/evict steps in the queue in one go // First cancel/evict workflows in the queue in one go
var ( var (
stepsToCancel []string workflowsToCancel []string
stepsToEvict []string workflowsToEvict []string
) )
for _, workflow := range workflows { for _, workflow := range workflows {
if workflow.State == model.StatusRunning { if workflow.State == model.StatusRunning {
stepsToCancel = append(stepsToCancel, fmt.Sprint(workflow.ID)) workflowsToCancel = append(workflowsToCancel, fmt.Sprint(workflow.ID))
} }
if workflow.State == model.StatusPending { if workflow.State == model.StatusPending {
stepsToEvict = append(stepsToEvict, fmt.Sprint(workflow.ID)) workflowsToEvict = append(workflowsToEvict, fmt.Sprint(workflow.ID))
} }
} }
if len(stepsToEvict) != 0 { if len(workflowsToEvict) != 0 {
if err := server.Config.Services.Queue.EvictAtOnce(ctx, stepsToEvict); err != nil { if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToEvict, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToEvict) log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToEvict)
}
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, stepsToEvict, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToEvict)
} }
} }
if len(stepsToCancel) != 0 { if len(workflowsToCancel) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, stepsToCancel, queue.ErrCancel); err != nil { if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToCancel) log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel)
} }
} }

View File

@@ -1,29 +0,0 @@
BSD 3-Clause License
Copyright (c) 2017, Brad Rydzewski
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -17,7 +17,7 @@ package queue
import ( import (
"container/list" "container/list"
"context" "context"
"fmt" "errors"
"slices" "slices"
"sync" "sync"
"time" "time"
@@ -58,8 +58,6 @@ type fifo struct {
// as the agent pull in 10 milliseconds we should also give them work asap. // as the agent pull in 10 milliseconds we should also give them work asap.
const processTimeInterval = 100 * time.Millisecond const processTimeInterval = 100 * time.Millisecond
var ErrWorkerKicked = fmt.Errorf("worker was kicked")
// NewMemoryQueue returns a new fifo queue. // NewMemoryQueue returns a new fifo queue.
func NewMemoryQueue(ctx context.Context) Queue { func NewMemoryQueue(ctx context.Context) Queue {
q := &fifo{ q := &fifo{
@@ -90,23 +88,23 @@ func (q *fifo) Poll(c context.Context, agentID int64, filter FilterFn) (*model.T
q.Lock() q.Lock()
ctx, stop := context.WithCancelCause(c) ctx, stop := context.WithCancelCause(c)
_worker := &worker{ w := &worker{
agentID: agentID, agentID: agentID,
channel: make(chan *model.Task, 1), channel: make(chan *model.Task, 1),
filter: filter, filter: filter,
stop: stop, stop: stop,
} }
q.workers[_worker] = struct{}{} q.workers[w] = struct{}{}
q.Unlock() q.Unlock()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
q.Lock() q.Lock()
delete(q.workers, _worker) delete(q.workers, w)
q.Unlock() q.Unlock()
return nil, ctx.Err() return nil, ctx.Err()
case t := <-_worker.channel: case t := <-w.channel:
return t, nil return t, nil
} }
} }
@@ -122,47 +120,40 @@ func (q *fifo) Error(_ context.Context, id string, err error) error {
return q.finished([]string{id}, model.StatusFailure, err) return q.finished([]string{id}, model.StatusFailure, err)
} }
// ErrorAtOnce signals multiple done are complete with an error. // ErrorAtOnce signals multiple tasks are done and complete with an error.
// If still pending they will just get removed from the queue.
func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error { func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error {
if errors.Is(err, ErrCancel) {
return q.finished(ids, model.StatusKilled, err)
}
return q.finished(ids, model.StatusFailure, err) return q.finished(ids, model.StatusFailure, err)
} }
// locks the queue itself!
func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error { func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error {
q.Lock() q.Lock()
defer q.Unlock()
var errs []error
// we first process the tasks itself
for _, id := range ids { for _, id := range ids {
taskEntry, ok := q.running[id] if taskEntry, ok := q.running[id]; ok {
if ok {
taskEntry.error = err taskEntry.error = err
close(taskEntry.done) close(taskEntry.done)
delete(q.running, id) delete(q.running, id)
} else { } else {
q.removeFromPending(id) errs = append(errs, q.removeFromPendingAndWaiting(id))
} }
}
// next we aim for there dependencies
// we do this because in our ids list there could be tasks and its dependencies
// so not to mess things up
for _, id := range ids {
q.updateDepStatusInQueue(id, exitStatus) q.updateDepStatusInQueue(id, exitStatus)
} }
q.Unlock() return errors.Join(errs...)
return nil
}
// EvictAtOnce removes multiple pending tasks from the queue.
func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error {
q.Lock()
defer q.Unlock()
for _, id := range taskIDs {
var next *list.Element
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
task, ok := element.Value.(*model.Task)
if ok && task.ID == id {
q.pending.Remove(element)
return nil
}
}
}
return ErrNotFound
} }
// Wait waits until the item is done executing. // Wait waits until the item is done executing.
@@ -286,19 +277,15 @@ func (q *fifo) process() {
func (q *fifo) filterWaiting() { func (q *fifo) filterWaiting() {
// resubmits all waiting tasks to pending, deps may have cleared // resubmits all waiting tasks to pending, deps may have cleared
var nextWaiting *list.Element for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting { task, _ := element.Value.(*model.Task)
nextWaiting = e.Next()
task, _ := e.Value.(*model.Task)
q.pending.PushBack(task) q.pending.PushBack(task)
} }
// rebuild waitingDeps // rebuild waitingDeps
q.waitingOnDeps = list.New() q.waitingOnDeps = list.New()
var filtered []*list.Element var filtered []*list.Element
var nextPending *list.Element for element := q.pending.Front(); element != nil; element = element.Next() {
for element := q.pending.Front(); element != nil; element = nextPending {
nextPending = element.Next()
task, _ := element.Value.(*model.Task) task, _ := element.Value.(*model.Task)
if q.depsInQueue(task) { if q.depsInQueue(task) {
log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID) log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID)
@@ -314,12 +301,10 @@ func (q *fifo) filterWaiting() {
} }
func (q *fifo) assignToWorker() (*list.Element, *worker) { func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
var bestWorker *worker var bestWorker *worker
var bestScore int var bestScore int
for element := q.pending.Front(); element != nil; element = next { for element := q.pending.Front(); element != nil; element = element.Next() {
next = element.Next()
task, _ := element.Value.(*model.Task) task, _ := element.Value.(*model.Task)
log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
@@ -352,9 +337,7 @@ func (q *fifo) resubmitExpiredPipelines() {
} }
func (q *fifo) depsInQueue(task *model.Task) bool { func (q *fifo) depsInQueue(task *model.Task) bool {
var next *list.Element for element := q.pending.Front(); element != nil; element = element.Next() {
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
possibleDep, ok := element.Value.(*model.Task) possibleDep, ok := element.Value.(*model.Task)
log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID) log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies { for _, dep := range task.Dependencies {
@@ -372,13 +355,12 @@ func (q *fifo) depsInQueue(task *model.Task) bool {
return false return false
} }
// expects the q to be currently owned e.g. locked by caller!
func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
var next *list.Element for element := q.pending.Front(); element != nil; element = element.Next() {
for element := q.pending.Front(); element != nil; element = next { pending, _ := element.Value.(*model.Task)
next = element.Next()
pending, ok := element.Value.(*model.Task)
for _, dep := range pending.Dependencies { for _, dep := range pending.Dependencies {
if ok && taskID == dep { if taskID == dep {
pending.DepStatus[dep] = status pending.DepStatus[dep] = status
} }
} }
@@ -392,27 +374,40 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
} }
} }
for element := q.waitingOnDeps.Front(); element != nil; element = next { for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
next = element.Next() waiting, _ := element.Value.(*model.Task)
waiting, ok := element.Value.(*model.Task)
for _, dep := range waiting.Dependencies { for _, dep := range waiting.Dependencies {
if ok && taskID == dep { if taskID == dep {
waiting.DepStatus[dep] = status waiting.DepStatus[dep] = status
} }
} }
} }
} }
func (q *fifo) removeFromPending(taskID string) { // expects the q to be currently owned e.g. locked by caller!
func (q *fifo) removeFromPendingAndWaiting(taskID string) error {
log.Debug().Msgf("queue: trying to remove %s", taskID) log.Debug().Msgf("queue: trying to remove %s", taskID)
var next *list.Element
for element := q.pending.Front(); element != nil; element = next { // we assume pending first
next = element.Next() for element := q.pending.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task) task, _ := element.Value.(*model.Task)
if task.ID == taskID { if task.ID == taskID {
log.Debug().Msgf("queue: %s is removed from pending", taskID) log.Debug().Msgf("queue: %s is removed from pending", taskID)
q.pending.Remove(element) _ = q.pending.Remove(element)
return return nil
} }
} }
// well looks like it's waiting
for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task)
if task.ID == taskID {
log.Debug().Msgf("queue: %s is removed from waitingOnDeps", taskID)
_ = q.waitingOnDeps.Remove(element)
return nil
}
}
// well it could not be found
return ErrNotFound
} }

File diff suppressed because it is too large Load Diff

View File

@@ -228,63 +228,6 @@ func (_c *MockQueue_ErrorAtOnce_Call) RunAndReturn(run func(c context.Context, i
return _c return _c
} }
// EvictAtOnce provides a mock function for the type MockQueue
func (_mock *MockQueue) EvictAtOnce(c context.Context, ids []string) error {
ret := _mock.Called(c, ids)
if len(ret) == 0 {
panic("no return value specified for EvictAtOnce")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(context.Context, []string) error); ok {
r0 = returnFunc(c, ids)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockQueue_EvictAtOnce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EvictAtOnce'
type MockQueue_EvictAtOnce_Call struct {
*mock.Call
}
// EvictAtOnce is a helper method to define mock.On call
// - c context.Context
// - ids []string
func (_e *MockQueue_Expecter) EvictAtOnce(c interface{}, ids interface{}) *MockQueue_EvictAtOnce_Call {
return &MockQueue_EvictAtOnce_Call{Call: _e.mock.On("EvictAtOnce", c, ids)}
}
func (_c *MockQueue_EvictAtOnce_Call) Run(run func(c context.Context, ids []string)) *MockQueue_EvictAtOnce_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 context.Context
if args[0] != nil {
arg0 = args[0].(context.Context)
}
var arg1 []string
if args[1] != nil {
arg1 = args[1].([]string)
}
run(
arg0,
arg1,
)
})
return _c
}
func (_c *MockQueue_EvictAtOnce_Call) Return(err error) *MockQueue_EvictAtOnce_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockQueue_EvictAtOnce_Call) RunAndReturn(run func(c context.Context, ids []string) error) *MockQueue_EvictAtOnce_Call {
_c.Call.Return(run)
return _c
}
// Extend provides a mock function for the type MockQueue // Extend provides a mock function for the type MockQueue
func (_mock *MockQueue) Extend(c context.Context, agentID int64, workflowID string) error { func (_mock *MockQueue) Extend(c context.Context, agentID int64, workflowID string) error {
ret := _mock.Called(c, agentID, workflowID) ret := _mock.Called(c, agentID, workflowID)

View File

@@ -72,19 +72,6 @@ func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*m
return task, err return task, err
} }
// EvictAtOnce removes multiple pending tasks from the queue.
func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
if err := q.Queue.EvictAtOnce(c, ids); err != nil {
return err
}
for _, id := range ids {
if err := q.store.TaskDelete(id); err != nil {
return err
}
}
return nil
}
// Error signals the task is done with an error. // Error signals the task is done with an error.
func (q *persistentQueue) Error(c context.Context, id string, err error) error { func (q *persistentQueue) Error(c context.Context, id string, err error) error {
if err := q.Queue.Error(c, id, err); err != nil { if err := q.Queue.Error(c, id, err); err != nil {
@@ -93,7 +80,8 @@ func (q *persistentQueue) Error(c context.Context, id string, err error) error {
return q.store.TaskDelete(id) return q.store.TaskDelete(id)
} }
// ErrorAtOnce signals multiple tasks are done with an error. // ErrorAtOnce signals multiple tasks are done and complete with an error.
// If still pending they will just get removed from the queue.
func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error) error { func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error) error {
if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil { if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil {
return err return err

View File

@@ -36,6 +36,9 @@ var (
// ErrTaskExpired indicates a running task exceeded its lease/deadline and was resubmitted. // ErrTaskExpired indicates a running task exceeded its lease/deadline and was resubmitted.
ErrTaskExpired = errors.New("queue: task expired") ErrTaskExpired = errors.New("queue: task expired")
// ErrWorkerKicked worker of an agent got kicked.
ErrWorkerKicked = errors.New("worker was kicked")
) )
// InfoT provides runtime information. // InfoT provides runtime information.
@@ -93,12 +96,10 @@ type Queue interface {
// Error signals the task is done with an error. // Error signals the task is done with an error.
Error(c context.Context, id string, err error) error Error(c context.Context, id string, err error) error
// ErrorAtOnce signals multiple done are complete with an error. // ErrorAtOnce signals multiple tasks are done and complete with an error.
// If still pending they will just get removed from the queue.
ErrorAtOnce(c context.Context, ids []string, err error) error ErrorAtOnce(c context.Context, ids []string, err error) error
// EvictAtOnce removes multiple pending tasks from the queue.
EvictAtOnce(c context.Context, ids []string) error
// Wait waits until the task is complete. // Wait waits until the task is complete.
Wait(c context.Context, id string) error Wait(c context.Context, id string) error