From f202470c7d81fdbac377cf499e445a6e6519b38b Mon Sep 17 00:00:00 2001
From: Sam Richard
Date: Mon, 26 Jan 2026 22:42:35 -0300
Subject: [PATCH] fix(agent): workflow runner use shutdown context (#6021)

Co-authored-by: 6543 <6543@obermui.de>
---
 agent/runner.go | 94 +++++++++++++++++++++++++++----------------------
 1 file changed, 52 insertions(+), 42 deletions(-)

diff --git a/agent/runner.go b/agent/runner.go
index 4cf5b835e..a618dc7ef 100644
--- a/agent/runner.go
+++ b/agent/runner.go
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/rs/zerolog/log"
@@ -50,13 +51,14 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
 	}
 }
 
-func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:contextcheck
+func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
 	log.Debug().Msg("request next execution")
 
+	// Carry request metadata over from runnerCtx, but tie cancellation to shutdownCtx.
 	meta, _ := metadata.FromOutgoingContext(runnerCtx)
-	ctxMeta := metadata.NewOutgoingContext(context.Background(), meta)
+	ctxMeta := metadata.NewOutgoingContext(shutdownCtx, meta)
 
-	// get the next workflow from the queue
+	// Fetch the next workflow from the queue
 	workflow, err := r.client.Next(runnerCtx, r.filter)
 	if err != nil {
 		return err
@@ -65,6 +67,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
 		return nil
 	}
 
+	// Compute the workflow timeout
 	timeout := time.Hour
 	if minutes := workflow.Timeout; minutes != 0 {
 		timeout = time.Duration(minutes) * time.Minute
@@ -73,12 +76,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
 	repoName := extractRepositoryName(workflow.Config)       // hack
 	pipelineNumber := extractPipelineNumber(workflow.Config) // hack
 
-	r.counter.Add(
-		workflow.ID,
-		timeout,
-		repoName,
-		pipelineNumber,
-	)
+	// Track workflow execution in runner state
+	r.counter.Add(workflow.ID, timeout, repoName, pipelineNumber)
 	defer r.counter.Done(workflow.ID)
 
 	logger := log.With().
@@ -89,57 +88,71 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
 
 	logger.Debug().Msg("received execution")
 
+	// Workflow execution context.
+	// It is the single source of truth for cancellation.
 	workflowCtx, cancel := context.WithTimeout(ctxMeta, timeout)
 	defer cancel()
 
-	// Add sigterm support for internal context.
-	// Required when the pipeline is terminated by external signals
-	// like kubernetes.
+	// Handle SIGTERM (Kubernetes, Docker, system shutdown)
 	workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() {
-		logger.Error().Msg("Received sigterm termination signal")
+		logger.Error().Msg("received sigterm termination signal")
+		cancel()
 	})
 
-	canceled := false
+	// canceled indicates whether the workflow was canceled remotely (UI/API).
+	// It must be atomic because it is written from a goroutine and read later.
+	var canceled atomic.Bool
+
+	// Listen for remote cancel events (UI/API).
+	// When canceled, the workflow context must be canceled as well
+	// so that pipeline execution and backend processes stop immediately.
 	go func() {
-		logger.Debug().Msg("listen for cancel signal")
+		logger.Debug().Msg("listening for cancel signal")
 
 		if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
-			canceled = true
-			logger.Warn().Err(err).Msg("cancel signal received")
+			logger.Warn().Err(err).Msg("cancel signal received from server")
+
+			// Mark the workflow as canceled (thread-safe)
+			canceled.Store(true)
+
+			// Propagate cancellation to the pipeline and backend
 			cancel()
 		} else {
-			logger.Debug().Msg("done listening for cancel signal")
+			// Wait returned without error, meaning the workflow finished normally
+			logger.Debug().Msg("cancel listener exited normally")
 		}
 	}()
 
+	// Periodically extend the workflow lease while it is running
 	go func() {
 		for {
 			select {
 			case <-workflowCtx.Done():
-				logger.Debug().Msg("pipeline done")
+				logger.Debug().Msg("workflow context done")
 
 				return
 			case <-time.After(constant.TaskTimeout / 3):
-				logger.Debug().Msg("pipeline lease renewed")
+				logger.Debug().Msg("renewing workflow lease")
 				if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
-					log.Error().Err(err).Msg("extending pipeline deadline failed")
+					logger.Error().Err(err).Msg("failed to extend workflow lease")
 				}
 			}
 		}
 	}()
 
-	state := rpc.WorkflowState{}
-	state.Started = time.Now().Unix()
-
-	err = r.client.Init(runnerCtx, workflow.ID, state)
-	if err != nil {
+	state := rpc.WorkflowState{
+		Started: time.Now().Unix(),
+	}
+	if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil {
 		logger.Error().Err(err).Msg("workflow initialization failed")
 		// TODO: should we return here?
 	}
 
 	var uploads sync.WaitGroup
-	//nolint:contextcheck
-	err = pipeline.New(workflow.Config,
+
+	// Run the pipeline
+	err = pipeline.New(
+		workflow.Config,
 		pipeline.WithContext(workflowCtx),
 		pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
 		pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
@@ -154,10 +167,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
 
 	state.Finished = time.Now().Unix()
 
-	if errors.Is(err, pipeline.ErrCancel) {
-		canceled = true
-	} else if canceled {
-		err = errors.Join(err, pipeline.ErrCancel)
+	// Normalize the error when the workflow was canceled
+	if errors.Is(err, pipeline.ErrCancel) || canceled.Load() {
+		canceled.Store(true)
+		err = pipeline.ErrCancel
 	}
 
 	if err != nil {
@@ -166,25 +179,22 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
 
 	logger.Debug().
 		Str("error", state.Error).
-		Bool("canceled", canceled).
+		Bool("canceled", canceled.Load()).
 		Msg("workflow finished")
 
-	logger.Debug().Msg("uploading logs and traces / states ...")
+	// Ensure all logs and traces are uploaded before finishing
+	logger.Debug().Msg("waiting for logs and traces upload")
 	uploads.Wait()
-	logger.Debug().Msg("uploaded logs and traces / states")
-
-	logger.Debug().
-		Str("error", state.Error).
-		Msg("updating workflow status")
+	logger.Debug().Msg("logs and traces uploaded")
 
+	// Report the final status (fall back to shutdownCtx if runnerCtx is already canceled)
 	doneCtx := runnerCtx
 	if doneCtx.Err() != nil {
 		doneCtx = shutdownCtx
 	}
+
 	if err := r.client.Done(doneCtx, workflow.ID, state); err != nil {
-		logger.Error().Err(err).Msg("updating workflow status failed")
-	} else {
-		logger.Debug().Msg("updating workflow status complete")
+		logger.Error().Err(err).Msg("failed to update workflow status")
 	}
 
 	return nil
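
For context, here is a minimal sketch of how a caller might wire the two contexts this patch relies on. It is illustrative only and not part of the patch: the helper name newAgentContexts, the grace period, and the signal handling are assumptions, not Woodpecker's actual agent code. The idea is that runnerCtx ends as soon as the agent is asked to stop, while shutdownCtx stays alive a little longer so Run can still flush logs and call r.client.Done.

// Hypothetical wiring sketch (not from the Woodpecker codebase): build a
// runner context that ends on SIGINT/SIGTERM and a shutdown context that
// outlives it by a grace period, so final status updates can still be sent.
package main

import (
	"context"
	"os/signal"
	"syscall"
	"time"
)

// newAgentContexts is a hypothetical helper; the grace period is an assumption.
func newAgentContexts(grace time.Duration) (runnerCtx, shutdownCtx context.Context, stop func()) {
	// runnerCtx gates fetching and executing new workflows.
	runnerCtx, stopSignals := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

	// shutdownCtx is only canceled a grace period after runnerCtx ends,
	// giving in-flight uploads and the final Done call time to complete.
	shutdownCtx, cancelShutdown := context.WithCancel(context.Background())
	go func() {
		<-runnerCtx.Done()
		time.Sleep(grace)
		cancelShutdown()
	}()

	return runnerCtx, shutdownCtx, func() {
		stopSignals()
		cancelShutdown()
	}
}

func main() {
	runnerCtx, shutdownCtx, stop := newAgentContexts(30 * time.Second)
	defer stop()

	// In the real agent these would be passed to the runner loop, roughly:
	//   for { _ = runner.Run(runnerCtx, shutdownCtx) }
	_ = runnerCtx
	_ = shutdownCtx
}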