Fix pipeline cancellation status handling and step state synchronization (#6011)

Co-authored-by: pnkcaht <samzoovsk19@gmail.com>
Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com>
Co-authored-by: Lauris B <lauris@nix.lv>
Author: 6543
Date: 2026-02-05 21:41:05 +01:00
Committed by: GitHub
Parent: 1af1ef562c
Commit: 8a8f9ad3aa
26 changed files with 956 additions and 387 deletions

View File

@@ -36,6 +36,7 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w
Logger() Logger()
uploads.Add(1) uploads.Add(1)
defer uploads.Done()
var secrets []string var secrets []string
for _, secret := range workflow.Config.Secrets { for _, secret := range workflow.Config.Secrets {
@@ -50,8 +51,6 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w
} }
logger.Debug().Msg("log stream copied, close ...") logger.Debug().Msg("log stream copied, close ...")
uploads.Done()
return nil return nil
} }
} }

View File

@@ -148,15 +148,16 @@ func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, er
return w, nil return w, nil
} }
// Wait blocks until the workflow is complete. // Wait blocks until the workflow with the given ID is marked as completed or canceled by the server.
func (c *client) Wait(ctx context.Context, workflowID string) (err error) { func (c *client) Wait(ctx context.Context, workflowID string) (canceled bool, err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.WaitRequest) req := new(proto.WaitRequest)
req.Id = workflowID req.Id = workflowID
for { for {
_, err = c.client.Wait(ctx, req) resp, err := c.client.Wait(ctx, req)
if err == nil { if err == nil {
break // wait block was released normally as expected by server
return resp.GetCanceled(), nil
} }
switch status.Code(err) { switch status.Code(err) {
@@ -164,10 +165,10 @@ func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
if ctx.Err() != nil { if ctx.Err() != nil {
// expected as context was canceled // expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: wait(): context canceled") log.Debug().Err(err).Msgf("grpc error: wait(): context canceled")
return nil return false, nil
} }
log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
return err return false, err
case case
codes.Aborted, codes.Aborted,
codes.DataLoss, codes.DataLoss,
@@ -178,16 +179,15 @@ func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
default: default:
log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
return err return false, err
} }
select { select {
case <-time.After(retry.NextBackOff()): case <-time.After(retry.NextBackOff()):
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return false, ctx.Err()
} }
} }
return nil
} }
// Init signals the workflow is initialized. // Init signals the workflow is initialized.
@@ -199,6 +199,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
req.State.Started = state.Started req.State.Started = state.Started
req.State.Finished = state.Finished req.State.Finished = state.Finished
req.State.Error = state.Error req.State.Error = state.Error
req.State.Canceled = state.Canceled
for { for {
_, err = c.client.Init(ctx, req) _, err = c.client.Init(ctx, req)
if err == nil { if err == nil {
@@ -238,7 +239,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
return nil return nil
} }
// Done signals the workflow is complete. // Done lets the agent signal to the server that the workflow has stopped.
func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) { func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.DoneRequest) req := new(proto.DoneRequest)
@@ -247,6 +248,7 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
req.State.Started = state.Started req.State.Started = state.Started
req.State.Finished = state.Finished req.State.Finished = state.Finished
req.State.Error = state.Error req.State.Error = state.Error
req.State.Canceled = state.Canceled
for { for {
_, err = c.client.Done(ctx, req) _, err = c.client.Done(ctx, req)
if err == nil { if err == nil {
@@ -330,7 +332,7 @@ func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
return nil return nil
} }
// Update updates the workflow state. // Update lets the agent update the step state on the server.
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) { func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.UpdateRequest) req := new(proto.UpdateRequest)
@@ -342,6 +344,7 @@ func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepSt
req.State.Exited = state.Exited req.State.Exited = state.Exited
req.State.ExitCode = int32(state.ExitCode) req.State.ExitCode = int32(state.ExitCode)
req.State.Error = state.Error req.State.Error = state.Error
req.State.Canceled = state.Canceled
for { for {
_, err = c.client.Update(ctx, req) _, err = c.client.Update(ctx, req)
if err == nil { if err == nil {

View File

@@ -20,7 +20,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@@ -51,6 +50,7 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
} }
} }
// Run executes a workflow using a backend, tracks its state and reports the state back to the server.
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
log.Debug().Msg("request next execution") log.Debug().Msg("request next execution")
@@ -90,34 +90,32 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
// Workflow execution context. // Workflow execution context.
// This context is the SINGLE source of truth for cancellation. // This context is the SINGLE source of truth for cancellation.
workflowCtx, cancel := context.WithTimeout(ctxMeta, timeout) workflowCtx, _ := context.WithTimeout(ctxMeta, timeout) //nolint:govet
defer cancel() workflowCtx, cancelWorkflowCtx := context.WithCancelCause(workflowCtx)
defer cancelWorkflowCtx(nil)
// Handle SIGTERM (k8s, docker, system shutdown) // Add sigterm support for internal context.
// Required so the running workflow can be terminated via external signals.
workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() { workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() {
logger.Error().Msg("received sigterm termination signal") logger.Error().Msg("received sigterm termination signal")
cancel() // WithContextSigtermCallback would cancel the context too, but we want our own custom error
cancelWorkflowCtx(pipeline.ErrCancel)
}) })
// canceled indicates whether the workflow was canceled remotely (UI/API).
// Must be atomic because it is written from a goroutine and read later.
var canceled atomic.Bool
// Listen for remote cancel events (UI / API). // Listen for remote cancel events (UI / API).
// When canceled, we MUST cancel the workflow context // When canceled, we MUST cancel the workflow context
// so that pipeline execution and backend processes stop immediately. // so that workflow execution stops immediately.
go func() { go func() {
logger.Debug().Msg("listening for cancel signal") logger.Debug().Msg("start listening for server side cancel signal")
if err := r.client.Wait(workflowCtx, workflow.ID); err != nil { if canceled, err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
logger.Warn().Err(err).Msg("cancel signal received from server") logger.Error().Err(err).Msg("server returned unexpected err while waiting for workflow to finish run")
cancelWorkflowCtx(err)
// Mark workflow as canceled (thread-safe)
canceled.Store(true)
// Propagate cancellation to pipeline + backend
cancel()
} else { } else {
if canceled {
logger.Debug().Err(err).Msg("server side cancel signal received")
cancelWorkflowCtx(pipeline.ErrCancel)
}
// Wait returned without error, meaning the workflow finished normally // Wait returned without error, meaning the workflow finished normally
logger.Debug().Msg("cancel listener exited normally") logger.Debug().Msg("cancel listener exited normally")
} }
@@ -143,9 +141,13 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
state := rpc.WorkflowState{ state := rpc.WorkflowState{
Started: time.Now().Unix(), Started: time.Now().Unix(),
} }
if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil { if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("workflow initialization failed") logger.Error().Err(err).Msg("signaling workflow initialization to server failed")
// TODO: should we return here? // We have an error, maybe the server is currently unreachable or other server-side errors occurred.
// So let's clean up and end this not yet started workflow run.
cancelWorkflowCtx(err)
return err
} }
var uploads sync.WaitGroup var uploads sync.WaitGroup
@@ -167,19 +169,18 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
state.Finished = time.Now().Unix() state.Finished = time.Now().Unix()
// Normalize cancellation error
if errors.Is(err, pipeline.ErrCancel) || canceled.Load() {
canceled.Store(true)
err = pipeline.ErrCancel
}
if err != nil { if err != nil {
state.Error = err.Error() state.Error = err.Error()
if errors.Is(err, pipeline.ErrCancel) {
state.Canceled = true
// cleanup joined error messages
state.Error = pipeline.ErrCancel.Error()
}
} }
logger.Debug(). logger.Debug().
Str("error", state.Error). Str("error", state.Error).
Bool("canceled", canceled.Load()). Bool("canceled", state.Canceled).
Msg("workflow finished") Msg("workflow finished")
// Ensure all logs/traces are uploaded before finishing // Ensure all logs/traces are uploaded before finishing
@@ -195,6 +196,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
if err := r.client.Done(doneCtx, workflow.ID, state); err != nil { if err := r.client.Done(doneCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("failed to update workflow status") logger.Error().Err(err).Msg("failed to update workflow status")
} else {
logger.Debug().Msg("signaling workflow stopped done")
} }
return nil return nil
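
The core of this change is that a single cancel-cause context replaces the old cancel func plus atomic flag. Below is a trimmed sketch of the listener wiring only, assuming the timeout and SIGTERM handling shown in the diff are set up elsewhere; watchCancel is a hypothetical helper, not the actual Runner code:

```go
package cancelsketch

import (
	"context"

	"go.woodpecker-ci.org/woodpecker/v3/pipeline"
	"go.woodpecker-ci.org/woodpecker/v3/pipeline/rpc"
)

// watchCancel wires the background cancel listener: one cancel-cause context
// is the single source of truth, and the Wait() result decides which cause
// is recorded.
func watchCancel(parentCtx context.Context, peer rpc.Peer, workflowID string) (context.Context, context.CancelCauseFunc) {
	ctx, cancel := context.WithCancelCause(parentCtx)

	go func() {
		canceled, err := peer.Wait(ctx, workflowID)
		switch {
		case err != nil:
			// Server unreachable or returned an unexpected error.
			cancel(err)
		case canceled:
			// Cancel requested remotely via UI/API.
			cancel(pipeline.ErrCancel)
		default:
			// Wait unblocked because the workflow finished normally;
			// nothing to cancel.
		}
	}()

	return ctx, cancel
}
```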

View File

@@ -16,6 +16,7 @@ package agent
import ( import (
"context" "context"
"errors"
"runtime" "runtime"
"strconv" "strconv"
"sync" "sync"
@@ -30,6 +31,7 @@ import (
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc { func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error { return func(state *pipeline.State) error {
uploads.Add(1) uploads.Add(1)
defer uploads.Done()
stepLogger := logger.With(). stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image). Str("image", state.Pipeline.Step.Image).
@@ -43,12 +45,15 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,
StepUUID: state.Pipeline.Step.UUID, StepUUID: state.Pipeline.Step.UUID,
Exited: state.Process.Exited, Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode, ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO: do not do this Started: state.Process.Started,
Finished: time.Now().Unix(), Canceled: errors.Is(state.Process.Error, pipeline.ErrCancel),
} }
if state.Process.Error != nil { if state.Process.Error != nil {
stepState.Error = state.Process.Error.Error() stepState.Error = state.Process.Error.Error()
} }
if state.Process.Exited {
stepState.Finished = time.Now().Unix()
}
defer func() { defer func() {
stepLogger.Debug().Msg("update step status") stepLogger.Debug().Msg("update step status")
@@ -60,7 +65,6 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,
} }
stepLogger.Debug().Msg("update step status complete") stepLogger.Debug().Msg("update step status complete")
uploads.Done()
}() }()
if state.Process.Exited { if state.Process.Exited {
return nil return nil
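
The new Canceled flag is derived with errors.Is, which walks wrapped and joined error chains; that is why the runner above can detect ErrCancel even inside a joined multi-step error. A small illustration:

```go
package cancelcheck

import (
	"errors"
	"fmt"

	"go.woodpecker-ci.org/woodpecker/v3/pipeline"
)

// isCanceled reports whether err carries ErrCancel anywhere in its chain.
func isCanceled(err error) bool {
	return errors.Is(err, pipeline.ErrCancel)
}

func examples() (bool, bool) {
	wrapped := fmt.Errorf("step failed: %w", pipeline.ErrCancel)
	joined := errors.Join(errors.New("unrelated step error"), pipeline.ErrCancel)
	return isCanceled(wrapped), isCanceled(joined) // true, true
}
```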

View File

@@ -250,29 +250,11 @@ func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID str
return e.client.ContainerStart(ctx, containerName, container.StartOptions{}) return e.client.ContainerStart(ctx, containerName, container.StartOptions{})
} }
// WaitStep waits for a step container to exit.
//
// When the context is canceled, the container is immediately killed to prevent
// orphaned containers from continuing to run after agent shutdown.
func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) { func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
log := log.Logger.With(). log := log.Logger.With().Str("taskUUID", taskUUID).Str("stepUUID", step.UUID).Logger()
Str("taskUUID", taskUUID).
Str("stepUUID", step.UUID).
Logger()
log.Trace().Msgf("wait for step %s", step.Name) log.Trace().Msgf("wait for step %s", step.Name)
containerName := toContainerName(step) containerName := toContainerName(step)
done := make(chan struct{})
// Ensure container is killed if context is canceled (SIGTERM / pipeline cancel)
go func() {
select {
case <-ctx.Done():
_ = e.client.ContainerKill(context.Background(), containerName, "9") //nolint:contextcheck
case <-done:
}
}()
wait, errC := e.client.ContainerWait(ctx, containerName, "") wait, errC := e.client.ContainerWait(ctx, containerName, "")
select { select {
@@ -282,9 +264,6 @@ func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID stri
log.Trace().Msgf("ContainerWait returned with err: %v", err) log.Trace().Msgf("ContainerWait returned with err: %v", err)
} }
// Stop cancellation watcher
close(done)
info, err := e.client.ContainerInspect(ctx, containerName) info, err := e.client.ContainerInspect(ctx, containerName)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -171,9 +171,18 @@ func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string
} }
} }
func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) { func (e *local) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name) log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
stepState := &types.State{
Exited: true,
}
if err := ctx.Err(); err != nil {
stepState.Error = err
return stepState, nil
}
state, err := e.getStepState(taskUUID, step.UUID) state, err := e.getStepState(taskUUID, step.UUID)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -183,10 +192,6 @@ func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (
return nil, errors.New("exec: step command not set up") return nil, errors.New("exec: step command not set up")
} }
stepState := &types.State{
Exited: true,
}
// normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not // normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not
// as Wait() would close the io pipe even if not all logs where read and send back // as Wait() would close the io pipe even if not all logs where read and send back
// so we have to do use the underlying functions // so we have to do use the underlying functions
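
The local backend now checks the step context before touching the process state. A reduced sketch of just that early-exit path (the helper name and boolean return are illustrative, not the real WaitStep signature):

```go
package localsketch

import (
	"context"

	"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)

// waitStepEarlyCancel isolates the new early-exit path: if the step context
// is already done, report an exited step carrying the context error instead
// of inspecting the process state.
func waitStepEarlyCancel(ctx context.Context) (*types.State, bool) {
	stepState := &types.State{Exited: true}
	if err := ctx.Err(); err != nil {
		stepState.Error = err
		return stepState, true // handled: skip the normal wait path
	}
	return stepState, false
}
```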

View File

@@ -16,11 +16,14 @@ package types
// State defines a container state. // State defines a container state.
type State struct { type State struct {
// Unix start time
Started int64 `json:"started"`
// Container exit code // Container exit code
ExitCode int `json:"exit_code"` ExitCode int `json:"exit_code"`
// Container exited, true or false // Container exited, true or false
Exited bool `json:"exited"` Exited bool `json:"exited"`
// Container is oom killed, true or false // Container is oom killed, true or false
// TODO (6024): well known errors as string enum into ./errors.go
OOMKilled bool `json:"oom_killed"` OOMKilled bool `json:"oom_killed"`
// Container error // Container error
Error error Error error

View File

@@ -47,18 +47,23 @@ type (
} }
// Current process state. // Current process state.
Process *backend.State Process backend.State
} }
) )
// Runtime is a configuration runtime. // Runtime represents a workflow state executed by a specific backend.
// Each workflow gets its own state configuration at runtime.
type Runtime struct { type Runtime struct {
err error err error
spec *backend.Config spec *backend.Config
engine backend.Backend engine backend.Backend
started int64 started int64
ctx context.Context // The context a workflow is being executed with.
// All normal (non cleanup) operations must use this.
// Cleanup operations should use the runnerCtx passed to Run()
ctx context.Context
tracer Tracer tracer Tracer
logger Logger logger Logger
@@ -122,7 +127,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
state := new(State) state := new(State)
state.Pipeline.Step = stepErr.Step state.Pipeline.Step = stepErr.Step
state.Pipeline.Error = stepErr.Err state.Pipeline.Error = stepErr.Err
state.Process = &backend.State{ state.Process = backend.State{
Error: stepErr.Err, Error: stepErr.Err,
Exited: true, Exited: true,
ExitCode: 1, ExitCode: 1,
@@ -143,7 +148,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
select { select {
case <-r.ctx.Done(): case <-r.ctx.Done():
return ErrCancel return ErrCancel
case err := <-r.execAll(stage.Steps): case err := <-r.execAll(runnerCtx, stage.Steps):
if err != nil { if err != nil {
r.err = err r.err = err
} }
@@ -154,28 +159,30 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
} }
// Updates the current status of a step. // Updates the current status of a step.
// If processState is nil, we assume the step did not start.
// If the step did not start and err exists, it's a step start issue and the step is done.
func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error { func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error {
if r.tracer == nil { if r.tracer == nil {
// no tracer nothing to trace :) // no tracer nothing to trace :)
return nil return nil
} }
if processState == nil {
processState = new(backend.State)
if err != nil {
processState.Error = err
processState.Exited = true
processState.OOMKilled = false
processState.ExitCode = 126 // command invoked cannot be executed.
}
}
state := new(State) state := new(State)
state.Pipeline.Started = r.started state.Pipeline.Started = r.started
state.Pipeline.Step = step state.Pipeline.Step = step
state.Process = processState // empty
state.Pipeline.Error = r.err state.Pipeline.Error = r.err
// We have an error while starting the step
if processState == nil && err != nil {
state.Process = backend.State{
Error: err,
Exited: true,
OOMKilled: false,
}
} else if processState != nil {
state.Process = *processState
}
if traceErr := r.tracer.Trace(state); traceErr != nil { if traceErr := r.tracer.Trace(state); traceErr != nil {
return traceErr return traceErr
} }
@@ -183,7 +190,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
} }
// Executes a set of parallel steps. // Executes a set of parallel steps.
func (r *Runtime) execAll(steps []*backend.Step) <-chan error { func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error {
var g errgroup.Group var g errgroup.Group
done := make(chan error) done := make(chan error)
logger := r.MakeLogger() logger := r.MakeLogger()
@@ -226,12 +233,17 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
Str("step", step.Name). Str("step", step.Name).
Msg("executing") Msg("executing")
processState, err := r.exec(step) processState, err := r.exec(runnerCtx, step)
logger.Debug(). logger.Debug().
Str("step", step.Name). Str("step", step.Name).
Msg("complete") Msg("complete")
// normalize context cancel error
if errors.Is(err, context.Canceled) {
err = ErrCancel
}
// Return the error after tracing it. // Return the error after tracing it.
err = r.traceStep(processState, err, step) err = r.traceStep(processState, err, step)
if err != nil && step.Failure == metadata.FailureIgnore { if err != nil && step.Failure == metadata.FailureIgnore {
@@ -245,18 +257,21 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
done <- g.Wait() done <- g.Wait()
close(done) close(done)
}() }()
return done return done
} }
// Executes the step and returns the state and error. // Executes the step and returns the state and error.
func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.State, error) {
if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck
return nil, err return nil, err
} }
startTime := time.Now().Unix()
logger := r.MakeLogger()
var wg sync.WaitGroup var wg sync.WaitGroup
if r.logger != nil { if r.logger != nil {
rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -264,7 +279,6 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
logger := r.MakeLogger()
if err := r.logger(step, rc); err != nil { if err := r.logger(step, rc); err != nil {
logger.Error().Err(err).Msg("process logging failed") logger.Error().Err(err).Msg("process logging failed")
@@ -281,16 +295,27 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream) // We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait() wg.Wait()
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
return waitState, ErrCancel waitState.Error = ErrCancel
} else {
return nil, err
} }
}
// It is important to use the runnerCtx here because,
// in case the workflow was canceled, the docker daemon still needs to stop and remove the container.
if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil {
return nil, err return nil, err
} }
if err := r.engine.DestroyStep(r.ctx, step, r.taskUUID); err != nil { // we update with our start time here
return nil, err waitState.Started = startTime
// we handle cancel case
if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) {
waitState.Error = ErrCancel
} }
if waitState.OOMKilled { if waitState.OOMKilled {
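
The exec() change above splits responsibilities between two contexts: the workflow context drives normal operations and may already be canceled, while the runner context stays alive for cleanup. A hedged sketch of that pattern, assuming the backend interface methods keep the signatures shown in this diff:

```go
package execsketch

import (
	"context"
	"errors"

	"go.woodpecker-ci.org/woodpecker/v3/pipeline"
	backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)

// waitAndDestroy condenses the pattern: wait with the workflow context, but
// always destroy with the longer-lived runner context so a canceled workflow
// still gets its step cleaned up.
func waitAndDestroy(workflowCtx, runnerCtx context.Context, engine backend.Backend, step *backend.Step, taskUUID string) (*backend.State, error) {
	waitState, err := engine.WaitStep(workflowCtx, step, taskUUID)
	if err != nil {
		if !errors.Is(err, context.Canceled) {
			return nil, err
		}
		if waitState == nil {
			// Defensive: the sketch does not assume the backend returns a
			// usable state alongside a context.Canceled error.
			waitState = &backend.State{Exited: true}
		}
		waitState.Error = pipeline.ErrCancel
	}

	// Cleanup must not use workflowCtx: after a cancel it is already done
	// and the backend would never get to remove the step's resources.
	if err := engine.DestroyStep(runnerCtx, step, taskUUID); err != nil {
		return nil, err
	}

	if errors.Is(workflowCtx.Err(), context.Canceled) {
		waitState.Error = pipeline.ErrCancel
	}
	return waitState, nil
}
```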

View File

@@ -23,26 +23,15 @@ import (
const shutdownTimeout = time.Second * 5 const shutdownTimeout = time.Second * 5
var ( var (
shutdownCtx context.Context shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc shutdownCtxLock sync.Mutex
shutdownCtxLock sync.Mutex
) )
func GetShutdownCtx() context.Context { func GetShutdownCtx() context.Context {
shutdownCtxLock.Lock() shutdownCtxLock.Lock()
defer shutdownCtxLock.Unlock() defer shutdownCtxLock.Unlock()
if shutdownCtx == nil { if shutdownCtx == nil {
shutdownCtx, shutdownCtxCancel = context.WithTimeout(context.Background(), shutdownTimeout) shutdownCtx, _ = context.WithTimeout(context.Background(), shutdownTimeout) //nolint:govet
} }
return shutdownCtx return shutdownCtx
} }
func CancelShutdown() {
shutdownCtxLock.Lock()
defer shutdownCtxLock.Unlock()
if shutdownCtxCancel == nil {
// we create an canceled context
shutdownCtx, shutdownCtxCancel = context.WithCancel(context.Background()) //nolint:forbidigo
}
shutdownCtxCancel()
}

View File

@@ -623,20 +623,29 @@ func (_c *MockPeer_Version_Call) RunAndReturn(run func(c context.Context) (*rpc.
} }
// Wait provides a mock function for the type MockPeer // Wait provides a mock function for the type MockPeer
func (_mock *MockPeer) Wait(c context.Context, workflowID string) error { func (_mock *MockPeer) Wait(c context.Context, workflowID string) (bool, error) {
ret := _mock.Called(c, workflowID) ret := _mock.Called(c, workflowID)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Wait") panic("no return value specified for Wait")
} }
var r0 error var r0 bool
if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok {
return returnFunc(c, workflowID)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = returnFunc(c, workflowID) r0 = returnFunc(c, workflowID)
} else { } else {
r0 = ret.Error(0) r0 = ret.Get(0).(bool)
} }
return r0 if returnFunc, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = returnFunc(c, workflowID)
} else {
r1 = ret.Error(1)
}
return r0, r1
} }
// MockPeer_Wait_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Wait' // MockPeer_Wait_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Wait'
@@ -669,12 +678,12 @@ func (_c *MockPeer_Wait_Call) Run(run func(c context.Context, workflowID string)
return _c return _c
} }
func (_c *MockPeer_Wait_Call) Return(err error) *MockPeer_Wait_Call { func (_c *MockPeer_Wait_Call) Return(canceled bool, err error) *MockPeer_Wait_Call {
_c.Call.Return(err) _c.Call.Return(canceled, err)
return _c return _c
} }
func (_c *MockPeer_Wait_Call) RunAndReturn(run func(c context.Context, workflowID string) error) *MockPeer_Wait_Call { func (_c *MockPeer_Wait_Call) RunAndReturn(run func(c context.Context, workflowID string) (bool, error)) *MockPeer_Wait_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
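
A possible test using the regenerated mock to simulate a server-side cancel; the NewMockPeer constructor and the mocks package name follow the usual mockery conventions and are assumed here:

```go
package mocks

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

// TestWaitReportsCancel simulates a server-side cancel with the regenerated
// mock: Wait returns canceled=true with no error.
func TestWaitReportsCancel(t *testing.T) {
	peer := NewMockPeer(t)
	peer.On("Wait", mock.Anything, "42").Return(true, nil)

	canceled, err := peer.Wait(context.Background(), "42")
	assert.NoError(t, err)
	assert.True(t, canceled)
}
```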

View File

@@ -15,89 +15,291 @@
package rpc package rpc
import ( import "context"
"context"
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" // Peer defines the bidirectional communication interface between Woodpecker agents and servers.
) //
// # Architecture and Implementations
type ( //
// Filter defines filters for fetching items from the queue. // The Peer interface is implemented differently on each side of the communication:
Filter struct { //
Labels map[string]string `json:"labels"` // - Agent side: Implemented by agent/rpc/client_grpc.go's client struct, which wraps
} // a gRPC client connection to make RPC calls to the server.
//
// StepState defines the step state. // - Server side: Implemented by server/rpc/rpc.go's RPC struct, which contains the
StepState struct { // business logic and is wrapped by server/rpc/server.go's WoodpeckerServer struct
StepUUID string `json:"step_uuid"` // to handle incoming gRPC requests.
Started int64 `json:"started"` //
Finished int64 `json:"finished"` // # Thread Safety and Concurrency
Exited bool `json:"exited"` //
ExitCode int `json:"exit_code"` // - Implementations must be safe for concurrent calls across different workflows
Error string `json:"error"` // - The same Peer instance may be called concurrently from multiple goroutines
} // - Each workflow is identified by a unique workflowID string
// - Implementations must properly isolate workflow state using workflowID
// WorkflowState defines the workflow state. //
WorkflowState struct { // # Error Handling Conventions
Started int64 `json:"started"` //
Finished int64 `json:"finished"` // - Methods return errors for communication failures, validation errors, or server-side issues
Error string `json:"error"` // - Errors should not be used for business logic
} // - Network/transport errors should be retried by the caller when appropriate
// - Nil error indicates successful operation
// Workflow defines the workflow execution details. // - Context cancellation should return nil or context.Canceled, not a custom error
Workflow struct { // - Business logic errors (e.g., workflow not found) return specific error types
ID string `json:"id"` //
Config *backend.Config `json:"config"` // # Intended Execution Flow
Timeout int64 `json:"timeout"` //
} // 1. Agent Lifecycle:
// - Version() checks compatibility with server
Version struct { // - RegisterAgent() announces agent availability
GrpcVersion int32 `json:"grpc_version,omitempty"` // - ReportHealth() periodically confirms agent is alive
ServerVersion string `json:"server_version,omitempty"` // - UnregisterAgent() gracefully disconnects agent
} //
// 2. Workflow Execution (may happen concurrently for multiple workflows):
// AgentInfo represents all the metadata that should be known about an agent. // - Next() blocks until server assigns a workflow
AgentInfo struct { // - Init() signals workflow execution has started
Version string `json:"version"` // - Wait() (in background goroutine) monitors for cancellation signals
Platform string `json:"platform"` // - Update() reports step state changes as workflow progresses
Backend string `json:"backend"` // - EnqueueLog() streams log output from steps
Capacity int `json:"capacity"` // - Extend() extends workflow timeout if needed so queue does not reschedule it as retry
CustomLabels map[string]string `json:"custom_labels"` // - Done() signals workflow has completed
} //
) // 3. Cancellation Flow:
// - Server can cancel workflow by releasing Wait() with canceled=true
// Peer defines a peer-to-peer connection. // - Agent detects cancellation from Wait() return value
// - Agent stops workflow execution and calls Done() with canceled state
type Peer interface { type Peer interface {
// Version returns the server- & grpc-version // Version returns the server and gRPC protocol version information.
//
// This is typically called once during agent initialization to verify
// compatibility between agent and server versions.
//
// Returns:
// - Version with server version string and gRPC protocol version number
// - Error if communication fails or server is unreachable
Version(c context.Context) (*Version, error) Version(c context.Context) (*Version, error)
// Next returns the next workflow in the queue // Next blocks until the server provides the next workflow to execute from the queue.
//
// This is the primary work-polling mechanism. Agents call this repeatedly in a loop,
// and it blocks until either:
// 1. A workflow matching the filter becomes available
// 2. The context is canceled (agent shutdown, network timeout, etc.)
//
// The filter allows agents to specify capabilities via labels (e.g., platform,
// backend type) so the server only assigns compatible workflows.
//
// Context Handling:
// - This is a long-polling operation that may block for extended periods
// - Implementations MUST check context regularly (not just at entry)
// - When context is canceled, must return nil workflow and nil error
// - Server may send keep-alive signals or periodically return nil to allow reconnection
//
// Returns:
// - Workflow object with ID, Config, and Workflow.Timeout if work is available
// - nil, nil if context is canceled or no work available (retry expected)
// - nil, error if a non-retryable error occurs
Next(c context.Context, f Filter) (*Workflow, error) Next(c context.Context, f Filter) (*Workflow, error)
// Wait blocks until the workflow is complete // Wait blocks until the workflow with the given ID completes or is canceled by the server.
Wait(c context.Context, workflowID string) error //
// This is used by agents to monitor for server-side cancellation signals. Typically
// called in a background goroutine immediately after Init(), running concurrently
// with workflow execution.
//
// The method serves two purposes:
// 1. Signals when server wants to cancel workflow (canceled=true)
// 2. Unblocks when workflow completes normally on agent (canceled=false)
//
// Context Handling:
// - This is a long-running blocking operation for the workflow duration
// - Context cancellation indicates shutdown, not workflow cancellation
// - When context is canceled, should return (false, nil) or (false, ctx.Err())
// - Must not confuse context cancellation with workflow cancellation signal
//
// Cancellation Flow:
// - Server releases Wait() with canceled=true → agent should stop workflow
// - Agent completes workflow normally → Done() is called → server releases Wait() with canceled=false
// - Agent context canceled → Wait() returns immediately, workflow may continue on agent
//
// Returns:
// - canceled=true, err=nil: Server initiated cancellation, agent should stop workflow
// - canceled=false, err=nil: Workflow completed normally (Wait unblocked by Done call)
// - canceled=false, err!=nil: Communication error, agent should retry or handle error
Wait(c context.Context, workflowID string) (canceled bool, err error)
// Init signals the workflow is initialized // Init signals to the server that the workflow has been initialized and execution has started.
//
// This is called once per workflow immediately after the agent accepts it from Next()
// and before starting step execution. It allows the server to track workflow start time
// and update workflow status to "running".
//
// The WorkflowState should have:
// - Started: Unix timestamp when execution began
// - Finished: 0 (not finished yet)
// - Error: empty string (no error yet)
// - Canceled: false (not canceled yet)
//
// Returns:
// - nil on success
// - error if communication fails or server rejects the state
Init(c context.Context, workflowID string, state WorkflowState) error Init(c context.Context, workflowID string, state WorkflowState) error
// Done signals the workflow is complete // Done signals to the server that the workflow has completed execution.
//
// This is called once per workflow after all steps have finished (or workflow was canceled).
// It provides the final workflow state including completion time, any errors, and
// cancellation status.
//
// The WorkflowState should have:
// - Started: Unix timestamp when execution began (same as Init)
// - Finished: Unix timestamp when execution completed
// - Error: Error message if workflow failed, empty if successful
// - Canceled: true if workflow was canceled, false otherwise
//
// After Done() is called:
// - Server updates final workflow status in database
// - Server releases any Wait() calls for this workflow
// - Server removes workflow from active queue
// - Server notifies forge of workflow completion
//
// Context Handling:
// - MUST attempt to complete even if workflow context is canceled
// - Often called with a shutdown/cleanup context rather than workflow context
// - Critical for proper cleanup - should retry on transient failures
//
// Returns:
// - nil on success
// - error if communication fails or server rejects the state
Done(c context.Context, workflowID string, state WorkflowState) error Done(c context.Context, workflowID string, state WorkflowState) error
// Extend extends the workflow deadline // Extend extends the timeout for the workflow with the given ID in the task queue.
//
// Agents must call Extend() regularly (e.g., every constant.TaskTimeout / 3) to signal
// that the workflow is still actively executing and prevent premature timeout.
//
// If agents don't call Extend periodically, the workflow will be rescheduled to a new
// agent after the timeout period expires (specified in constant.TaskTimeout).
//
// This acts as a heartbeat mechanism to detect stuck workflow executions. If an agent
// dies or becomes unresponsive, the server will eventually timeout the workflow and
// reassign it.
//
// IMPORTANT: Don't confuse this with Workflow.Timeout returned by Next() - they serve
// different purposes!
//
// Returns:
// - nil on success (timeout was extended)
// - error if communication fails or workflow is not found
Extend(c context.Context, workflowID string) error Extend(c context.Context, workflowID string) error
// Update updates the step state // Update reports step state changes to the server as the workflow progresses.
//
// This is called multiple times per step:
// 1. When step starts (Exited=false, Finished=0)
// 2. When step completes (Exited=true, Finished and ExitCode set)
// 3. Potentially on progress updates if step has long-running operations
//
// The server uses these updates to:
// - Track step execution progress
// - Update UI with real-time status
// - Store step results in database
// - Calculate workflow completion
//
// Context Handling:
// - Failures should be logged but not block workflow execution
//
// Returns:
// - nil on success
// - error if communication fails or server rejects the state
Update(c context.Context, workflowID string, state StepState) error Update(c context.Context, workflowID string, state StepState) error
// EnqueueLog queues the step log entry for delayed sending // EnqueueLog queues a log entry for delayed batch sending to the server.
//
// Log entries are produced continuously during step execution and need to be
// transmitted efficiently. This method adds logs to an internal queue that
// batches and sends them periodically to reduce network overhead.
//
// The implementation should:
// - Queue the log entry in a memory buffer
// - Batch multiple entries together
// - Send batches periodically (e.g., every second) or when buffer fills
// - Handle backpressure if server is slow or network is congested
//
// Unlike other methods, EnqueueLog:
// - Does NOT take a context parameter (fire-and-forget)
// - Does NOT return an error (never blocks the caller)
// - Does NOT guarantee immediate transmission
//
// Thread Safety:
// - MUST be safe to call concurrently from multiple goroutines
// - May be called concurrently from different steps/workflows
// - Internal queue must be properly synchronized
EnqueueLog(logEntry *LogEntry) EnqueueLog(logEntry *LogEntry)
// RegisterAgent register our agent to the server // RegisterAgent announces this agent to the server and returns an agent ID.
//
// This is called once during agent startup to:
// - Create an agent record in the server database
// - Obtain a unique agent ID for subsequent requests
// - Declare agent capabilities (platform, backend, capacity, labels)
// - Enable server-side agent tracking and monitoring
//
// The AgentInfo should specify:
// - Version: Agent version string (e.g., "v2.0.0")
// - Platform: OS/architecture (e.g., "linux/amd64")
// - Backend: Execution backend (e.g., "docker", "kubernetes")
// - Capacity: Maximum concurrent workflows (e.g., 2)
// - CustomLabels: Additional key-value labels for filtering
//
// Context Handling:
// - Context cancellation indicates agent is aborting startup
// - Should not retry indefinitely - fail fast on persistent errors
//
// Returns:
// - agentID: Unique identifier for this agent (use in subsequent calls)
// - error: If registration fails
RegisterAgent(ctx context.Context, info AgentInfo) (int64, error) RegisterAgent(ctx context.Context, info AgentInfo) (int64, error)
// UnregisterAgent unregister our agent from the server // UnregisterAgent removes this agent from the server's registry.
//
// This is called during graceful agent shutdown to:
// - Mark agent as offline in server database
// - Allow server to stop assigning workflows to this agent
// - Clean up any agent-specific server resources
// - Provide clean shutdown signal to monitoring systems
//
// After UnregisterAgent:
// - Agent should stop calling Next() for new work
// - Agent should complete any in-progress workflows
// - Agent may call Done() to finish existing workflows
// - Agent should close network connections
//
// Context Handling:
// - MUST attempt to complete even during forced shutdown
// - Often called with a shutdown context (limited time)
// - Failure is logged but should not prevent agent exit
//
// Returns:
// - nil on success
// - error if communication fails
UnregisterAgent(ctx context.Context) error UnregisterAgent(ctx context.Context) error
// ReportHealth reports health status of the agent to the server // ReportHealth sends a periodic health status update to the server.
//
// This is called regularly (e.g., every 30 seconds) during agent operation to:
// - Prove agent is still alive and responsive
// - Allow server to detect dead or stuck agents
// - Update agent's "last seen" timestamp in database
// - Provide application-level keepalive beyond network keep-alive signals
//
// Health reporting helps the server:
// - Mark unresponsive agents as offline
// - Redistribute work from dead agents
// - Display accurate agent status in UI
// - Trigger alerts for infrastructure issues
//
// Returns:
// - nil on success
// - error if communication fails
ReportHealth(c context.Context) error ReportHealth(c context.Context) error
} }
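
To tie the documented lifecycle together, here is a compressed sketch of one workflow round-trip from the agent's perspective; retries, log streaming and the Extend() heartbeat are omitted, and executeSteps is a hypothetical stand-in for the pipeline runtime:

```go
package peersketch

import (
	"context"
	"errors"
	"time"

	"go.woodpecker-ci.org/woodpecker/v3/pipeline"
	"go.woodpecker-ci.org/woodpecker/v3/pipeline/rpc"
)

func runOneWorkflow(ctx context.Context, peer rpc.Peer, filter rpc.Filter) error {
	// 1. Block until the server assigns a workflow matching our filter.
	wf, err := peer.Next(ctx, filter)
	if err != nil || wf == nil {
		return err
	}

	// 2. Tell the server execution has started.
	state := rpc.WorkflowState{Started: time.Now().Unix()}
	if err := peer.Init(ctx, wf.ID, state); err != nil {
		return err
	}

	// 3. Watch for a server-side cancel while the steps run.
	wfCtx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)
	go func() {
		if canceled, err := peer.Wait(wfCtx, wf.ID); err == nil && canceled {
			cancel(pipeline.ErrCancel)
		}
	}()

	// 4. Run the steps, reporting per-step progress via peer.Update(...).
	runErr := executeSteps(wfCtx, peer, wf)

	// 5. Report the final state; Done() also releases the server's Wait().
	state.Finished = time.Now().Unix()
	if runErr != nil {
		state.Error = runErr.Error()
		state.Canceled = errors.Is(runErr, pipeline.ErrCancel)
	}
	return peer.Done(ctx, wf.ID, state)
}

// executeSteps is defined only so the sketch compiles; the real work happens
// in the pipeline runtime.
func executeSteps(ctx context.Context, _ rpc.Peer, _ *rpc.Workflow) error {
	return ctx.Err()
}
```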

View File

@@ -16,4 +16,4 @@ package proto
// Version is the version of the woodpecker.proto file, // Version is the version of the woodpecker.proto file,
// IMPORTANT: increased by 1 each time it get changed. // IMPORTANT: increased by 1 each time it get changed.
const Version int32 = 14 const Version int32 = 15

View File

@@ -44,6 +44,7 @@ type StepState struct {
Exited bool `protobuf:"varint,4,opt,name=exited,proto3" json:"exited,omitempty"` Exited bool `protobuf:"varint,4,opt,name=exited,proto3" json:"exited,omitempty"`
ExitCode int32 `protobuf:"varint,5,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` ExitCode int32 `protobuf:"varint,5,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"`
Error string `protobuf:"bytes,6,opt,name=error,proto3" json:"error,omitempty"` Error string `protobuf:"bytes,6,opt,name=error,proto3" json:"error,omitempty"`
Canceled bool `protobuf:"varint,7,opt,name=canceled,proto3" json:"canceled,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@@ -120,11 +121,19 @@ func (x *StepState) GetError() string {
return "" return ""
} }
func (x *StepState) GetCanceled() bool {
if x != nil {
return x.Canceled
}
return false
}
type WorkflowState struct { type WorkflowState struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Started int64 `protobuf:"varint,4,opt,name=started,proto3" json:"started,omitempty"` Started int64 `protobuf:"varint,1,opt,name=started,proto3" json:"started,omitempty"`
Finished int64 `protobuf:"varint,5,opt,name=finished,proto3" json:"finished,omitempty"` Finished int64 `protobuf:"varint,2,opt,name=finished,proto3" json:"finished,omitempty"`
Error string `protobuf:"bytes,6,opt,name=error,proto3" json:"error,omitempty"` Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
Canceled bool `protobuf:"varint,4,opt,name=canceled,proto3" json:"canceled,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@@ -180,6 +189,13 @@ func (x *WorkflowState) GetError() string {
return "" return ""
} }
func (x *WorkflowState) GetCanceled() bool {
if x != nil {
return x.Canceled
}
return false
}
type LogEntry struct { type LogEntry struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
StepUuid string `protobuf:"bytes,1,opt,name=step_uuid,json=stepUuid,proto3" json:"step_uuid,omitempty"` StepUuid string `protobuf:"bytes,1,opt,name=step_uuid,json=stepUuid,proto3" json:"step_uuid,omitempty"`
@@ -1032,6 +1048,50 @@ func (x *RegisterAgentResponse) GetAgentId() int64 {
return 0 return 0
} }
type WaitResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Canceled bool `protobuf:"varint,1,opt,name=canceled,proto3" json:"canceled,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *WaitResponse) Reset() {
*x = WaitResponse{}
mi := &file_woodpecker_proto_msgTypes[19]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *WaitResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WaitResponse) ProtoMessage() {}
func (x *WaitResponse) ProtoReflect() protoreflect.Message {
mi := &file_woodpecker_proto_msgTypes[19]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WaitResponse.ProtoReflect.Descriptor instead.
func (*WaitResponse) Descriptor() ([]byte, []int) {
return file_woodpecker_proto_rawDescGZIP(), []int{19}
}
func (x *WaitResponse) GetCanceled() bool {
if x != nil {
return x.Canceled
}
return false
}
type AuthRequest struct { type AuthRequest struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
AgentToken string `protobuf:"bytes,1,opt,name=agent_token,json=agentToken,proto3" json:"agent_token,omitempty"` AgentToken string `protobuf:"bytes,1,opt,name=agent_token,json=agentToken,proto3" json:"agent_token,omitempty"`
@@ -1042,7 +1102,7 @@ type AuthRequest struct {
func (x *AuthRequest) Reset() { func (x *AuthRequest) Reset() {
*x = AuthRequest{} *x = AuthRequest{}
mi := &file_woodpecker_proto_msgTypes[19] mi := &file_woodpecker_proto_msgTypes[20]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -1054,7 +1114,7 @@ func (x *AuthRequest) String() string {
func (*AuthRequest) ProtoMessage() {} func (*AuthRequest) ProtoMessage() {}
func (x *AuthRequest) ProtoReflect() protoreflect.Message { func (x *AuthRequest) ProtoReflect() protoreflect.Message {
mi := &file_woodpecker_proto_msgTypes[19] mi := &file_woodpecker_proto_msgTypes[20]
if x != nil { if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -1067,7 +1127,7 @@ func (x *AuthRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use AuthRequest.ProtoReflect.Descriptor instead. // Deprecated: Use AuthRequest.ProtoReflect.Descriptor instead.
func (*AuthRequest) Descriptor() ([]byte, []int) { func (*AuthRequest) Descriptor() ([]byte, []int) {
return file_woodpecker_proto_rawDescGZIP(), []int{19} return file_woodpecker_proto_rawDescGZIP(), []int{20}
} }
func (x *AuthRequest) GetAgentToken() string { func (x *AuthRequest) GetAgentToken() string {
@@ -1095,7 +1155,7 @@ type AuthResponse struct {
func (x *AuthResponse) Reset() { func (x *AuthResponse) Reset() {
*x = AuthResponse{} *x = AuthResponse{}
mi := &file_woodpecker_proto_msgTypes[20] mi := &file_woodpecker_proto_msgTypes[21]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -1107,7 +1167,7 @@ func (x *AuthResponse) String() string {
func (*AuthResponse) ProtoMessage() {} func (*AuthResponse) ProtoMessage() {}
func (x *AuthResponse) ProtoReflect() protoreflect.Message { func (x *AuthResponse) ProtoReflect() protoreflect.Message {
mi := &file_woodpecker_proto_msgTypes[20] mi := &file_woodpecker_proto_msgTypes[21]
if x != nil { if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -1120,7 +1180,7 @@ func (x *AuthResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use AuthResponse.ProtoReflect.Descriptor instead. // Deprecated: Use AuthResponse.ProtoReflect.Descriptor instead.
func (*AuthResponse) Descriptor() ([]byte, []int) { func (*AuthResponse) Descriptor() ([]byte, []int) {
return file_woodpecker_proto_rawDescGZIP(), []int{20} return file_woodpecker_proto_rawDescGZIP(), []int{21}
} }
func (x *AuthResponse) GetStatus() string { func (x *AuthResponse) GetStatus() string {
@@ -1148,18 +1208,20 @@ var File_woodpecker_proto protoreflect.FileDescriptor
const file_woodpecker_proto_rawDesc = "" + const file_woodpecker_proto_rawDesc = "" +
"\n" + "\n" +
"\x10woodpecker.proto\x12\x05proto\"\xa9\x01\n" + "\x10woodpecker.proto\x12\x05proto\"\xc5\x01\n" +
"\tStepState\x12\x1b\n" + "\tStepState\x12\x1b\n" +
"\tstep_uuid\x18\x01 \x01(\tR\bstepUuid\x12\x18\n" + "\tstep_uuid\x18\x01 \x01(\tR\bstepUuid\x12\x18\n" +
"\astarted\x18\x02 \x01(\x03R\astarted\x12\x1a\n" + "\astarted\x18\x02 \x01(\x03R\astarted\x12\x1a\n" +
"\bfinished\x18\x03 \x01(\x03R\bfinished\x12\x16\n" + "\bfinished\x18\x03 \x01(\x03R\bfinished\x12\x16\n" +
"\x06exited\x18\x04 \x01(\bR\x06exited\x12\x1b\n" + "\x06exited\x18\x04 \x01(\bR\x06exited\x12\x1b\n" +
"\texit_code\x18\x05 \x01(\x05R\bexitCode\x12\x14\n" + "\texit_code\x18\x05 \x01(\x05R\bexitCode\x12\x14\n" +
"\x05error\x18\x06 \x01(\tR\x05error\"[\n" + "\x05error\x18\x06 \x01(\tR\x05error\x12\x1a\n" +
"\bcanceled\x18\a \x01(\bR\bcanceled\"w\n" +
"\rWorkflowState\x12\x18\n" + "\rWorkflowState\x12\x18\n" +
"\astarted\x18\x04 \x01(\x03R\astarted\x12\x1a\n" + "\astarted\x18\x01 \x01(\x03R\astarted\x12\x1a\n" +
"\bfinished\x18\x05 \x01(\x03R\bfinished\x12\x14\n" + "\bfinished\x18\x02 \x01(\x03R\bfinished\x12\x14\n" +
"\x05error\x18\x06 \x01(\tR\x05error\"w\n" + "\x05error\x18\x03 \x01(\tR\x05error\x12\x1a\n" +
"\bcanceled\x18\x04 \x01(\bR\bcanceled\"w\n" +
"\bLogEntry\x12\x1b\n" + "\bLogEntry\x12\x1b\n" +
"\tstep_uuid\x18\x01 \x01(\tR\bstepUuid\x12\x12\n" + "\tstep_uuid\x18\x01 \x01(\tR\bstepUuid\x12\x12\n" +
"\x04time\x18\x02 \x01(\x03R\x04time\x12\x12\n" + "\x04time\x18\x02 \x01(\x03R\x04time\x12\x12\n" +
@@ -1215,7 +1277,9 @@ const file_woodpecker_proto_rawDesc = "" +
"\fNextResponse\x12+\n" + "\fNextResponse\x12+\n" +
"\bworkflow\x18\x01 \x01(\v2\x0f.proto.WorkflowR\bworkflow\"2\n" + "\bworkflow\x18\x01 \x01(\v2\x0f.proto.WorkflowR\bworkflow\"2\n" +
"\x15RegisterAgentResponse\x12\x19\n" + "\x15RegisterAgentResponse\x12\x19\n" +
"\bagent_id\x18\x01 \x01(\x03R\aagentId\"I\n" + "\bagent_id\x18\x01 \x01(\x03R\aagentId\"*\n" +
"\fWaitResponse\x12\x1a\n" +
"\bcanceled\x18\x01 \x01(\bR\bcanceled\"I\n" +
"\vAuthRequest\x12\x1f\n" + "\vAuthRequest\x12\x1f\n" +
"\vagent_token\x18\x01 \x01(\tR\n" + "\vagent_token\x18\x01 \x01(\tR\n" +
"agentToken\x12\x19\n" + "agentToken\x12\x19\n" +
@@ -1223,13 +1287,13 @@ const file_woodpecker_proto_rawDesc = "" +
"\fAuthResponse\x12\x16\n" + "\fAuthResponse\x12\x16\n" +
"\x06status\x18\x01 \x01(\tR\x06status\x12\x19\n" + "\x06status\x18\x01 \x01(\tR\x06status\x12\x19\n" +
"\bagent_id\x18\x02 \x01(\x03R\aagentId\x12!\n" + "\bagent_id\x18\x02 \x01(\x03R\aagentId\x12!\n" +
"\faccess_token\x18\x03 \x01(\tR\vaccessToken2\xbb\x04\n" + "\faccess_token\x18\x03 \x01(\tR\vaccessToken2\xc2\x04\n" +
"\n" + "\n" +
"Woodpecker\x121\n" + "Woodpecker\x121\n" +
"\aVersion\x12\f.proto.Empty\x1a\x16.proto.VersionResponse\"\x00\x121\n" + "\aVersion\x12\f.proto.Empty\x1a\x16.proto.VersionResponse\"\x00\x121\n" +
"\x04Next\x12\x12.proto.NextRequest\x1a\x13.proto.NextResponse\"\x00\x12*\n" + "\x04Next\x12\x12.proto.NextRequest\x1a\x13.proto.NextResponse\"\x00\x12*\n" +
"\x04Init\x12\x12.proto.InitRequest\x1a\f.proto.Empty\"\x00\x12*\n" + "\x04Init\x12\x12.proto.InitRequest\x1a\f.proto.Empty\"\x00\x121\n" +
"\x04Wait\x12\x12.proto.WaitRequest\x1a\f.proto.Empty\"\x00\x12*\n" + "\x04Wait\x12\x12.proto.WaitRequest\x1a\x13.proto.WaitResponse\"\x00\x12*\n" +
"\x04Done\x12\x12.proto.DoneRequest\x1a\f.proto.Empty\"\x00\x12.\n" + "\x04Done\x12\x12.proto.DoneRequest\x1a\f.proto.Empty\"\x00\x12.\n" +
"\x06Extend\x12\x14.proto.ExtendRequest\x1a\f.proto.Empty\"\x00\x12.\n" + "\x06Extend\x12\x14.proto.ExtendRequest\x1a\f.proto.Empty\"\x00\x12.\n" +
"\x06Update\x12\x14.proto.UpdateRequest\x1a\f.proto.Empty\"\x00\x12(\n" + "\x06Update\x12\x14.proto.UpdateRequest\x1a\f.proto.Empty\"\x00\x12(\n" +
@@ -1252,7 +1316,7 @@ func file_woodpecker_proto_rawDescGZIP() []byte {
return file_woodpecker_proto_rawDescData return file_woodpecker_proto_rawDescData
} }
var file_woodpecker_proto_msgTypes = make([]protoimpl.MessageInfo, 23) var file_woodpecker_proto_msgTypes = make([]protoimpl.MessageInfo, 24)
var file_woodpecker_proto_goTypes = []any{ var file_woodpecker_proto_goTypes = []any{
(*StepState)(nil), // 0: proto.StepState (*StepState)(nil), // 0: proto.StepState
(*WorkflowState)(nil), // 1: proto.WorkflowState (*WorkflowState)(nil), // 1: proto.WorkflowState
@@ -1273,19 +1337,20 @@ var file_woodpecker_proto_goTypes = []any{
(*VersionResponse)(nil), // 16: proto.VersionResponse (*VersionResponse)(nil), // 16: proto.VersionResponse
(*NextResponse)(nil), // 17: proto.NextResponse (*NextResponse)(nil), // 17: proto.NextResponse
(*RegisterAgentResponse)(nil), // 18: proto.RegisterAgentResponse (*RegisterAgentResponse)(nil), // 18: proto.RegisterAgentResponse
(*AuthRequest)(nil), // 19: proto.AuthRequest (*WaitResponse)(nil), // 19: proto.WaitResponse
(*AuthResponse)(nil), // 20: proto.AuthResponse (*AuthRequest)(nil), // 20: proto.AuthRequest
nil, // 21: proto.Filter.LabelsEntry (*AuthResponse)(nil), // 21: proto.AuthResponse
nil, // 22: proto.AgentInfo.CustomLabelsEntry nil, // 22: proto.Filter.LabelsEntry
nil, // 23: proto.AgentInfo.CustomLabelsEntry
} }
var file_woodpecker_proto_depIdxs = []int32{ var file_woodpecker_proto_depIdxs = []int32{
21, // 0: proto.Filter.labels:type_name -> proto.Filter.LabelsEntry 22, // 0: proto.Filter.labels:type_name -> proto.Filter.LabelsEntry
3, // 1: proto.NextRequest.filter:type_name -> proto.Filter 3, // 1: proto.NextRequest.filter:type_name -> proto.Filter
1, // 2: proto.InitRequest.state:type_name -> proto.WorkflowState 1, // 2: proto.InitRequest.state:type_name -> proto.WorkflowState
1, // 3: proto.DoneRequest.state:type_name -> proto.WorkflowState 1, // 3: proto.DoneRequest.state:type_name -> proto.WorkflowState
0, // 4: proto.UpdateRequest.state:type_name -> proto.StepState 0, // 4: proto.UpdateRequest.state:type_name -> proto.StepState
2, // 5: proto.LogRequest.logEntries:type_name -> proto.LogEntry 2, // 5: proto.LogRequest.logEntries:type_name -> proto.LogEntry
22, // 6: proto.AgentInfo.customLabels:type_name -> proto.AgentInfo.CustomLabelsEntry 23, // 6: proto.AgentInfo.customLabels:type_name -> proto.AgentInfo.CustomLabelsEntry
14, // 7: proto.RegisterAgentRequest.info:type_name -> proto.AgentInfo 14, // 7: proto.RegisterAgentRequest.info:type_name -> proto.AgentInfo
4, // 8: proto.NextResponse.workflow:type_name -> proto.Workflow 4, // 8: proto.NextResponse.workflow:type_name -> proto.Workflow
12, // 9: proto.Woodpecker.Version:input_type -> proto.Empty 12, // 9: proto.Woodpecker.Version:input_type -> proto.Empty
@@ -1299,11 +1364,11 @@ var file_woodpecker_proto_depIdxs = []int32{
15, // 17: proto.Woodpecker.RegisterAgent:input_type -> proto.RegisterAgentRequest 15, // 17: proto.Woodpecker.RegisterAgent:input_type -> proto.RegisterAgentRequest
12, // 18: proto.Woodpecker.UnregisterAgent:input_type -> proto.Empty 12, // 18: proto.Woodpecker.UnregisterAgent:input_type -> proto.Empty
13, // 19: proto.Woodpecker.ReportHealth:input_type -> proto.ReportHealthRequest 13, // 19: proto.Woodpecker.ReportHealth:input_type -> proto.ReportHealthRequest
19, // 20: proto.WoodpeckerAuth.Auth:input_type -> proto.AuthRequest 20, // 20: proto.WoodpeckerAuth.Auth:input_type -> proto.AuthRequest
16, // 21: proto.Woodpecker.Version:output_type -> proto.VersionResponse 16, // 21: proto.Woodpecker.Version:output_type -> proto.VersionResponse
17, // 22: proto.Woodpecker.Next:output_type -> proto.NextResponse 17, // 22: proto.Woodpecker.Next:output_type -> proto.NextResponse
12, // 23: proto.Woodpecker.Init:output_type -> proto.Empty 12, // 23: proto.Woodpecker.Init:output_type -> proto.Empty
12, // 24: proto.Woodpecker.Wait:output_type -> proto.Empty 19, // 24: proto.Woodpecker.Wait:output_type -> proto.WaitResponse
12, // 25: proto.Woodpecker.Done:output_type -> proto.Empty 12, // 25: proto.Woodpecker.Done:output_type -> proto.Empty
12, // 26: proto.Woodpecker.Extend:output_type -> proto.Empty 12, // 26: proto.Woodpecker.Extend:output_type -> proto.Empty
12, // 27: proto.Woodpecker.Update:output_type -> proto.Empty 12, // 27: proto.Woodpecker.Update:output_type -> proto.Empty
@@ -1311,7 +1376,7 @@ var file_woodpecker_proto_depIdxs = []int32{
18, // 29: proto.Woodpecker.RegisterAgent:output_type -> proto.RegisterAgentResponse 18, // 29: proto.Woodpecker.RegisterAgent:output_type -> proto.RegisterAgentResponse
12, // 30: proto.Woodpecker.UnregisterAgent:output_type -> proto.Empty 12, // 30: proto.Woodpecker.UnregisterAgent:output_type -> proto.Empty
12, // 31: proto.Woodpecker.ReportHealth:output_type -> proto.Empty 12, // 31: proto.Woodpecker.ReportHealth:output_type -> proto.Empty
20, // 32: proto.WoodpeckerAuth.Auth:output_type -> proto.AuthResponse 21, // 32: proto.WoodpeckerAuth.Auth:output_type -> proto.AuthResponse
21, // [21:33] is the sub-list for method output_type 21, // [21:33] is the sub-list for method output_type
9, // [9:21] is the sub-list for method input_type 9, // [9:21] is the sub-list for method input_type
9, // [9:9] is the sub-list for extension type_name 9, // [9:9] is the sub-list for extension type_name
@@ -1330,7 +1395,7 @@ func file_woodpecker_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_woodpecker_proto_rawDesc), len(file_woodpecker_proto_rawDesc)), RawDescriptor: unsafe.Slice(unsafe.StringData(file_woodpecker_proto_rawDesc), len(file_woodpecker_proto_rawDesc)),
NumEnums: 0, NumEnums: 0,
NumMessages: 23, NumMessages: 24,
NumExtensions: 0, NumExtensions: 0,
NumServices: 2, NumServices: 2,
}, },

View File

@@ -27,7 +27,7 @@ service Woodpecker {
rpc Version (Empty) returns (VersionResponse) {} rpc Version (Empty) returns (VersionResponse) {}
rpc Next (NextRequest) returns (NextResponse) {} rpc Next (NextRequest) returns (NextResponse) {}
rpc Init (InitRequest) returns (Empty) {} rpc Init (InitRequest) returns (Empty) {}
rpc Wait (WaitRequest) returns (Empty) {} rpc Wait (WaitRequest) returns (WaitResponse) {}
rpc Done (DoneRequest) returns (Empty) {} rpc Done (DoneRequest) returns (Empty) {}
rpc Extend (ExtendRequest) returns (Empty) {} rpc Extend (ExtendRequest) returns (Empty) {}
rpc Update (UpdateRequest) returns (Empty) {} rpc Update (UpdateRequest) returns (Empty) {}
@@ -48,12 +48,14 @@ message StepState {
bool exited = 4; bool exited = 4;
int32 exit_code = 5; int32 exit_code = 5;
string error = 6; string error = 6;
bool canceled = 7;
} }
message WorkflowState { message WorkflowState {
int64 started = 4; int64 started = 1;
int64 finished = 5; int64 finished = 2;
string error = 6; string error = 3;
bool canceled = 4;
} }
message LogEntry { message LogEntry {
@@ -145,6 +147,10 @@ message RegisterAgentResponse {
int64 agent_id = 1; int64 agent_id = 1;
} }
message WaitResponse {
bool canceled = 1;
};
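For orientation only (not part of this diff): a minimal sketch of how an agent-side caller could consume the new cancellation flag through the updated client Wait signature shown earlier; handleCancel is a hypothetical helper.

	canceled, err := peer.Wait(ctx, workflowID)
	if err != nil {
		return err
	}
	if canceled {
		// the server released the wait because the workflow was canceled
		handleCancel(workflowID) // hypothetical helper, not part of this change
	}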
// Woodpecker auth service is a simple service to authenticate agents and acquire a token // Woodpecker auth service is a simple service to authenticate agents and acquire a token
service WoodpeckerAuth { service WoodpeckerAuth {

View File

@@ -56,7 +56,7 @@ type WoodpeckerClient interface {
Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error)
Next(ctx context.Context, in *NextRequest, opts ...grpc.CallOption) (*NextResponse, error) Next(ctx context.Context, in *NextRequest, opts ...grpc.CallOption) (*NextResponse, error)
Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Empty, error) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Empty, error)
Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*Empty, error) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error)
Done(ctx context.Context, in *DoneRequest, opts ...grpc.CallOption) (*Empty, error) Done(ctx context.Context, in *DoneRequest, opts ...grpc.CallOption) (*Empty, error)
Extend(ctx context.Context, in *ExtendRequest, opts ...grpc.CallOption) (*Empty, error) Extend(ctx context.Context, in *ExtendRequest, opts ...grpc.CallOption) (*Empty, error)
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error)
@@ -104,9 +104,9 @@ func (c *woodpeckerClient) Init(ctx context.Context, in *InitRequest, opts ...gr
return out, nil return out, nil
} }
func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*Empty, error) { func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(Empty) out := new(WaitResponse)
err := c.cc.Invoke(ctx, Woodpecker_Wait_FullMethodName, in, out, cOpts...) err := c.cc.Invoke(ctx, Woodpecker_Wait_FullMethodName, in, out, cOpts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -193,7 +193,7 @@ type WoodpeckerServer interface {
Version(context.Context, *Empty) (*VersionResponse, error) Version(context.Context, *Empty) (*VersionResponse, error)
Next(context.Context, *NextRequest) (*NextResponse, error) Next(context.Context, *NextRequest) (*NextResponse, error)
Init(context.Context, *InitRequest) (*Empty, error) Init(context.Context, *InitRequest) (*Empty, error)
Wait(context.Context, *WaitRequest) (*Empty, error) Wait(context.Context, *WaitRequest) (*WaitResponse, error)
Done(context.Context, *DoneRequest) (*Empty, error) Done(context.Context, *DoneRequest) (*Empty, error)
Extend(context.Context, *ExtendRequest) (*Empty, error) Extend(context.Context, *ExtendRequest) (*Empty, error)
Update(context.Context, *UpdateRequest) (*Empty, error) Update(context.Context, *UpdateRequest) (*Empty, error)
@@ -220,7 +220,7 @@ func (UnimplementedWoodpeckerServer) Next(context.Context, *NextRequest) (*NextR
func (UnimplementedWoodpeckerServer) Init(context.Context, *InitRequest) (*Empty, error) { func (UnimplementedWoodpeckerServer) Init(context.Context, *InitRequest) (*Empty, error) {
return nil, status.Error(codes.Unimplemented, "method Init not implemented") return nil, status.Error(codes.Unimplemented, "method Init not implemented")
} }
func (UnimplementedWoodpeckerServer) Wait(context.Context, *WaitRequest) (*Empty, error) { func (UnimplementedWoodpeckerServer) Wait(context.Context, *WaitRequest) (*WaitResponse, error) {
return nil, status.Error(codes.Unimplemented, "method Wait not implemented") return nil, status.Error(codes.Unimplemented, "method Wait not implemented")
} }
func (UnimplementedWoodpeckerServer) Done(context.Context, *DoneRequest) (*Empty, error) { func (UnimplementedWoodpeckerServer) Done(context.Context, *DoneRequest) (*Empty, error) {

rpc/types.go Normal file
View File

@@ -0,0 +1,66 @@
// Copyright 2025 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
type (
// Filter defines filters for fetching items from the queue.
Filter struct {
Labels map[string]string `json:"labels"`
}
// StepState defines the step state.
StepState struct {
StepUUID string `json:"step_uuid"`
Started int64 `json:"started"`
Finished int64 `json:"finished"`
Exited bool `json:"exited"`
ExitCode int `json:"exit_code"`
Error string `json:"error"`
Canceled bool `json:"canceled"`
}
// WorkflowState defines the workflow state.
WorkflowState struct {
Started int64 `json:"started"`
Finished int64 `json:"finished"`
Error string `json:"error"`
Canceled bool `json:"canceled"`
}
// Workflow defines the workflow execution details.
Workflow struct {
ID string `json:"id"`
Config *backend.Config `json:"config"`
Timeout int64 `json:"timeout"`
}
Version struct {
GrpcVersion int32 `json:"grpc_version,omitempty"`
ServerVersion string `json:"server_version,omitempty"`
}
// AgentInfo represents all the metadata that should be known about an agent.
AgentInfo struct {
Version string `json:"version"`
Platform string `json:"platform"`
Backend string `json:"backend"`
Capacity int `json:"capacity"`
CustomLabels map[string]string `json:"custom_labels"`
}
)
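Illustrative sketch only (not part of the new file): an agent reporting a killed step could populate the new Canceled flag like this; peer, ctx, workflowID and step are assumed to exist in the calling code.

	state := rpc.StepState{
		StepUUID: step.UUID,
		Started:  step.Started,
		Finished: time.Now().Unix(),
		Exited:   true,
		ExitCode: 137, // conventional "killed" exit code
		Canceled: true,
	}
	// Update carries the StepState to the server
	if err := peer.Update(ctx, workflowID, state); err != nil {
		log.Error().Err(err).Msg("could not report canceled step")
	}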

View File

@@ -40,24 +40,13 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
} }
// First cancel/evict workflows in the queue in one go // First cancel/evict workflows in the queue in one go
var ( var workflowsToCancel []string
workflowsToCancel []string for _, w := range workflows {
workflowsToEvict []string if w.State == model.StatusRunning || w.State == model.StatusPending {
) workflowsToCancel = append(workflowsToCancel, fmt.Sprint(w.ID))
for _, workflow := range workflows {
if workflow.State == model.StatusRunning {
workflowsToCancel = append(workflowsToCancel, fmt.Sprint(workflow.ID))
}
if workflow.State == model.StatusPending {
workflowsToEvict = append(workflowsToEvict, fmt.Sprint(workflow.ID))
} }
} }
if len(workflowsToEvict) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToEvict, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToEvict)
}
}
if len(workflowsToCancel) != 0 { if len(workflowsToCancel) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil { if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel) log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel)

View File

@@ -16,28 +16,76 @@
package pipeline package pipeline
import ( import (
"go.woodpecker-ci.org/woodpecker/v3/pipeline" "fmt"
"time"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/rpc"
"go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/store" "go.woodpecker-ci.org/woodpecker/v3/server/store"
) )
// UpdateStepStatus updates the step status based on agent reports via RPC.
func UpdateStepStatus(store store.Store, step *model.Step, state rpc.StepState) error { func UpdateStepStatus(store store.Store, step *model.Step, state rpc.StepState) error {
if state.Exited { log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state)
step.Finished = state.Finished
step.ExitCode = state.ExitCode switch step.State {
step.Error = state.Error case model.StatusPending:
step.State = model.StatusSuccess // Transition from pending to running when started
if state.ExitCode != 0 || state.Error != "" { if state.Finished == 0 {
step.State = model.StatusFailure step.State = model.StatusRunning
} }
if state.ExitCode == pipeline.ExitCodeKilled {
step.State = model.StatusKilled
}
} else if step.Finished == 0 {
step.Started = state.Started step.Started = state.Started
step.State = model.StatusRunning if step.Started == 0 {
step.Started = time.Now().Unix()
}
// Handle direct transition to finished if a step setup error happened
if state.Exited || state.Error != "" {
step.Finished = state.Finished
if step.Finished == 0 {
step.Finished = time.Now().Unix()
}
step.ExitCode = state.ExitCode
step.Error = state.Error
if state.ExitCode == 0 && state.Error == "" {
step.State = model.StatusSuccess
} else {
step.State = model.StatusFailure
}
}
case model.StatusRunning:
// Already running, check if it finished
if state.Exited || state.Error != "" {
step.Finished = state.Finished
if step.Finished == 0 {
step.Finished = time.Now().Unix()
}
step.ExitCode = state.ExitCode
step.Error = state.Error
if state.ExitCode == 0 && state.Error == "" {
step.State = model.StatusSuccess
} else {
step.State = model.StatusFailure
}
}
default:
return fmt.Errorf("step has state %s and does not expect rpc state updates", step.State)
} }
// Handle cancellation across both cases
if state.Canceled && step.State != model.StatusKilled {
step.State = model.StatusKilled
if step.Finished == 0 {
step.Finished = time.Now().Unix()
}
}
return store.StepUpdate(step) return store.StepUpdate(step)
} }
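A minimal usage sketch mirroring the test cases further down (illustrative only; store is an assumed store.Store):

	// a pending step that reports a start time but no finish time becomes running
	step := &model.Step{State: model.StatusPending}
	if err := UpdateStepStatus(store, step, rpc.StepState{Started: 42}); err != nil {
		return err
	}
	// step.State == model.StatusRunning, step.Started == 42

	// a later canceled report forces the killed state and sets a finish time
	if err := UpdateStepStatus(store, step, rpc.StepState{Canceled: true}); err != nil {
		return err
	}
	// step.State == model.StatusKilled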

View File

@@ -1,5 +1,4 @@
// Copyright 2022 Woodpecker Authors // Copyright 2022 Woodpecker Authors
// Copyright 2019 mhmxs
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@@ -34,138 +33,224 @@ func mockStoreStep(t *testing.T) store.Store {
return s return s
} }
func TestUpdateStepStatusNotExited(t *testing.T) { func TestUpdateStepStatus(t *testing.T) {
t.Parallel()
// step in db before update
step := &model.Step{}
// advertised step status
state := rpc.StepState{
Started: int64(42),
Exited: false,
// Dummy data
Finished: int64(1),
ExitCode: pipeline.ExitCodeKilled,
Error: "not an error",
}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusRunning, step.State)
assert.EqualValues(t, 42, step.Started)
assert.EqualValues(t, 0, step.Finished)
assert.EqualValues(t, 0, step.ExitCode)
assert.EqualValues(t, "", step.Error)
}
func TestUpdateStepStatusNotExitedButStopped(t *testing.T) {
t.Parallel() t.Parallel()
// step in db before update t.Run("Pending", func(t *testing.T) {
step := &model.Step{Started: 42, Finished: 64, State: model.StatusKilled} t.Parallel()
// advertised step status t.Run("TransitionToRunning", func(t *testing.T) {
state := rpc.StepState{ t.Parallel()
Exited: false,
// Dummy data
Finished: int64(1),
ExitCode: pipeline.ExitCodeKilled,
Error: "not an error",
}
err := UpdateStepStatus(mockStoreStep(t), step, state) t.Run("WithStartTime", func(t *testing.T) {
assert.NoError(t, err) t.Parallel()
assert.EqualValues(t, model.StatusKilled, step.State) step := &model.Step{State: model.StatusPending}
assert.EqualValues(t, 42, step.Started) state := rpc.StepState{Started: 42, Finished: 0}
assert.EqualValues(t, 64, step.Finished)
assert.EqualValues(t, 0, step.ExitCode)
assert.EqualValues(t, "", step.Error)
}
func TestUpdateStepStatusExited(t *testing.T) { err := UpdateStepStatus(mockStoreStep(t), step, state)
t.Parallel()
// step in db before update assert.NoError(t, err)
step := &model.Step{Started: 42} assert.Equal(t, model.StatusRunning, step.State)
assert.Equal(t, int64(42), step.Started)
assert.Equal(t, int64(0), step.Finished)
})
// advertised step status t.Run("WithoutStartTime", func(t *testing.T) {
state := rpc.StepState{ t.Parallel()
Started: int64(42), step := &model.Step{State: model.StatusPending}
Exited: true, state := rpc.StepState{Started: 0, Finished: 0}
Finished: int64(34),
ExitCode: pipeline.ExitCodeKilled,
Error: "an error",
}
err := UpdateStepStatus(mockStoreStep(t), step, state) err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusKilled, step.State)
assert.EqualValues(t, 42, step.Started)
assert.EqualValues(t, 34, step.Finished)
assert.EqualValues(t, pipeline.ExitCodeKilled, step.ExitCode)
assert.EqualValues(t, "an error", step.Error)
}
func TestUpdateStepStatusExitedButNot137(t *testing.T) { assert.NoError(t, err)
t.Parallel() assert.Equal(t, model.StatusRunning, step.State)
assert.Greater(t, step.Started, int64(0))
})
})
// step in db before update t.Run("DirectToSuccess", func(t *testing.T) {
step := &model.Step{Started: 42} t.Parallel()
// advertised step status t.Run("WithFinishTime", func(t *testing.T) {
state := rpc.StepState{ t.Parallel()
Started: int64(42), step := &model.Step{State: model.StatusPending}
Exited: true, state := rpc.StepState{Started: 42, Exited: true, Finished: 100, ExitCode: 0, Error: ""}
Finished: int64(34),
Error: "an error",
}
err := UpdateStepStatus(mockStoreStep(t), step, state) err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusFailure, step.State)
assert.EqualValues(t, 42, step.Started)
assert.EqualValues(t, 34, step.Finished)
assert.EqualValues(t, 0, step.ExitCode)
assert.EqualValues(t, "an error", step.Error)
}
func TestUpdateStepStatusExitedWithCode(t *testing.T) { assert.NoError(t, err)
t.Parallel() assert.Equal(t, model.StatusSuccess, step.State)
assert.Equal(t, int64(42), step.Started)
assert.Equal(t, int64(100), step.Finished)
})
// advertised step status t.Run("WithoutFinishTime", func(t *testing.T) {
state := rpc.StepState{ t.Parallel()
Started: int64(42), step := &model.Step{State: model.StatusPending}
Exited: true, state := rpc.StepState{Started: 42, Exited: true, Finished: 0, ExitCode: 0, Error: ""}
Finished: int64(34),
ExitCode: 1,
Error: "an error",
}
step := &model.Step{}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusFailure, step.State) err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.Equal(t, 1, step.ExitCode)
assert.NoError(t, err)
assert.Equal(t, model.StatusSuccess, step.State)
assert.Greater(t, step.Finished, int64(0))
})
})
t.Run("DirectToFailure", func(t *testing.T) {
t.Parallel()
t.Run("WithExitCode", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusPending}
state := rpc.StepState{Started: 42, Exited: true, Finished: 34, ExitCode: 1, Error: "an error"}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusFailure, step.State)
assert.Equal(t, 1, step.ExitCode)
assert.Equal(t, "an error", step.Error)
})
})
})
t.Run("Running", func(t *testing.T) {
t.Parallel()
t.Run("ToSuccess", func(t *testing.T) {
t.Parallel()
t.Run("WithFinishTime", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Exited: true, Finished: 100, ExitCode: 0, Error: ""}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusSuccess, step.State)
assert.Equal(t, int64(100), step.Finished)
})
t.Run("WithoutFinishTime", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Exited: true, Finished: 0, ExitCode: 0, Error: ""}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusSuccess, step.State)
assert.Greater(t, step.Finished, int64(0))
})
})
t.Run("ToFailure", func(t *testing.T) {
t.Parallel()
t.Run("WithExitCode137", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Exited: true, Finished: 34, ExitCode: pipeline.ExitCodeKilled, Error: "an error"}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusFailure, step.State)
assert.Equal(t, int64(34), step.Finished)
assert.Equal(t, pipeline.ExitCodeKilled, step.ExitCode)
})
t.Run("WithError", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Exited: true, Finished: 34, ExitCode: 0, Error: "an error"}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusFailure, step.State)
assert.Equal(t, "an error", step.Error)
})
})
t.Run("StillRunning", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Exited: false, Finished: 0}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusRunning, step.State)
assert.Equal(t, int64(0), step.Finished)
})
})
t.Run("Canceled", func(t *testing.T) {
t.Parallel()
t.Run("WithoutFinishTime", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Canceled: true}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusKilled, step.State)
assert.Greater(t, step.Finished, int64(0))
})
t.Run("WithExitedAndFinishTime", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning, Started: 42}
state := rpc.StepState{Canceled: true, Exited: true, Finished: 100, ExitCode: 1, Error: "canceled"}
err := UpdateStepStatus(mockStoreStep(t), step, state)
assert.NoError(t, err)
assert.Equal(t, model.StatusKilled, step.State)
assert.Equal(t, int64(100), step.Finished)
assert.Equal(t, 1, step.ExitCode)
assert.Equal(t, "canceled", step.Error)
})
})
t.Run("TerminalState", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusKilled, Started: 42, Finished: 64}
state := rpc.StepState{Exited: false}
err := UpdateStepStatus(mocks.NewMockStore(t), step, state)
assert.Error(t, err)
assert.Contains(t, err.Error(), "does not expect rpc state updates")
assert.Equal(t, model.StatusKilled, step.State)
})
} }
func TestUpdateStepToStatusSkipped(t *testing.T) { func TestUpdateStepToStatusSkipped(t *testing.T) {
t.Parallel() t.Parallel()
step, _ := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{}, int64(1)) t.Run("NotStarted", func(t *testing.T) {
t.Parallel()
assert.Equal(t, model.StatusSkipped, step.State) step, err := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{}, int64(1))
assert.EqualValues(t, 0, step.Finished)
} assert.NoError(t, err)
assert.Equal(t, model.StatusSkipped, step.State)
func TestUpdateStepToStatusSkippedButStarted(t *testing.T) { assert.Equal(t, int64(0), step.Finished)
t.Parallel() })
step := &model.Step{ t.Run("AlreadyStarted", func(t *testing.T) {
Started: int64(42), t.Parallel()
}
step, err := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{Started: 42}, int64(100))
step, _ = UpdateStepToStatusSkipped(mockStoreStep(t), *step, int64(1))
assert.NoError(t, err)
assert.Equal(t, model.StatusSuccess, step.State) assert.Equal(t, model.StatusSuccess, step.State)
assert.EqualValues(t, 1, step.Finished) assert.Equal(t, int64(100), step.Finished)
})
} }

View File

@@ -42,5 +42,8 @@ func UpdateWorkflowStatusToDone(store store.Store, workflow model.Workflow, stat
if workflow.Error != "" { if workflow.Error != "" {
workflow.State = model.StatusFailure workflow.State = model.StatusFailure
} }
if state.Canceled {
workflow.State = model.StatusKilled
}
return &workflow, store.WorkflowUpdate(&workflow) return &workflow, store.WorkflowUpdate(&workflow)
} }
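Illustrative only: with the new flag, a canceled workflow ends up killed even if no error string was set (store and workflow are assumed to be loaded by the caller):

	wf, err := UpdateWorkflowStatusToDone(store, *workflow, rpc.WorkflowState{
		Finished: time.Now().Unix(),
		Canceled: true,
	})
	if err != nil {
		return err
	}
	// wf.State == model.StatusKilled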

View File

@@ -157,6 +157,7 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e
} }
// Wait waits until the item is done executing. // Wait waits until the item is done executing.
// Also signals via the error ErrCancel if the workflow got canceled.
func (q *fifo) Wait(ctx context.Context, taskID string) error { func (q *fifo) Wait(ctx context.Context, taskID string) error {
q.Lock() q.Lock()
state := q.running[taskID] state := q.running[taskID]

View File

@@ -17,11 +17,14 @@ package queue
import ( import (
"context" "context"
"errors"
"fmt"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/store" "go.woodpecker-ci.org/woodpecker/v3/server/store"
"go.woodpecker-ci.org/woodpecker/v3/server/store/types"
) )
// WithTaskStore returns a queue that is backed by the TaskStore. This // WithTaskStore returns a queue that is backed by the TaskStore. This
@@ -77,7 +80,14 @@ func (q *persistentQueue) Error(c context.Context, id string, err error) error {
if err := q.Queue.Error(c, id, err); err != nil { if err := q.Queue.Error(c, id, err); err != nil {
return err return err
} }
return q.store.TaskDelete(id)
if deleteErr := q.store.TaskDelete(id); deleteErr != nil {
if !errors.Is(deleteErr, types.RecordNotExist) {
return deleteErr
}
log.Debug().Msgf("task %s already removed from store", id)
}
return nil
} }
// ErrorAtOnce signals multiple tasks are done and complete with an error. // ErrorAtOnce signals multiple tasks are done and complete with an error.
@@ -86,10 +96,16 @@ func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error
if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil { if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil {
return err return err
} }
var errs []error
for _, id := range ids { for _, id := range ids {
if err := q.store.TaskDelete(id); err != nil { if deleteErr := q.store.TaskDelete(id); deleteErr != nil && !errors.Is(deleteErr, types.RecordNotExist) {
return err errs = append(errs, fmt.Errorf("task id [%s]: %w", id, deleteErr))
} }
} }
if len(errs) != 0 {
return fmt.Errorf("failed to delete tasks from persistent store: %w", errors.Join(errs...))
}
return nil return nil
} }
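For context (illustrative only): the cancellation path that feeds these deletes uses the same call shape as Cancel further up; ids and ctx are assumed.

	// cancel several queued workflows at once; tasks that were already removed
	// from the store no longer fail the whole call
	if err := q.ErrorAtOnce(ctx, ids, queue.ErrCancel); err != nil {
		log.Error().Err(err).Msgf("queue: evict_at_once: %v", ids)
	}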

View File

@@ -101,6 +101,7 @@ type Queue interface {
ErrorAtOnce(c context.Context, ids []string, err error) error ErrorAtOnce(c context.Context, ids []string, err error) error
// Wait waits until the task is complete. // Wait waits until the task is complete.
// Also signals via the error ErrCancel if the workflow got canceled.
Wait(c context.Context, id string) error Wait(c context.Context, id string) error
// Info returns internal queue information. // Info returns internal queue information.

View File

@@ -12,6 +12,38 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// Package grpc provides a gRPC server implementation with JWT-based authentication.
//
// # Authentication Flow
//
// Uses a two-token approach:
//
// 1. Agent Token (long-lived): Configured via WOODPECKER_AGENT_SECRET, used only for initial Auth() call
// 2. JWT Access Token (short-lived, 1 hour): Obtained from Auth(), included in metadata["token"] for all service calls
//
// # Interceptor Architecture
//
// Authorizer interceptors validate JWT tokens on every request:
// 1. Extract JWT from metadata["token"]
// 2. Verify signature and expiration
// 3. Extract and add agent_id to metadata for downstream handlers
//
// Auth endpoint (/proto.WoodpeckerAuth/Auth) bypasses validation to allow initial authentication.
//
// # Usage
//
// // Server setup
// jwtManager := NewJWTManager(c.String("grpc-secret"))
// authorizer := NewAuthorizer(jwtManager)
// grpcServer := grpc.NewServer(
// grpc.StreamInterceptor(authorizer.StreamInterceptor),
// grpc.UnaryInterceptor(authorizer.UnaryInterceptor),
// )
//
// // Client usage
// resp, _ := authClient.Auth(ctx, &proto.AuthRequest{AgentToken: "secret", AgentId: -1})
// ctx = metadata.AppendToOutgoingContext(ctx, "token", resp.AccessToken)
// workflow, _ := woodpeckerClient.Next(ctx, &proto.NextRequest{...})
package grpc package grpc
import ( import (
@@ -24,6 +56,7 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// StreamContextWrapper wraps gRPC ServerStream to allow context modification.
type StreamContextWrapper interface { type StreamContextWrapper interface {
grpc.ServerStream grpc.ServerStream
SetContext(context.Context) SetContext(context.Context)
@@ -50,14 +83,17 @@ func newStreamContextWrapper(inner grpc.ServerStream) StreamContextWrapper {
} }
} }
// Authorizer validates JWT tokens and enriches context with agent information.
type Authorizer struct { type Authorizer struct {
jwtManager *JWTManager jwtManager *JWTManager
} }
// NewAuthorizer creates a new JWT authorizer.
func NewAuthorizer(jwtManager *JWTManager) *Authorizer { func NewAuthorizer(jwtManager *JWTManager) *Authorizer {
return &Authorizer{jwtManager: jwtManager} return &Authorizer{jwtManager: jwtManager}
} }
// StreamInterceptor validates JWT tokens for streaming gRPC calls.
func (a *Authorizer) StreamInterceptor(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { func (a *Authorizer) StreamInterceptor(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
_stream := newStreamContextWrapper(stream) _stream := newStreamContextWrapper(stream)
@@ -71,7 +107,8 @@ func (a *Authorizer) StreamInterceptor(srv any, stream grpc.ServerStream, info *
return handler(srv, _stream) return handler(srv, _stream)
} }
func (a *Authorizer) UnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { // UnaryInterceptor validates JWT tokens for unary gRPC calls.
func (a *Authorizer) UnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
newCtx, err := a.authorize(ctx, info.FullMethod) newCtx, err := a.authorize(ctx, info.FullMethod)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -79,6 +116,8 @@ func (a *Authorizer) UnaryInterceptor(ctx context.Context, req any, info *grpc.U
return handler(newCtx, req) return handler(newCtx, req)
} }
// authorize validates JWT and enriches context with agent_id metadata.
// Bypasses validation for /proto.WoodpeckerAuth/Auth endpoint.
func (a *Authorizer) authorize(ctx context.Context, fullMethod string) (context.Context, error) { func (a *Authorizer) authorize(ctx context.Context, fullMethod string) (context.Context, error) {
// bypass auth for token endpoint // bypass auth for token endpoint
if fullMethod == "/proto.WoodpeckerAuth/Auth" { if fullMethod == "/proto.WoodpeckerAuth/Auth" {

View File

@@ -53,6 +53,7 @@ type RPC struct {
} }
// Next blocks until it provides the next workflow to execute. // Next blocks until it provides the next workflow to execute.
// TODO (6038): Server does not release waiting agents on graceful shutdown.
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) { func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) {
if hostname, err := s.getHostnameFromContext(c); err == nil { if hostname, err := s.getHostnameFromContext(c); err == nil {
log.Debug().Msgf("agent connected: %s: polling", hostname) log.Debug().Msgf("agent connected: %s: polling", hostname)
@@ -100,18 +101,29 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
} }
} }
// Wait blocks until the workflow with the given ID is done. // Wait blocks until the workflow with the given ID is completed or canceled.
func (s *RPC) Wait(c context.Context, workflowID string) error { // Used to let agents wait for cancel signals from the server side.
func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err error) {
agent, err := s.getAgentFromContext(c) agent, err := s.getAgentFromContext(c)
if err != nil { if err != nil {
return err return false, err
} }
if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil { if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil {
return err return false, err
} }
return s.queue.Wait(c, workflowID) if err := s.queue.Wait(c, workflowID); err != nil {
if errors.Is(err, queue.ErrCancel) {
// we explicitly sent a cancel signal
return true, nil
}
// unknown error happened
return false, err
}
// workflow finished and no issues appeared
return false, nil
} }
// Extend extends the lease for the workflow with the given ID. // Extend extends the lease for the workflow with the given ID.
@@ -133,7 +145,7 @@ func (s *RPC) Extend(c context.Context, workflowID string) error {
return s.queue.Extend(c, agent.ID, workflowID) return s.queue.Extend(c, agent.ID, workflowID)
} }
// Update updates the state of a step. // Update lets the agent update the step state on the server.
func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepState) error { func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepState) error {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil { if err != nil {
@@ -213,7 +225,7 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat
return nil return nil
} }
// Init implements the rpc.Init function. // Init signals the workflow is initialized.
func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowState) error { func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil { if err != nil {
@@ -286,7 +298,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return s.updateAgentLastWork(agent) return s.updateAgentLastWork(agent)
} }
// Done marks the workflow with the given ID as done. // Done marks the workflow with the given ID as stopped.
func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error { func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil { if err != nil {
@@ -331,20 +343,23 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
Str("pipeline_id", fmt.Sprint(currentPipeline.ID)). Str("pipeline_id", fmt.Sprint(currentPipeline.ID)).
Str("workflow_id", strWorkflowID).Logger() Str("workflow_id", strWorkflowID).Logger()
logger.Trace().Msgf("gRPC Done with state: %#v", state) logger.Debug().Msgf("workflow state in store: %#v", workflow)
logger.Debug().Msgf("gRPC Done with state: %#v", state)
if workflow, err = pipeline.UpdateWorkflowStatusToDone(s.store, *workflow, state); err != nil { if workflow, err = pipeline.UpdateWorkflowStatusToDone(s.store, *workflow, state); err != nil {
logger.Error().Err(err).Msgf("pipeline.UpdateWorkflowStatusToDone: cannot update workflow state: %s", err) logger.Error().Err(err).Msgf("pipeline.UpdateWorkflowStatusToDone: cannot update workflow state: %s", err)
} }
var queueErr error if !state.Canceled {
if workflow.Failing() { var queueErr error
queueErr = s.queue.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error)) if workflow.Failing() {
} else { queueErr = s.queue.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error))
queueErr = s.queue.Done(c, strWorkflowID, workflow.State) } else {
} queueErr = s.queue.Done(c, strWorkflowID, workflow.State)
if queueErr != nil { }
logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow") if queueErr != nil {
logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")
}
} }
currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline) currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline)

View File

@@ -59,6 +59,7 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P
return &WoodpeckerServer{peer: peer} return &WoodpeckerServer{peer: peer}
} }
// Version returns the server and gRPC versions.
func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) { func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) {
return &proto.VersionResponse{ return &proto.VersionResponse{
GrpcVersion: proto.Version, GrpcVersion: proto.Version,
@@ -66,6 +67,7 @@ func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.Ve
}, nil }, nil
} }
// Next blocks until it provides the next workflow to execute from the queue.
func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*proto.NextResponse, error) { func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*proto.NextResponse, error) {
filter := rpc.Filter{ filter := rpc.Filter{
Labels: req.GetFilter().GetLabels(), Labels: req.GetFilter().GetLabels(),
@@ -85,6 +87,7 @@ func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*pro
return res, err return res, err
} }
// Init lets the agent signal to the server that the workflow is initialized.
func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*proto.Empty, error) { func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*proto.Empty, error) {
state := rpc.WorkflowState{ state := rpc.WorkflowState{
Started: req.GetState().GetStarted(), Started: req.GetState().GetStarted(),
@@ -96,6 +99,7 @@ func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*pro
return res, err return res, err
} }
// Update lets the agent update the step state on the server.
func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) { func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
state := rpc.StepState{ state := rpc.StepState{
StepUUID: req.GetState().GetStepUuid(), StepUUID: req.GetState().GetStepUuid(),
@@ -104,29 +108,36 @@ func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (
Exited: req.GetState().GetExited(), Exited: req.GetState().GetExited(),
Error: req.GetState().GetError(), Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()), ExitCode: int(req.GetState().GetExitCode()),
Canceled: req.GetState().GetCanceled(),
} }
res := new(proto.Empty) res := new(proto.Empty)
err := s.peer.Update(c, req.GetId(), state) err := s.peer.Update(c, req.GetId(), state)
return res, err return res, err
} }
// Done lets the agent signal to the server that the workflow has stopped.
func (s *WoodpeckerServer) Done(c context.Context, req *proto.DoneRequest) (*proto.Empty, error) { func (s *WoodpeckerServer) Done(c context.Context, req *proto.DoneRequest) (*proto.Empty, error) {
state := rpc.WorkflowState{ state := rpc.WorkflowState{
Started: req.GetState().GetStarted(), Started: req.GetState().GetStarted(),
Finished: req.GetState().GetFinished(), Finished: req.GetState().GetFinished(),
Error: req.GetState().GetError(), Error: req.GetState().GetError(),
Canceled: req.GetState().GetCanceled(),
} }
res := new(proto.Empty) res := new(proto.Empty)
err := s.peer.Done(c, req.GetId(), state) err := s.peer.Done(c, req.GetId(), state)
return res, err return res, err
} }
func (s *WoodpeckerServer) Wait(c context.Context, req *proto.WaitRequest) (*proto.Empty, error) { // Wait blocks until the workflow is complete.
res := new(proto.Empty) // Also reports via the response whether the workflow got canceled.
err := s.peer.Wait(c, req.GetId()) func (s *WoodpeckerServer) Wait(c context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) {
res := new(proto.WaitResponse)
canceled, err := s.peer.Wait(c, req.GetId())
res.Canceled = canceled
return res, err return res, err
} }
// Extend extends the workflow deadline.
func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (*proto.Empty, error) { func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (*proto.Empty, error) {
res := new(proto.Empty) res := new(proto.Empty)
err := s.peer.Extend(c, req.GetId()) err := s.peer.Extend(c, req.GetId())
@@ -170,6 +181,7 @@ func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto
return res, err return res, err
} }
// RegisterAgent registers the agent with the server.
func (s *WoodpeckerServer) RegisterAgent(c context.Context, req *proto.RegisterAgentRequest) (*proto.RegisterAgentResponse, error) { func (s *WoodpeckerServer) RegisterAgent(c context.Context, req *proto.RegisterAgentRequest) (*proto.RegisterAgentResponse, error) {
res := new(proto.RegisterAgentResponse) res := new(proto.RegisterAgentResponse)
agentInfo := req.GetInfo() agentInfo := req.GetInfo()
@@ -184,11 +196,13 @@ func (s *WoodpeckerServer) RegisterAgent(c context.Context, req *proto.RegisterA
return res, err return res, err
} }
// UnregisterAgent unregisters the agent from the server.
func (s *WoodpeckerServer) UnregisterAgent(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) { func (s *WoodpeckerServer) UnregisterAgent(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
err := s.peer.UnregisterAgent(ctx) err := s.peer.UnregisterAgent(ctx)
return new(proto.Empty), err return new(proto.Empty), err
} }
// ReportHealth reports health status of the agent to the server.
func (s *WoodpeckerServer) ReportHealth(c context.Context, req *proto.ReportHealthRequest) (*proto.Empty, error) { func (s *WoodpeckerServer) ReportHealth(c context.Context, req *proto.ReportHealthRequest) (*proto.Empty, error) {
res := new(proto.Empty) res := new(proto.Empty)
err := s.peer.ReportHealth(c, req.GetStatus()) err := s.peer.ReportHealth(c, req.GetStatus())