diff --git a/agent/logger.go b/agent/logger.go index 32052b1e9..89ced9c71 100644 --- a/agent/logger.go +++ b/agent/logger.go @@ -36,6 +36,7 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w Logger() uploads.Add(1) + defer uploads.Done() var secrets []string for _, secret := range workflow.Config.Secrets { @@ -50,8 +51,6 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w } logger.Debug().Msg("log stream copied, close ...") - uploads.Done() - return nil } } diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index 57d1cad9c..e42c23476 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -148,15 +148,16 @@ func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, er return w, nil } -// Wait blocks until the workflow is complete. -func (c *client) Wait(ctx context.Context, workflowID string) (err error) { +// Wait blocks until the workflow with the given ID is marked as completed or canceled by the server. +func (c *client) Wait(ctx context.Context, workflowID string) (canceled bool, err error) { retry := c.newBackOff() req := new(proto.WaitRequest) req.Id = workflowID for { - _, err = c.client.Wait(ctx, req) + resp, err := c.client.Wait(ctx, req) if err == nil { - break + // wait block was released normally as expected by server + return resp.GetCanceled(), nil } switch status.Code(err) { @@ -164,10 +165,10 @@ func (c *client) Wait(ctx context.Context, workflowID string) (err error) { if ctx.Err() != nil { // expected as context was canceled log.Debug().Err(err).Msgf("grpc error: wait(): context canceled") - return nil + return false, nil } log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) - return err + return false, err case codes.Aborted, codes.DataLoss, @@ -178,16 +179,15 @@ func (c *client) Wait(ctx context.Context, workflowID string) (err error) { log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) default: log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) - return err + return false, err } select { case <-time.After(retry.NextBackOff()): case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() } } - return nil } // Init signals the workflow is initialized. @@ -199,6 +199,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow req.State.Started = state.Started req.State.Finished = state.Finished req.State.Error = state.Error + req.State.Canceled = state.Canceled for { _, err = c.client.Init(ctx, req) if err == nil { @@ -238,7 +239,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow return nil } -// Done signals the workflow is complete. +// Done let agent signal to server the workflow has stopped. func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) { retry := c.newBackOff() req := new(proto.DoneRequest) @@ -247,6 +248,7 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow req.State.Started = state.Started req.State.Finished = state.Finished req.State.Error = state.Error + req.State.Canceled = state.Canceled for { _, err = c.client.Done(ctx, req) if err == nil { @@ -330,7 +332,7 @@ func (c *client) Extend(ctx context.Context, workflowID string) (err error) { return nil } -// Update updates the workflow state. +// Update let agent updates the step state at the server. 
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) { retry := c.newBackOff() req := new(proto.UpdateRequest) @@ -342,6 +344,7 @@ func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepSt req.State.Exited = state.Exited req.State.ExitCode = int32(state.ExitCode) req.State.Error = state.Error + req.State.Canceled = state.Canceled for { _, err = c.client.Update(ctx, req) if err == nil { diff --git a/agent/runner.go b/agent/runner.go index a618dc7ef..9d3e205d5 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/rs/zerolog/log" @@ -51,6 +50,7 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen } } +// Run executes a workflow using a backend, tracks its state and reports the state back to the server. func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { log.Debug().Msg("request next execution") @@ -90,34 +90,32 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { // Workflow execution context. // This context is the SINGLE source of truth for cancellation. - workflowCtx, cancel := context.WithTimeout(ctxMeta, timeout) - defer cancel() + workflowCtx, _ := context.WithTimeout(ctxMeta, timeout) //nolint:govet + workflowCtx, cancelWorkflowCtx := context.WithCancelCause(workflowCtx) + defer cancelWorkflowCtx(nil) - // Handle SIGTERM (k8s, docker, system shutdown) + // Add sigterm support for internal context. + // Required to be able to terminate the running workflow by external signals. workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() { logger.Error().Msg("received sigterm termination signal") - cancel() + // WithContextSigtermCallback would cancel the context too, but we want our own custom error + cancelWorkflowCtx(pipeline.ErrCancel) }) - // canceled indicates whether the workflow was canceled remotely (UI/API). - // Must be atomic because it is written from a goroutine and read later. - var canceled atomic.Bool - // Listen for remote cancel events (UI / API). // When canceled, we MUST cancel the workflow context - // so that pipeline execution and backend processes stop immediately. + // so that workflow execution stop immediately. go func() { - logger.Debug().Msg("listening for cancel signal") + logger.Debug().Msg("start listening for server side cancel signal") - if err := r.client.Wait(workflowCtx, workflow.ID); err != nil { - logger.Warn().Err(err).Msg("cancel signal received from server") - - // Mark workflow as canceled (thread-safe) - canceled.Store(true) - - // Propagate cancellation to pipeline + backend - cancel() + if canceled, err := r.client.Wait(workflowCtx, workflow.ID); err != nil { + logger.Error().Err(err).Msg("server returned unexpected err while waiting for workflow to finish run") + cancelWorkflowCtx(err) } else { + if canceled { + logger.Debug().Err(err).Msg("server side cancel signal received") + cancelWorkflowCtx(pipeline.ErrCancel) + } // Wait returned without error, meaning the workflow finished normally logger.Debug().Msg("cancel listener exited normally") } @@ -143,9 +141,13 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { state := rpc.WorkflowState{ Started: time.Now().Unix(), } + if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil { - logger.Error().Err(err).Msg("workflow initialization failed") - // TODO: should we return here? 
+ logger.Error().Err(err).Msg("signaling workflow initialization to server failed") + // We have an error, maybe the server is currently unreachable or other server-side errors occurred. + // So let's clean up and end this not yet started workflow run. + cancelWorkflowCtx(err) + return err } var uploads sync.WaitGroup @@ -167,19 +169,18 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { state.Finished = time.Now().Unix() - // Normalize cancellation error - if errors.Is(err, pipeline.ErrCancel) || canceled.Load() { - canceled.Store(true) - err = pipeline.ErrCancel - } - if err != nil { state.Error = err.Error() + if errors.Is(err, pipeline.ErrCancel) { + state.Canceled = true + // cleanup joined error messages + state.Error = pipeline.ErrCancel.Error() + } } logger.Debug(). Str("error", state.Error). - Bool("canceled", canceled.Load()). + Bool("canceled", state.Canceled). Msg("workflow finished") // Ensure all logs/traces are uploaded before finishing @@ -195,6 +196,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { if err := r.client.Done(doneCtx, workflow.ID, state); err != nil { logger.Error().Err(err).Msg("failed to update workflow status") + } else { + logger.Debug().Msg("signaling workflow stopped done") } return nil diff --git a/agent/tracer.go b/agent/tracer.go index 2fc7308b4..69a9f6539 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -16,6 +16,7 @@ package agent import ( "context" + "errors" "runtime" "strconv" "sync" @@ -30,6 +31,7 @@ import ( func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc { return func(state *pipeline.State) error { uploads.Add(1) + defer uploads.Done() stepLogger := logger.With(). Str("image", state.Pipeline.Step.Image). @@ -43,12 +45,15 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, StepUUID: state.Pipeline.Step.UUID, Exited: state.Process.Exited, ExitCode: state.Process.ExitCode, - Started: time.Now().Unix(), // TODO: do not do this - Finished: time.Now().Unix(), + Started: state.Process.Started, + Canceled: errors.Is(state.Process.Error, pipeline.ErrCancel), } if state.Process.Error != nil { stepState.Error = state.Process.Error.Error() } + if state.Process.Exited { + stepState.Finished = time.Now().Unix() + } defer func() { stepLogger.Debug().Msg("update step status") @@ -60,7 +65,6 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, } stepLogger.Debug().Msg("update step status complete") - uploads.Done() }() if state.Process.Exited { return nil diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index d51eb351d..8a48fdd72 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -250,29 +250,11 @@ func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID str return e.client.ContainerStart(ctx, containerName, container.StartOptions{}) } -// WaitStep waits for a step container to exit. -// -// When the context is canceled, the container is immediately killed to prevent -// orphaned containers from continuing to run after agent shutdown. func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) { - log := log.Logger.With(). - Str("taskUUID", taskUUID). - Str("stepUUID", step.UUID). 
- Logger() - + log := log.Logger.With().Str("taskUUID", taskUUID).Str("stepUUID", step.UUID).Logger() log.Trace().Msgf("wait for step %s", step.Name) containerName := toContainerName(step) - done := make(chan struct{}) - - // Ensure container is killed if context is canceled (SIGTERM / pipeline cancel) - go func() { - select { - case <-ctx.Done(): - _ = e.client.ContainerKill(context.Background(), containerName, "9") //nolint:contextcheck - case <-done: - } - }() wait, errC := e.client.ContainerWait(ctx, containerName, "") select { @@ -282,9 +264,6 @@ func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID stri log.Trace().Msgf("ContainerWait returned with err: %v", err) } - // Stop cancellation watcher - close(done) - info, err := e.client.ContainerInspect(ctx, containerName) if err != nil { return nil, err diff --git a/pipeline/backend/local/local.go b/pipeline/backend/local/local.go index b6c769d0a..252969819 100644 --- a/pipeline/backend/local/local.go +++ b/pipeline/backend/local/local.go @@ -171,9 +171,18 @@ func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string } } -func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) { +func (e *local) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) { log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name) + stepState := &types.State{ + Exited: true, + } + + if err := ctx.Err(); err != nil { + stepState.Error = err + return stepState, nil + } + state, err := e.getStepState(taskUUID, step.UUID) if err != nil { return nil, err @@ -183,10 +192,6 @@ func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) ( return nil, errors.New("exec: step command not set up") } - stepState := &types.State{ - Exited: true, - } - // normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not // as Wait() would close the io pipe even if not all logs where read and send back // so we have to do use the underlying functions diff --git a/pipeline/backend/types/state.go b/pipeline/backend/types/state.go index 3dc8b711b..9f4ff1b33 100644 --- a/pipeline/backend/types/state.go +++ b/pipeline/backend/types/state.go @@ -16,11 +16,14 @@ package types // State defines a container state. type State struct { + // Unix start time + Started int64 `json:"started"` // Container exit code ExitCode int `json:"exit_code"` // Container exited, true or false Exited bool `json:"exited"` // Container is oom killed, true or false + // TODO (6024): well known errors as string enum into ./errors.go OOMKilled bool `json:"oom_killed"` // Container error Error error diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 622917be2..93e8f9467 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -47,18 +47,23 @@ type ( } // Current process state. - Process *backend.State + Process backend.State } ) -// Runtime is a configuration runtime. +// Runtime represents a workflow state executed by a specific backend. +// Each workflow gets its own state configuration at runtime. type Runtime struct { err error spec *backend.Config engine backend.Backend started int64 - ctx context.Context + // The context a workflow is being executed with. + // All normal (non cleanup) operations must use this. 
+ // Cleanup operations should use the runnerCtx passed to Run() + ctx context.Context + tracer Tracer logger Logger @@ -122,7 +127,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error { state := new(State) state.Pipeline.Step = stepErr.Step state.Pipeline.Error = stepErr.Err - state.Process = &backend.State{ + state.Process = backend.State{ Error: stepErr.Err, Exited: true, ExitCode: 1, @@ -143,7 +148,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error { select { case <-r.ctx.Done(): return ErrCancel - case err := <-r.execAll(stage.Steps): + case err := <-r.execAll(runnerCtx, stage.Steps): if err != nil { r.err = err } @@ -154,28 +159,30 @@ func (r *Runtime) Run(runnerCtx context.Context) error { } // Updates the current status of a step. +// If processState is nil, we assume the step did not start. +// If step did not started and err exists, it's a step start issue and step is done. func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error { if r.tracer == nil { // no tracer nothing to trace :) return nil } - if processState == nil { - processState = new(backend.State) - if err != nil { - processState.Error = err - processState.Exited = true - processState.OOMKilled = false - processState.ExitCode = 126 // command invoked cannot be executed. - } - } - state := new(State) state.Pipeline.Started = r.started state.Pipeline.Step = step - state.Process = processState // empty state.Pipeline.Error = r.err + // We have an error while starting the step + if processState == nil && err != nil { + state.Process = backend.State{ + Error: err, + Exited: true, + OOMKilled: false, + } + } else if processState != nil { + state.Process = *processState + } + if traceErr := r.tracer.Trace(state); traceErr != nil { return traceErr } @@ -183,7 +190,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen } // Executes a set of parallel steps. -func (r *Runtime) execAll(steps []*backend.Step) <-chan error { +func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error { var g errgroup.Group done := make(chan error) logger := r.MakeLogger() @@ -226,12 +233,17 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error { Str("step", step.Name). Msg("executing") - processState, err := r.exec(step) + processState, err := r.exec(runnerCtx, step) logger.Debug(). Str("step", step.Name). Msg("complete") + // normalize context cancel error + if errors.Is(err, context.Canceled) { + err = ErrCancel + } + // Return the error after tracing it. err = r.traceStep(processState, err, step) if err != nil && step.Failure == metadata.FailureIgnore { @@ -245,18 +257,21 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error { done <- g.Wait() close(done) }() + return done } // Executes the step and returns the state and error. 
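// Editor's illustrative sketch (not part of the patch): the two-context pattern the
// reworked exec() below relies on. The workflow context is the one that gets canceled
// (timeout, SIGTERM, server-side cancel), while the long-lived runner context stays
// usable so cleanup such as DestroyStep can still reach the backend afterwards.
// All names in this sketch are hypothetical; context and time are standard library.
func runStepWithCleanup(runnerCtx context.Context, timeout time.Duration,
	work, cleanup func(ctx context.Context) error,
) error {
	// derive the cancelable workflow context from the long-lived runner context
	workflowCtx, cancel := context.WithTimeout(runnerCtx, timeout)
	defer cancel()

	err := work(workflowCtx) // may return context.Canceled when the workflow is stopped

	// cleanup intentionally uses runnerCtx, not workflowCtx, so it still
	// runs after the workflow context was canceled
	if cleanupErr := cleanup(runnerCtx); cleanupErr != nil && err == nil {
		err = cleanupErr
	}
	return err
}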
-func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { - if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { +func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.State, error) { + if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck return nil, err } + startTime := time.Now().Unix() + logger := r.MakeLogger() var wg sync.WaitGroup if r.logger != nil { - rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) + rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck if err != nil { return nil, err } @@ -264,7 +279,6 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { wg.Add(1) go func() { defer wg.Done() - logger := r.MakeLogger() if err := r.logger(step, rc); err != nil { logger.Error().Err(err).Msg("process logging failed") @@ -281,16 +295,27 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { // We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream) wg.Wait() - waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) + waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck if err != nil { if errors.Is(err, context.Canceled) { - return waitState, ErrCancel + waitState.Error = ErrCancel + } else { + return nil, err } + } + + // It is important to use the runnerCtx here because + // in case the workflow was canceled we still have the docker daemon to stop the container. + if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil { return nil, err } - if err := r.engine.DestroyStep(r.ctx, step, r.taskUUID); err != nil { - return nil, err + // we update with our start time here + waitState.Started = startTime + + // we handle cancel case + if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) { + waitState.Error = ErrCancel } if waitState.OOMKilled { diff --git a/pipeline/shutdown.go b/pipeline/shutdown.go index b9fd98384..9479ab971 100644 --- a/pipeline/shutdown.go +++ b/pipeline/shutdown.go @@ -23,26 +23,15 @@ import ( const shutdownTimeout = time.Second * 5 var ( - shutdownCtx context.Context - shutdownCtxCancel context.CancelFunc - shutdownCtxLock sync.Mutex + shutdownCtx context.Context + shutdownCtxLock sync.Mutex ) func GetShutdownCtx() context.Context { shutdownCtxLock.Lock() defer shutdownCtxLock.Unlock() if shutdownCtx == nil { - shutdownCtx, shutdownCtxCancel = context.WithTimeout(context.Background(), shutdownTimeout) + shutdownCtx, _ = context.WithTimeout(context.Background(), shutdownTimeout) //nolint:govet } return shutdownCtx } - -func CancelShutdown() { - shutdownCtxLock.Lock() - defer shutdownCtxLock.Unlock() - if shutdownCtxCancel == nil { - // we create an canceled context - shutdownCtx, shutdownCtxCancel = context.WithCancel(context.Background()) //nolint:forbidigo - } - shutdownCtxCancel() -} diff --git a/rpc/mocks/mock_Peer.go b/rpc/mocks/mock_Peer.go index 1596f5b40..c66458a8c 100644 --- a/rpc/mocks/mock_Peer.go +++ b/rpc/mocks/mock_Peer.go @@ -623,20 +623,29 @@ func (_c *MockPeer_Version_Call) RunAndReturn(run func(c context.Context) (*rpc. 
} // Wait provides a mock function for the type MockPeer -func (_mock *MockPeer) Wait(c context.Context, workflowID string) error { +func (_mock *MockPeer) Wait(c context.Context, workflowID string) (bool, error) { ret := _mock.Called(c, workflowID) if len(ret) == 0 { panic("no return value specified for Wait") } - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { + var r0 bool + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return returnFunc(c, workflowID) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, string) bool); ok { r0 = returnFunc(c, workflowID) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(bool) } - return r0 + if returnFunc, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = returnFunc(c, workflowID) + } else { + r1 = ret.Error(1) + } + return r0, r1 } // MockPeer_Wait_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Wait' @@ -669,12 +678,12 @@ func (_c *MockPeer_Wait_Call) Run(run func(c context.Context, workflowID string) return _c } -func (_c *MockPeer_Wait_Call) Return(err error) *MockPeer_Wait_Call { - _c.Call.Return(err) +func (_c *MockPeer_Wait_Call) Return(canceled bool, err error) *MockPeer_Wait_Call { + _c.Call.Return(canceled, err) return _c } -func (_c *MockPeer_Wait_Call) RunAndReturn(run func(c context.Context, workflowID string) error) *MockPeer_Wait_Call { +func (_c *MockPeer_Wait_Call) RunAndReturn(run func(c context.Context, workflowID string) (bool, error)) *MockPeer_Wait_Call { _c.Call.Return(run) return _c } diff --git a/rpc/peer.go b/rpc/peer.go index 6dee992e1..e1568f692 100644 --- a/rpc/peer.go +++ b/rpc/peer.go @@ -15,89 +15,291 @@ package rpc -import ( - "context" +import "context" - backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" -) - -type ( - // Filter defines filters for fetching items from the queue. - Filter struct { - Labels map[string]string `json:"labels"` - } - - // StepState defines the step state. - StepState struct { - StepUUID string `json:"step_uuid"` - Started int64 `json:"started"` - Finished int64 `json:"finished"` - Exited bool `json:"exited"` - ExitCode int `json:"exit_code"` - Error string `json:"error"` - } - - // WorkflowState defines the workflow state. - WorkflowState struct { - Started int64 `json:"started"` - Finished int64 `json:"finished"` - Error string `json:"error"` - } - - // Workflow defines the workflow execution details. - Workflow struct { - ID string `json:"id"` - Config *backend.Config `json:"config"` - Timeout int64 `json:"timeout"` - } - - Version struct { - GrpcVersion int32 `json:"grpc_version,omitempty"` - ServerVersion string `json:"server_version,omitempty"` - } - - // AgentInfo represents all the metadata that should be known about an agent. - AgentInfo struct { - Version string `json:"version"` - Platform string `json:"platform"` - Backend string `json:"backend"` - Capacity int `json:"capacity"` - CustomLabels map[string]string `json:"custom_labels"` - } -) - -// Peer defines a peer-to-peer connection. +// Peer defines the bidirectional communication interface between Woodpecker agents and servers. +// +// # Architecture and Implementations +// +// The Peer interface is implemented differently on each side of the communication: +// +// - Agent side: Implemented by agent/rpc/client_grpc.go's client struct, which wraps +// a gRPC client connection to make RPC calls to the server. 
+// +// - Server side: Implemented by server/rpc/rpc.go's RPC struct, which contains the +// business logic and is wrapped by server/rpc/server.go's WoodpeckerServer struct +// to handle incoming gRPC requests. +// +// # Thread Safety and Concurrency +// +// - Implementations must be safe for concurrent calls across different workflows +// - The same Peer instance may be called concurrently from multiple goroutines +// - Each workflow is identified by a unique workflowID string +// - Implementations must properly isolate workflow state using workflowID +// +// # Error Handling Conventions +// +// - Methods return errors for communication failures, validation errors, or server-side issues +// - Errors should not be used for business logic +// - Network/transport errors should be retried by the caller when appropriate +// - Nil error indicates successful operation +// - Context cancellation should return nil or context.Canceled, not a custom error +// - Business logic errors (e.g., workflow not found) return specific error types +// +// # Intended Execution Flow +// +// 1. Agent Lifecycle: +// - Version() checks compatibility with server +// - RegisterAgent() announces agent availability +// - ReportHealth() periodically confirms agent is alive +// - UnregisterAgent() gracefully disconnects agent +// +// 2. Workflow Execution (may happen concurrently for multiple workflows): +// - Next() blocks until server assigns a workflow +// - Init() signals workflow execution has started +// - Wait() (in background goroutine) monitors for cancellation signals +// - Update() reports step state changes as workflow progresses +// - EnqueueLog() streams log output from steps +// - Extend() extends workflow timeout if needed so queue does not reschedule it as retry +// - Done() signals workflow has completed +// +// 3. Cancellation Flow: +// - Server can cancel workflow by releasing Wait() with canceled=true +// - Agent detects cancellation from Wait() return value +// - Agent stops workflow execution and calls Done() with canceled state type Peer interface { - // Version returns the server- & grpc-version + // Version returns the server and gRPC protocol version information. + // + // This is typically called once during agent initialization to verify + // compatibility between agent and server versions. + // + // Returns: + // - Version with server version string and gRPC protocol version number + // - Error if communication fails or server is unreachable Version(c context.Context) (*Version, error) - // Next returns the next workflow in the queue + // Next blocks until the server provides the next workflow to execute from the queue. + // + // This is the primary work-polling mechanism. Agents call this repeatedly in a loop, + // and it blocks until either: + // 1. A workflow matching the filter becomes available + // 2. The context is canceled (agent shutdown, network timeout, etc.) + // + // The filter allows agents to specify capabilities via labels (e.g., platform, + // backend type) so the server only assigns compatible workflows. 
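	// Illustrative example (editor's sketch, not part of this patch) of a polling loop
	// with a capability filter; the label keys shown are examples, not a fixed schema:
	//
	//	filter := rpc.Filter{Labels: map[string]string{
	//		"platform": "linux/amd64",
	//		"backend":  "docker",
	//	}}
	//	for {
	//		workflow, err := peer.Next(ctx, filter)
	//		if err != nil {
	//			return err
	//		}
	//		if workflow == nil {
	//			if ctx.Err() != nil {
	//				return nil // agent is shutting down
	//			}
	//			continue // no work available yet, poll again
	//		}
	//		go runWorkflow(ctx, workflow) // runWorkflow is a placeholder
	//	}
	//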
+ // + // Context Handling: + // - This is a long-polling operation that may block for extended periods + // - Implementations MUST check context regularly (not just at entry) + // - When context is canceled, must return nil workflow and nil error + // - Server may send keep-alive signals or periodically return nil to allow reconnection + // + // Returns: + // - Workflow object with ID, Config, and Workflow.Timeout if work is available + // - nil, nil if context is canceled or no work available (retry expected) + // - nil, error if a non-retryable error occurs Next(c context.Context, f Filter) (*Workflow, error) - // Wait blocks until the workflow is complete - Wait(c context.Context, workflowID string) error + // Wait blocks until the workflow with the given ID completes or is canceled by the server. + // + // This is used by agents to monitor for server-side cancellation signals. Typically + // called in a background goroutine immediately after Init(), running concurrently + // with workflow execution. + // + // The method serves two purposes: + // 1. Signals when server wants to cancel workflow (canceled=true) + // 2. Unblocks when workflow completes normally on agent (canceled=false) + // + // Context Handling: + // - This is a long-running blocking operation for the workflow duration + // - Context cancellation indicates shutdown, not workflow cancellation + // - When context is canceled, should return (false, nil) or (false, ctx.Err()) + // - Must not confuse context cancellation with workflow cancellation signal + // + // Cancellation Flow: + // - Server releases Wait() with canceled=true → agent should stop workflow + // - Agent completes workflow normally → Done() is called → server releases Wait() with canceled=false + // - Agent context canceled → Wait() returns immediately, workflow may continue on agent + // + // Returns: + // - canceled=true, err=nil: Server initiated cancellation, agent should stop workflow + // - canceled=false, err=nil: Workflow completed normally (Wait unblocked by Done call) + // - canceled=false, err!=nil: Communication error, agent should retry or handle error + Wait(c context.Context, workflowID string) (canceled bool, err error) - // Init signals the workflow is initialized + // Init signals to the server that the workflow has been initialized and execution has started. + // + // This is called once per workflow immediately after the agent accepts it from Next() + // and before starting step execution. It allows the server to track workflow start time + // and update workflow status to "running". + // + // The WorkflowState should have: + // - Started: Unix timestamp when execution began + // - Finished: 0 (not finished yet) + // - Error: empty string (no error yet) + // - Canceled: false (not canceled yet) + // + // Returns: + // - nil on success + // - error if communication fails or server rejects the state Init(c context.Context, workflowID string, state WorkflowState) error - // Done signals the workflow is complete + // Done signals to the server that the workflow has completed execution. + // + // This is called once per workflow after all steps have finished (or workflow was canceled). + // It provides the final workflow state including completion time, any errors, and + // cancellation status. 
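	// Illustrative example (editor's sketch, not part of this patch) of the final state
	// an agent might report; runErr, startedUnix, shutdownCtx and workflowID are placeholders:
	//
	//	state := rpc.WorkflowState{
	//		Started:  startedUnix,
	//		Finished: time.Now().Unix(),
	//		Canceled: errors.Is(runErr, pipeline.ErrCancel),
	//	}
	//	if runErr != nil {
	//		state.Error = runErr.Error()
	//	}
	//	_ = peer.Done(shutdownCtx, workflowID, state)
	//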
+ // + // The WorkflowState should have: + // - Started: Unix timestamp when execution began (same as Init) + // - Finished: Unix timestamp when execution completed + // - Error: Error message if workflow failed, empty if successful + // - Canceled: true if workflow was canceled, false otherwise + // + // After Done() is called: + // - Server updates final workflow status in database + // - Server releases any Wait() calls for this workflow + // - Server removes workflow from active queue + // - Server notifies forge of workflow completion + // + // Context Handling: + // - MUST attempt to complete even if workflow context is canceled + // - Often called with a shutdown/cleanup context rather than workflow context + // - Critical for proper cleanup - should retry on transient failures + // + // Returns: + // - nil on success + // - error if communication fails or server rejects the state Done(c context.Context, workflowID string, state WorkflowState) error - // Extend extends the workflow deadline + // Extend extends the timeout for the workflow with the given ID in the task queue. + // + // Agents must call Extend() regularly (e.g., every constant.TaskTimeout / 3) to signal + // that the workflow is still actively executing and prevent premature timeout. + // + // If agents don't call Extend periodically, the workflow will be rescheduled to a new + // agent after the timeout period expires (specified in constant.TaskTimeout). + // + // This acts as a heartbeat mechanism to detect stuck workflow executions. If an agent + // dies or becomes unresponsive, the server will eventually timeout the workflow and + // reassign it. + // + // IMPORTANT: Don't confuse this with Workflow.Timeout returned by Next() - they serve + // different purposes! + // + // Returns: + // - nil on success (timeout was extended) + // - error if communication fails or workflow is not found Extend(c context.Context, workflowID string) error - // Update updates the step state + // Update reports step state changes to the server as the workflow progresses. + // + // This is called multiple times per step: + // 1. When step starts (Exited=false, Finished=0) + // 2. When step completes (Exited=true, Finished and ExitCode set) + // 3. Potentially on progress updates if step has long-running operations + // + // The server uses these updates to: + // - Track step execution progress + // - Update UI with real-time status + // - Store step results in database + // - Calculate workflow completion + // + // Context Handling: + // - Failures should be logged but not block workflow execution + // + // Returns: + // - nil on success + // - error if communication fails or server rejects the state Update(c context.Context, workflowID string, state StepState) error - // EnqueueLog queues the step log entry for delayed sending + // EnqueueLog queues a log entry for delayed batch sending to the server. + // + // Log entries are produced continuously during step execution and need to be + // transmitted efficiently. This method adds logs to an internal queue that + // batches and sends them periodically to reduce network overhead. 
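	// Illustrative example (editor's sketch, not part of this patch); the LogEntry field
	// set is assumed from this package and the values are placeholders:
	//
	//	peer.EnqueueLog(&rpc.LogEntry{
	//		StepUUID: step.UUID,
	//		Time:     time.Now().Unix(),
	//		Line:     lineNumber,
	//		Data:     []byte("a line of step output"),
	//	})
	//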
+ // + // The implementation should: + // - Queue the log entry in a memory buffer + // - Batch multiple entries together + // - Send batches periodically (e.g., every second) or when buffer fills + // - Handle backpressure if server is slow or network is congested + // + // Unlike other methods, EnqueueLog: + // - Does NOT take a context parameter (fire-and-forget) + // - Does NOT return an error (never blocks the caller) + // - Does NOT guarantee immediate transmission + // + // Thread Safety: + // - MUST be safe to call concurrently from multiple goroutines + // - May be called concurrently from different steps/workflows + // - Internal queue must be properly synchronized EnqueueLog(logEntry *LogEntry) - // RegisterAgent register our agent to the server + // RegisterAgent announces this agent to the server and returns an agent ID. + // + // This is called once during agent startup to: + // - Create an agent record in the server database + // - Obtain a unique agent ID for subsequent requests + // - Declare agent capabilities (platform, backend, capacity, labels) + // - Enable server-side agent tracking and monitoring + // + // The AgentInfo should specify: + // - Version: Agent version string (e.g., "v2.0.0") + // - Platform: OS/architecture (e.g., "linux/amd64") + // - Backend: Execution backend (e.g., "docker", "kubernetes") + // - Capacity: Maximum concurrent workflows (e.g., 2) + // - CustomLabels: Additional key-value labels for filtering + // + // Context Handling: + // - Context cancellation indicates agent is aborting startup + // - Should not retry indefinitely - fail fast on persistent errors + // + // Returns: + // - agentID: Unique identifier for this agent (use in subsequent calls) + // - error: If registration fails RegisterAgent(ctx context.Context, info AgentInfo) (int64, error) - // UnregisterAgent unregister our agent from the server + // UnregisterAgent removes this agent from the server's registry. + // + // This is called during graceful agent shutdown to: + // - Mark agent as offline in server database + // - Allow server to stop assigning workflows to this agent + // - Clean up any agent-specific server resources + // - Provide clean shutdown signal to monitoring systems + // + // After UnregisterAgent: + // - Agent should stop calling Next() for new work + // - Agent should complete any in-progress workflows + // - Agent may call Done() to finish existing workflows + // - Agent should close network connections + // + // Context Handling: + // - MUST attempt to complete even during forced shutdown + // - Often called with a shutdown context (limited time) + // - Failure is logged but should not prevent agent exit + // + // Returns: + // - nil on success + // - error if communication fails UnregisterAgent(ctx context.Context) error - // ReportHealth reports health status of the agent to the server + // ReportHealth sends a periodic health status update to the server. 
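	// Illustrative example (editor's sketch, not part of this patch) of a reporting loop;
	// the interval is an example value:
	//
	//	ticker := time.NewTicker(30 * time.Second)
	//	defer ticker.Stop()
	//	for {
	//		select {
	//		case <-ctx.Done():
	//			return
	//		case <-ticker.C:
	//			if err := peer.ReportHealth(ctx); err != nil {
	//				log.Error().Err(err).Msg("reporting health failed")
	//			}
	//		}
	//	}
	//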
+ // + // This is called regularly (e.g., every 30 seconds) during agent operation to: + // - Prove agent is still alive and responsive + // - Allow server to detect dead or stuck agents + // - Update agent's "last seen" timestamp in database + // - Provide application-level keepalive beyond network keep-alive signals + // + // Health reporting helps the server: + // - Mark unresponsive agents as offline + // - Redistribute work from dead agents + // - Display accurate agent status in UI + // - Trigger alerts for infrastructure issues + // + // Returns: + // - nil on success + // - error if communication fails ReportHealth(c context.Context) error } diff --git a/rpc/proto/version.go b/rpc/proto/version.go index 20e1c9513..ee63ab7cd 100644 --- a/rpc/proto/version.go +++ b/rpc/proto/version.go @@ -16,4 +16,4 @@ package proto // Version is the version of the woodpecker.proto file, // IMPORTANT: increased by 1 each time it get changed. -const Version int32 = 14 +const Version int32 = 15 diff --git a/rpc/proto/woodpecker.pb.go b/rpc/proto/woodpecker.pb.go index b13b8b3ab..f5126303d 100644 --- a/rpc/proto/woodpecker.pb.go +++ b/rpc/proto/woodpecker.pb.go @@ -44,6 +44,7 @@ type StepState struct { Exited bool `protobuf:"varint,4,opt,name=exited,proto3" json:"exited,omitempty"` ExitCode int32 `protobuf:"varint,5,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` Error string `protobuf:"bytes,6,opt,name=error,proto3" json:"error,omitempty"` + Canceled bool `protobuf:"varint,7,opt,name=canceled,proto3" json:"canceled,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -120,11 +121,19 @@ func (x *StepState) GetError() string { return "" } +func (x *StepState) GetCanceled() bool { + if x != nil { + return x.Canceled + } + return false +} + type WorkflowState struct { state protoimpl.MessageState `protogen:"open.v1"` - Started int64 `protobuf:"varint,4,opt,name=started,proto3" json:"started,omitempty"` - Finished int64 `protobuf:"varint,5,opt,name=finished,proto3" json:"finished,omitempty"` - Error string `protobuf:"bytes,6,opt,name=error,proto3" json:"error,omitempty"` + Started int64 `protobuf:"varint,1,opt,name=started,proto3" json:"started,omitempty"` + Finished int64 `protobuf:"varint,2,opt,name=finished,proto3" json:"finished,omitempty"` + Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` + Canceled bool `protobuf:"varint,4,opt,name=canceled,proto3" json:"canceled,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -180,6 +189,13 @@ func (x *WorkflowState) GetError() string { return "" } +func (x *WorkflowState) GetCanceled() bool { + if x != nil { + return x.Canceled + } + return false +} + type LogEntry struct { state protoimpl.MessageState `protogen:"open.v1"` StepUuid string `protobuf:"bytes,1,opt,name=step_uuid,json=stepUuid,proto3" json:"step_uuid,omitempty"` @@ -1032,6 +1048,50 @@ func (x *RegisterAgentResponse) GetAgentId() int64 { return 0 } +type WaitResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Canceled bool `protobuf:"varint,1,opt,name=canceled,proto3" json:"canceled,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WaitResponse) Reset() { + *x = WaitResponse{} + mi := &file_woodpecker_proto_msgTypes[19] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WaitResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WaitResponse) 
ProtoMessage() {} + +func (x *WaitResponse) ProtoReflect() protoreflect.Message { + mi := &file_woodpecker_proto_msgTypes[19] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WaitResponse.ProtoReflect.Descriptor instead. +func (*WaitResponse) Descriptor() ([]byte, []int) { + return file_woodpecker_proto_rawDescGZIP(), []int{19} +} + +func (x *WaitResponse) GetCanceled() bool { + if x != nil { + return x.Canceled + } + return false +} + type AuthRequest struct { state protoimpl.MessageState `protogen:"open.v1"` AgentToken string `protobuf:"bytes,1,opt,name=agent_token,json=agentToken,proto3" json:"agent_token,omitempty"` @@ -1042,7 +1102,7 @@ type AuthRequest struct { func (x *AuthRequest) Reset() { *x = AuthRequest{} - mi := &file_woodpecker_proto_msgTypes[19] + mi := &file_woodpecker_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1054,7 +1114,7 @@ func (x *AuthRequest) String() string { func (*AuthRequest) ProtoMessage() {} func (x *AuthRequest) ProtoReflect() protoreflect.Message { - mi := &file_woodpecker_proto_msgTypes[19] + mi := &file_woodpecker_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1067,7 +1127,7 @@ func (x *AuthRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AuthRequest.ProtoReflect.Descriptor instead. func (*AuthRequest) Descriptor() ([]byte, []int) { - return file_woodpecker_proto_rawDescGZIP(), []int{19} + return file_woodpecker_proto_rawDescGZIP(), []int{20} } func (x *AuthRequest) GetAgentToken() string { @@ -1095,7 +1155,7 @@ type AuthResponse struct { func (x *AuthResponse) Reset() { *x = AuthResponse{} - mi := &file_woodpecker_proto_msgTypes[20] + mi := &file_woodpecker_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1107,7 +1167,7 @@ func (x *AuthResponse) String() string { func (*AuthResponse) ProtoMessage() {} func (x *AuthResponse) ProtoReflect() protoreflect.Message { - mi := &file_woodpecker_proto_msgTypes[20] + mi := &file_woodpecker_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1120,7 +1180,7 @@ func (x *AuthResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use AuthResponse.ProtoReflect.Descriptor instead. 
func (*AuthResponse) Descriptor() ([]byte, []int) { - return file_woodpecker_proto_rawDescGZIP(), []int{20} + return file_woodpecker_proto_rawDescGZIP(), []int{21} } func (x *AuthResponse) GetStatus() string { @@ -1148,18 +1208,20 @@ var File_woodpecker_proto protoreflect.FileDescriptor const file_woodpecker_proto_rawDesc = "" + "\n" + - "\x10woodpecker.proto\x12\x05proto\"\xa9\x01\n" + + "\x10woodpecker.proto\x12\x05proto\"\xc5\x01\n" + "\tStepState\x12\x1b\n" + "\tstep_uuid\x18\x01 \x01(\tR\bstepUuid\x12\x18\n" + "\astarted\x18\x02 \x01(\x03R\astarted\x12\x1a\n" + "\bfinished\x18\x03 \x01(\x03R\bfinished\x12\x16\n" + "\x06exited\x18\x04 \x01(\bR\x06exited\x12\x1b\n" + "\texit_code\x18\x05 \x01(\x05R\bexitCode\x12\x14\n" + - "\x05error\x18\x06 \x01(\tR\x05error\"[\n" + + "\x05error\x18\x06 \x01(\tR\x05error\x12\x1a\n" + + "\bcanceled\x18\a \x01(\bR\bcanceled\"w\n" + "\rWorkflowState\x12\x18\n" + - "\astarted\x18\x04 \x01(\x03R\astarted\x12\x1a\n" + - "\bfinished\x18\x05 \x01(\x03R\bfinished\x12\x14\n" + - "\x05error\x18\x06 \x01(\tR\x05error\"w\n" + + "\astarted\x18\x01 \x01(\x03R\astarted\x12\x1a\n" + + "\bfinished\x18\x02 \x01(\x03R\bfinished\x12\x14\n" + + "\x05error\x18\x03 \x01(\tR\x05error\x12\x1a\n" + + "\bcanceled\x18\x04 \x01(\bR\bcanceled\"w\n" + "\bLogEntry\x12\x1b\n" + "\tstep_uuid\x18\x01 \x01(\tR\bstepUuid\x12\x12\n" + "\x04time\x18\x02 \x01(\x03R\x04time\x12\x12\n" + @@ -1215,7 +1277,9 @@ const file_woodpecker_proto_rawDesc = "" + "\fNextResponse\x12+\n" + "\bworkflow\x18\x01 \x01(\v2\x0f.proto.WorkflowR\bworkflow\"2\n" + "\x15RegisterAgentResponse\x12\x19\n" + - "\bagent_id\x18\x01 \x01(\x03R\aagentId\"I\n" + + "\bagent_id\x18\x01 \x01(\x03R\aagentId\"*\n" + + "\fWaitResponse\x12\x1a\n" + + "\bcanceled\x18\x01 \x01(\bR\bcanceled\"I\n" + "\vAuthRequest\x12\x1f\n" + "\vagent_token\x18\x01 \x01(\tR\n" + "agentToken\x12\x19\n" + @@ -1223,13 +1287,13 @@ const file_woodpecker_proto_rawDesc = "" + "\fAuthResponse\x12\x16\n" + "\x06status\x18\x01 \x01(\tR\x06status\x12\x19\n" + "\bagent_id\x18\x02 \x01(\x03R\aagentId\x12!\n" + - "\faccess_token\x18\x03 \x01(\tR\vaccessToken2\xbb\x04\n" + + "\faccess_token\x18\x03 \x01(\tR\vaccessToken2\xc2\x04\n" + "\n" + "Woodpecker\x121\n" + "\aVersion\x12\f.proto.Empty\x1a\x16.proto.VersionResponse\"\x00\x121\n" + "\x04Next\x12\x12.proto.NextRequest\x1a\x13.proto.NextResponse\"\x00\x12*\n" + - "\x04Init\x12\x12.proto.InitRequest\x1a\f.proto.Empty\"\x00\x12*\n" + - "\x04Wait\x12\x12.proto.WaitRequest\x1a\f.proto.Empty\"\x00\x12*\n" + + "\x04Init\x12\x12.proto.InitRequest\x1a\f.proto.Empty\"\x00\x121\n" + + "\x04Wait\x12\x12.proto.WaitRequest\x1a\x13.proto.WaitResponse\"\x00\x12*\n" + "\x04Done\x12\x12.proto.DoneRequest\x1a\f.proto.Empty\"\x00\x12.\n" + "\x06Extend\x12\x14.proto.ExtendRequest\x1a\f.proto.Empty\"\x00\x12.\n" + "\x06Update\x12\x14.proto.UpdateRequest\x1a\f.proto.Empty\"\x00\x12(\n" + @@ -1252,7 +1316,7 @@ func file_woodpecker_proto_rawDescGZIP() []byte { return file_woodpecker_proto_rawDescData } -var file_woodpecker_proto_msgTypes = make([]protoimpl.MessageInfo, 23) +var file_woodpecker_proto_msgTypes = make([]protoimpl.MessageInfo, 24) var file_woodpecker_proto_goTypes = []any{ (*StepState)(nil), // 0: proto.StepState (*WorkflowState)(nil), // 1: proto.WorkflowState @@ -1273,19 +1337,20 @@ var file_woodpecker_proto_goTypes = []any{ (*VersionResponse)(nil), // 16: proto.VersionResponse (*NextResponse)(nil), // 17: proto.NextResponse (*RegisterAgentResponse)(nil), // 18: proto.RegisterAgentResponse - (*AuthRequest)(nil), // 19: 
proto.AuthRequest - (*AuthResponse)(nil), // 20: proto.AuthResponse - nil, // 21: proto.Filter.LabelsEntry - nil, // 22: proto.AgentInfo.CustomLabelsEntry + (*WaitResponse)(nil), // 19: proto.WaitResponse + (*AuthRequest)(nil), // 20: proto.AuthRequest + (*AuthResponse)(nil), // 21: proto.AuthResponse + nil, // 22: proto.Filter.LabelsEntry + nil, // 23: proto.AgentInfo.CustomLabelsEntry } var file_woodpecker_proto_depIdxs = []int32{ - 21, // 0: proto.Filter.labels:type_name -> proto.Filter.LabelsEntry + 22, // 0: proto.Filter.labels:type_name -> proto.Filter.LabelsEntry 3, // 1: proto.NextRequest.filter:type_name -> proto.Filter 1, // 2: proto.InitRequest.state:type_name -> proto.WorkflowState 1, // 3: proto.DoneRequest.state:type_name -> proto.WorkflowState 0, // 4: proto.UpdateRequest.state:type_name -> proto.StepState 2, // 5: proto.LogRequest.logEntries:type_name -> proto.LogEntry - 22, // 6: proto.AgentInfo.customLabels:type_name -> proto.AgentInfo.CustomLabelsEntry + 23, // 6: proto.AgentInfo.customLabels:type_name -> proto.AgentInfo.CustomLabelsEntry 14, // 7: proto.RegisterAgentRequest.info:type_name -> proto.AgentInfo 4, // 8: proto.NextResponse.workflow:type_name -> proto.Workflow 12, // 9: proto.Woodpecker.Version:input_type -> proto.Empty @@ -1299,11 +1364,11 @@ var file_woodpecker_proto_depIdxs = []int32{ 15, // 17: proto.Woodpecker.RegisterAgent:input_type -> proto.RegisterAgentRequest 12, // 18: proto.Woodpecker.UnregisterAgent:input_type -> proto.Empty 13, // 19: proto.Woodpecker.ReportHealth:input_type -> proto.ReportHealthRequest - 19, // 20: proto.WoodpeckerAuth.Auth:input_type -> proto.AuthRequest + 20, // 20: proto.WoodpeckerAuth.Auth:input_type -> proto.AuthRequest 16, // 21: proto.Woodpecker.Version:output_type -> proto.VersionResponse 17, // 22: proto.Woodpecker.Next:output_type -> proto.NextResponse 12, // 23: proto.Woodpecker.Init:output_type -> proto.Empty - 12, // 24: proto.Woodpecker.Wait:output_type -> proto.Empty + 19, // 24: proto.Woodpecker.Wait:output_type -> proto.WaitResponse 12, // 25: proto.Woodpecker.Done:output_type -> proto.Empty 12, // 26: proto.Woodpecker.Extend:output_type -> proto.Empty 12, // 27: proto.Woodpecker.Update:output_type -> proto.Empty @@ -1311,7 +1376,7 @@ var file_woodpecker_proto_depIdxs = []int32{ 18, // 29: proto.Woodpecker.RegisterAgent:output_type -> proto.RegisterAgentResponse 12, // 30: proto.Woodpecker.UnregisterAgent:output_type -> proto.Empty 12, // 31: proto.Woodpecker.ReportHealth:output_type -> proto.Empty - 20, // 32: proto.WoodpeckerAuth.Auth:output_type -> proto.AuthResponse + 21, // 32: proto.WoodpeckerAuth.Auth:output_type -> proto.AuthResponse 21, // [21:33] is the sub-list for method output_type 9, // [9:21] is the sub-list for method input_type 9, // [9:9] is the sub-list for extension type_name @@ -1330,7 +1395,7 @@ func file_woodpecker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_woodpecker_proto_rawDesc), len(file_woodpecker_proto_rawDesc)), NumEnums: 0, - NumMessages: 23, + NumMessages: 24, NumExtensions: 0, NumServices: 2, }, diff --git a/rpc/proto/woodpecker.proto b/rpc/proto/woodpecker.proto index 38cf2352f..b348c0957 100644 --- a/rpc/proto/woodpecker.proto +++ b/rpc/proto/woodpecker.proto @@ -27,7 +27,7 @@ service Woodpecker { rpc Version (Empty) returns (VersionResponse) {} rpc Next (NextRequest) returns (NextResponse) {} rpc Init (InitRequest) returns (Empty) {} - rpc Wait (WaitRequest) returns (Empty) {} + rpc Wait (WaitRequest) 
returns (WaitResponse) {} rpc Done (DoneRequest) returns (Empty) {} rpc Extend (ExtendRequest) returns (Empty) {} rpc Update (UpdateRequest) returns (Empty) {} @@ -48,12 +48,14 @@ message StepState { bool exited = 4; int32 exit_code = 5; string error = 6; + bool canceled = 7; } message WorkflowState { - int64 started = 4; - int64 finished = 5; - string error = 6; + int64 started = 1; + int64 finished = 2; + string error = 3; + bool canceled = 4; } message LogEntry { @@ -145,6 +147,10 @@ message RegisterAgentResponse { int64 agent_id = 1; } +message WaitResponse { + bool canceled = 1; +}; + // Woodpecker auth service is a simple service to authenticate agents and acquire a token service WoodpeckerAuth { diff --git a/rpc/proto/woodpecker_grpc.pb.go b/rpc/proto/woodpecker_grpc.pb.go index 033af6877..88ca6e7a9 100644 --- a/rpc/proto/woodpecker_grpc.pb.go +++ b/rpc/proto/woodpecker_grpc.pb.go @@ -56,7 +56,7 @@ type WoodpeckerClient interface { Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) Next(ctx context.Context, in *NextRequest, opts ...grpc.CallOption) (*NextResponse, error) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Empty, error) - Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*Empty, error) + Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) Done(ctx context.Context, in *DoneRequest, opts ...grpc.CallOption) (*Empty, error) Extend(ctx context.Context, in *ExtendRequest, opts ...grpc.CallOption) (*Empty, error) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error) @@ -104,9 +104,9 @@ func (c *woodpeckerClient) Init(ctx context.Context, in *InitRequest, opts ...gr return out, nil } -func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*Empty, error) { +func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(Empty) + out := new(WaitResponse) err := c.cc.Invoke(ctx, Woodpecker_Wait_FullMethodName, in, out, cOpts...) 
if err != nil { return nil, err @@ -193,7 +193,7 @@ type WoodpeckerServer interface { Version(context.Context, *Empty) (*VersionResponse, error) Next(context.Context, *NextRequest) (*NextResponse, error) Init(context.Context, *InitRequest) (*Empty, error) - Wait(context.Context, *WaitRequest) (*Empty, error) + Wait(context.Context, *WaitRequest) (*WaitResponse, error) Done(context.Context, *DoneRequest) (*Empty, error) Extend(context.Context, *ExtendRequest) (*Empty, error) Update(context.Context, *UpdateRequest) (*Empty, error) @@ -220,7 +220,7 @@ func (UnimplementedWoodpeckerServer) Next(context.Context, *NextRequest) (*NextR func (UnimplementedWoodpeckerServer) Init(context.Context, *InitRequest) (*Empty, error) { return nil, status.Error(codes.Unimplemented, "method Init not implemented") } -func (UnimplementedWoodpeckerServer) Wait(context.Context, *WaitRequest) (*Empty, error) { +func (UnimplementedWoodpeckerServer) Wait(context.Context, *WaitRequest) (*WaitResponse, error) { return nil, status.Error(codes.Unimplemented, "method Wait not implemented") } func (UnimplementedWoodpeckerServer) Done(context.Context, *DoneRequest) (*Empty, error) { diff --git a/rpc/types.go b/rpc/types.go new file mode 100644 index 000000000..18dc6ca23 --- /dev/null +++ b/rpc/types.go @@ -0,0 +1,66 @@ +// Copyright 2025 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpc + +import ( + backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" +) + +type ( + // Filter defines filters for fetching items from the queue. + Filter struct { + Labels map[string]string `json:"labels"` + } + + // StepState defines the step state. + StepState struct { + StepUUID string `json:"step_uuid"` + Started int64 `json:"started"` + Finished int64 `json:"finished"` + Exited bool `json:"exited"` + ExitCode int `json:"exit_code"` + Error string `json:"error"` + Canceled bool `json:"canceled"` + } + + // WorkflowState defines the workflow state. + WorkflowState struct { + Started int64 `json:"started"` + Finished int64 `json:"finished"` + Error string `json:"error"` + Canceled bool `json:"canceled"` + } + + // Workflow defines the workflow execution details. + Workflow struct { + ID string `json:"id"` + Config *backend.Config `json:"config"` + Timeout int64 `json:"timeout"` + } + + Version struct { + GrpcVersion int32 `json:"grpc_version,omitempty"` + ServerVersion string `json:"server_version,omitempty"` + } + + // AgentInfo represents all the metadata that should be known about an agent. 
+ AgentInfo struct { + Version string `json:"version"` + Platform string `json:"platform"` + Backend string `json:"backend"` + Capacity int `json:"capacity"` + CustomLabels map[string]string `json:"custom_labels"` + } +) diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index d351d5c90..87c01a51d 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -40,24 +40,13 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo } // First cancel/evict workflows in the queue in one go - var ( - workflowsToCancel []string - workflowsToEvict []string - ) - for _, workflow := range workflows { - if workflow.State == model.StatusRunning { - workflowsToCancel = append(workflowsToCancel, fmt.Sprint(workflow.ID)) - } - if workflow.State == model.StatusPending { - workflowsToEvict = append(workflowsToEvict, fmt.Sprint(workflow.ID)) + var workflowsToCancel []string + for _, w := range workflows { + if w.State == model.StatusRunning || w.State == model.StatusPending { + workflowsToCancel = append(workflowsToCancel, fmt.Sprint(w.ID)) } } - if len(workflowsToEvict) != 0 { - if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToEvict, queue.ErrCancel); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToEvict) - } - } if len(workflowsToCancel) != 0 { if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil { log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index fabce7d86..a45c13616 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -16,28 +16,76 @@ package pipeline import ( - "go.woodpecker-ci.org/woodpecker/v3/pipeline" + "fmt" + "time" + + "github.com/rs/zerolog/log" + "go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/store" ) +// UpdateStepStatus updates step status based on agent reports via RPC. 
func UpdateStepStatus(store store.Store, step *model.Step, state rpc.StepState) error { - if state.Exited { - step.Finished = state.Finished - step.ExitCode = state.ExitCode - step.Error = state.Error - step.State = model.StatusSuccess - if state.ExitCode != 0 || state.Error != "" { - step.State = model.StatusFailure + log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) + + switch step.State { + case model.StatusPending: + // Transition from pending to running when started + if state.Finished == 0 { + step.State = model.StatusRunning } - if state.ExitCode == pipeline.ExitCodeKilled { - step.State = model.StatusKilled - } - } else if step.Finished == 0 { step.Started = state.Started - step.State = model.StatusRunning + if step.Started == 0 { + step.Started = time.Now().Unix() + } + + // Handle direct transition to finished if step setup error happened + if state.Exited || state.Error != "" { + step.Finished = state.Finished + if step.Finished == 0 { + step.Finished = time.Now().Unix() + } + step.ExitCode = state.ExitCode + step.Error = state.Error + + if state.ExitCode == 0 && state.Error == "" { + step.State = model.StatusSuccess + } else { + step.State = model.StatusFailure + } + } + + case model.StatusRunning: + // Already running, check if it finished + if state.Exited || state.Error != "" { + step.Finished = state.Finished + if step.Finished == 0 { + step.Finished = time.Now().Unix() + } + step.ExitCode = state.ExitCode + step.Error = state.Error + + if state.ExitCode == 0 && state.Error == "" { + step.State = model.StatusSuccess + } else { + step.State = model.StatusFailure + } + } + + default: + return fmt.Errorf("step has state %s and does not expect rpc state updates", step.State) } + + // Handle cancellation across both cases + if state.Canceled && step.State != model.StatusKilled { + step.State = model.StatusKilled + if step.Finished == 0 { + step.Finished = time.Now().Unix() + } + } + return store.StepUpdate(step) } diff --git a/server/pipeline/step_status_test.go b/server/pipeline/step_status_test.go index 08a122826..2a00aef87 100644 --- a/server/pipeline/step_status_test.go +++ b/server/pipeline/step_status_test.go @@ -1,5 +1,4 @@ // Copyright 2022 Woodpecker Authors -// Copyright 2019 mhmxs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
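// Editor's illustrative sketch (not part of the patch): the terminal-state rule that
// UpdateStepStatus above and the tests below exercise, reduced to a single helper.
// finalStepState is a hypothetical name; model.StatusValue is assumed to be the status
// type behind model.StatusSuccess/Failure/Killed from server/model.
func finalStepState(exitCode int, errMsg string, canceled bool) model.StatusValue {
	if canceled {
		return model.StatusKilled // cancellation always maps to killed
	}
	if exitCode == 0 && errMsg == "" {
		return model.StatusSuccess
	}
	return model.StatusFailure
}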
@@ -34,138 +33,224 @@ func mockStoreStep(t *testing.T) store.Store { return s } -func TestUpdateStepStatusNotExited(t *testing.T) { - t.Parallel() - // step in db before update - step := &model.Step{} - - // advertised step status - state := rpc.StepState{ - Started: int64(42), - Exited: false, - // Dummy data - Finished: int64(1), - ExitCode: pipeline.ExitCodeKilled, - Error: "not an error", - } - - err := UpdateStepStatus(mockStoreStep(t), step, state) - assert.NoError(t, err) - assert.EqualValues(t, model.StatusRunning, step.State) - assert.EqualValues(t, 42, step.Started) - assert.EqualValues(t, 0, step.Finished) - assert.EqualValues(t, 0, step.ExitCode) - assert.EqualValues(t, "", step.Error) -} - -func TestUpdateStepStatusNotExitedButStopped(t *testing.T) { +func TestUpdateStepStatus(t *testing.T) { t.Parallel() - // step in db before update - step := &model.Step{Started: 42, Finished: 64, State: model.StatusKilled} + t.Run("Pending", func(t *testing.T) { + t.Parallel() - // advertised step status - state := rpc.StepState{ - Exited: false, - // Dummy data - Finished: int64(1), - ExitCode: pipeline.ExitCodeKilled, - Error: "not an error", - } + t.Run("TransitionToRunning", func(t *testing.T) { + t.Parallel() - err := UpdateStepStatus(mockStoreStep(t), step, state) - assert.NoError(t, err) - assert.EqualValues(t, model.StatusKilled, step.State) - assert.EqualValues(t, 42, step.Started) - assert.EqualValues(t, 64, step.Finished) - assert.EqualValues(t, 0, step.ExitCode) - assert.EqualValues(t, "", step.Error) -} + t.Run("WithStartTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Started: 42, Finished: 0} -func TestUpdateStepStatusExited(t *testing.T) { - t.Parallel() + err := UpdateStepStatus(mockStoreStep(t), step, state) - // step in db before update - step := &model.Step{Started: 42} + assert.NoError(t, err) + assert.Equal(t, model.StatusRunning, step.State) + assert.Equal(t, int64(42), step.Started) + assert.Equal(t, int64(0), step.Finished) + }) - // advertised step status - state := rpc.StepState{ - Started: int64(42), - Exited: true, - Finished: int64(34), - ExitCode: pipeline.ExitCodeKilled, - Error: "an error", - } + t.Run("WithoutStartTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Started: 0, Finished: 0} - err := UpdateStepStatus(mockStoreStep(t), step, state) - assert.NoError(t, err) - assert.EqualValues(t, model.StatusKilled, step.State) - assert.EqualValues(t, 42, step.Started) - assert.EqualValues(t, 34, step.Finished) - assert.EqualValues(t, pipeline.ExitCodeKilled, step.ExitCode) - assert.EqualValues(t, "an error", step.Error) -} + err := UpdateStepStatus(mockStoreStep(t), step, state) -func TestUpdateStepStatusExitedButNot137(t *testing.T) { - t.Parallel() + assert.NoError(t, err) + assert.Equal(t, model.StatusRunning, step.State) + assert.Greater(t, step.Started, int64(0)) + }) + }) - // step in db before update - step := &model.Step{Started: 42} + t.Run("DirectToSuccess", func(t *testing.T) { + t.Parallel() - // advertised step status - state := rpc.StepState{ - Started: int64(42), - Exited: true, - Finished: int64(34), - Error: "an error", - } + t.Run("WithFinishTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Started: 42, Exited: true, Finished: 100, ExitCode: 0, Error: ""} - err := UpdateStepStatus(mockStoreStep(t), step, state) - assert.NoError(t, err) - 
assert.EqualValues(t, model.StatusFailure, step.State) - assert.EqualValues(t, 42, step.Started) - assert.EqualValues(t, 34, step.Finished) - assert.EqualValues(t, 0, step.ExitCode) - assert.EqualValues(t, "an error", step.Error) -} + err := UpdateStepStatus(mockStoreStep(t), step, state) -func TestUpdateStepStatusExitedWithCode(t *testing.T) { - t.Parallel() + assert.NoError(t, err) + assert.Equal(t, model.StatusSuccess, step.State) + assert.Equal(t, int64(42), step.Started) + assert.Equal(t, int64(100), step.Finished) + }) - // advertised step status - state := rpc.StepState{ - Started: int64(42), - Exited: true, - Finished: int64(34), - ExitCode: 1, - Error: "an error", - } - step := &model.Step{} - err := UpdateStepStatus(mockStoreStep(t), step, state) - assert.NoError(t, err) + t.Run("WithoutFinishTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Started: 42, Exited: true, Finished: 0, ExitCode: 0, Error: ""} - assert.Equal(t, model.StatusFailure, step.State) - assert.Equal(t, 1, step.ExitCode) + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusSuccess, step.State) + assert.Greater(t, step.Finished, int64(0)) + }) + }) + + t.Run("DirectToFailure", func(t *testing.T) { + t.Parallel() + + t.Run("WithExitCode", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Started: 42, Exited: true, Finished: 34, ExitCode: 1, Error: "an error"} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusFailure, step.State) + assert.Equal(t, 1, step.ExitCode) + assert.Equal(t, "an error", step.Error) + }) + }) + }) + + t.Run("Running", func(t *testing.T) { + t.Parallel() + + t.Run("ToSuccess", func(t *testing.T) { + t.Parallel() + + t.Run("WithFinishTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Exited: true, Finished: 100, ExitCode: 0, Error: ""} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusSuccess, step.State) + assert.Equal(t, int64(100), step.Finished) + }) + + t.Run("WithoutFinishTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Exited: true, Finished: 0, ExitCode: 0, Error: ""} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusSuccess, step.State) + assert.Greater(t, step.Finished, int64(0)) + }) + }) + + t.Run("ToFailure", func(t *testing.T) { + t.Parallel() + + t.Run("WithExitCode137", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Exited: true, Finished: 34, ExitCode: pipeline.ExitCodeKilled, Error: "an error"} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusFailure, step.State) + assert.Equal(t, int64(34), step.Finished) + assert.Equal(t, pipeline.ExitCodeKilled, step.ExitCode) + }) + + t.Run("WithError", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Exited: true, Finished: 34, ExitCode: 0, Error: "an error"} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusFailure, 
step.State) + assert.Equal(t, "an error", step.Error) + }) + }) + + t.Run("StillRunning", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Exited: false, Finished: 0} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusRunning, step.State) + assert.Equal(t, int64(0), step.Finished) + }) + }) + + t.Run("Canceled", func(t *testing.T) { + t.Parallel() + + t.Run("WithoutFinishTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Canceled: true} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusKilled, step.State) + assert.Greater(t, step.Finished, int64(0)) + }) + + t.Run("WithExitedAndFinishTime", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning, Started: 42} + state := rpc.StepState{Canceled: true, Exited: true, Finished: 100, ExitCode: 1, Error: "canceled"} + + err := UpdateStepStatus(mockStoreStep(t), step, state) + + assert.NoError(t, err) + assert.Equal(t, model.StatusKilled, step.State) + assert.Equal(t, int64(100), step.Finished) + assert.Equal(t, 1, step.ExitCode) + assert.Equal(t, "canceled", step.Error) + }) + }) + + t.Run("TerminalState", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusKilled, Started: 42, Finished: 64} + state := rpc.StepState{Exited: false} + + err := UpdateStepStatus(mocks.NewMockStore(t), step, state) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "does not expect rpc state updates") + assert.Equal(t, model.StatusKilled, step.State) + }) } func TestUpdateStepToStatusSkipped(t *testing.T) { t.Parallel() - step, _ := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{}, int64(1)) + t.Run("NotStarted", func(t *testing.T) { + t.Parallel() - assert.Equal(t, model.StatusSkipped, step.State) - assert.EqualValues(t, 0, step.Finished) -} - -func TestUpdateStepToStatusSkippedButStarted(t *testing.T) { - t.Parallel() - - step := &model.Step{ - Started: int64(42), - } - - step, _ = UpdateStepToStatusSkipped(mockStoreStep(t), *step, int64(1)) - - assert.Equal(t, model.StatusSuccess, step.State) - assert.EqualValues(t, 1, step.Finished) + step, err := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{}, int64(1)) + + assert.NoError(t, err) + assert.Equal(t, model.StatusSkipped, step.State) + assert.Equal(t, int64(0), step.Finished) + }) + + t.Run("AlreadyStarted", func(t *testing.T) { + t.Parallel() + + step, err := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{Started: 42}, int64(100)) + + assert.NoError(t, err) + assert.Equal(t, model.StatusSuccess, step.State) + assert.Equal(t, int64(100), step.Finished) + }) } diff --git a/server/pipeline/workflow_status.go b/server/pipeline/workflow_status.go index 3bf81d135..c65982e49 100644 --- a/server/pipeline/workflow_status.go +++ b/server/pipeline/workflow_status.go @@ -42,5 +42,8 @@ func UpdateWorkflowStatusToDone(store store.Store, workflow model.Workflow, stat if workflow.Error != "" { workflow.State = model.StatusFailure } + if state.Canceled { + workflow.State = model.StatusKilled + } return &workflow, store.WorkflowUpdate(&workflow) } diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 25acb25bf..3b370186e 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -157,6 +157,7 @@ func (q *fifo) finished(ids []string, exitStatus 
model.StatusValue, err error) e } // Wait waits until the item is done executing. +// Also signals via error ErrCancel if workflow got canceled. func (q *fifo) Wait(ctx context.Context, taskID string) error { q.Lock() state := q.running[taskID] diff --git a/server/queue/persistent.go b/server/queue/persistent.go index 2178389e7..0c18f9af1 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -17,11 +17,14 @@ package queue import ( "context" + "errors" + "fmt" "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/store" + "go.woodpecker-ci.org/woodpecker/v3/server/store/types" ) // WithTaskStore returns a queue that is backed by the TaskStore. This @@ -77,7 +80,14 @@ func (q *persistentQueue) Error(c context.Context, id string, err error) error { if err := q.Queue.Error(c, id, err); err != nil { return err } - return q.store.TaskDelete(id) + + if deleteErr := q.store.TaskDelete(id); deleteErr != nil { + if !errors.Is(deleteErr, types.RecordNotExist) { + return deleteErr + } + log.Debug().Msgf("task %s already removed from store", id) + } + return nil } // ErrorAtOnce signals multiple tasks are done and complete with an error. @@ -86,10 +96,16 @@ func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil { return err } + + var errs []error for _, id := range ids { - if err := q.store.TaskDelete(id); err != nil { - return err + if deleteErr := q.store.TaskDelete(id); deleteErr != nil && !errors.Is(deleteErr, types.RecordNotExist) { + errs = append(errs, fmt.Errorf("task id [%s]: %w", id, deleteErr)) } } + + if len(errs) != 0 { + return fmt.Errorf("failed to delete tasks from persistent store: %w", errors.Join(errs...)) + } return nil } diff --git a/server/queue/queue.go b/server/queue/queue.go index d924eddcf..77fa3ed8e 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -101,6 +101,7 @@ type Queue interface { ErrorAtOnce(c context.Context, ids []string, err error) error // Wait waits until the task is complete. + // Also signals via error ErrCancel if workflow got canceled. Wait(c context.Context, id string) error // Info returns internal queue information. diff --git a/server/rpc/authorizer.go b/server/rpc/authorizer.go index 545e8e98e..bdf21d432 100644 --- a/server/rpc/authorizer.go +++ b/server/rpc/authorizer.go @@ -12,6 +12,38 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package grpc provides gRPC server implementation with JWT-based authentication. +// +// # Authentication Flow +// +// Uses a two-token approach: +// +// 1. Agent Token (long-lived): Configured via WOODPECKER_AGENT_SECRET, used only for initial Auth() call +// 2. JWT Access Token (short-lived, 1 hour): Obtained from Auth(), included in metadata["token"] for all service calls +// +// # Interceptor Architecture +// +// Authorizer interceptors validate JWT tokens on every request: +// 1. Extract JWT from metadata["token"] +// 2. Verify signature and expiration +// 3. Extract and add agent_id to metadata for downstream handlers +// +// Auth endpoint (/proto.WoodpeckerAuth/Auth) bypasses validation to allow initial authentication. 
+// +// # Usage +// +// // Server setup +// jwtManager := NewJWTManager(c.String("grpc-secret")) +// authorizer := NewAuthorizer(jwtManager) +// grpcServer := grpc.NewServer( +// grpc.StreamInterceptor(authorizer.StreamInterceptor), +// grpc.UnaryInterceptor(authorizer.UnaryInterceptor), +// ) +// +// // Client usage +// resp, _ := authClient.Auth(ctx, &proto.AuthRequest{AgentToken: "secret", AgentId: -1}) +// ctx = metadata.AppendToOutgoingContext(ctx, "token", resp.AccessToken) +// workflow, _ := woodpeckerClient.Next(ctx, &proto.NextRequest{...}) package grpc import ( @@ -24,6 +56,7 @@ import ( "google.golang.org/grpc/status" ) +// StreamContextWrapper wraps gRPC ServerStream to allow context modification. type StreamContextWrapper interface { grpc.ServerStream SetContext(context.Context) @@ -50,14 +83,17 @@ func newStreamContextWrapper(inner grpc.ServerStream) StreamContextWrapper { } } +// Authorizer validates JWT tokens and enriches context with agent information. type Authorizer struct { jwtManager *JWTManager } +// NewAuthorizer creates a new JWT authorizer. func NewAuthorizer(jwtManager *JWTManager) *Authorizer { return &Authorizer{jwtManager: jwtManager} } +// StreamInterceptor validates JWT tokens for streaming gRPC calls. func (a *Authorizer) StreamInterceptor(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { _stream := newStreamContextWrapper(stream) @@ -71,7 +107,8 @@ func (a *Authorizer) StreamInterceptor(srv any, stream grpc.ServerStream, info * return handler(srv, _stream) } -func (a *Authorizer) UnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { +// UnaryInterceptor validates JWT tokens for unary gRPC calls. +func (a *Authorizer) UnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { newCtx, err := a.authorize(ctx, info.FullMethod) if err != nil { return nil, err @@ -79,6 +116,8 @@ func (a *Authorizer) UnaryInterceptor(ctx context.Context, req any, info *grpc.U return handler(newCtx, req) } +// authorize validates JWT and enriches context with agent_id metadata. +// Bypasses validation for /proto.WoodpeckerAuth/Auth endpoint. func (a *Authorizer) authorize(ctx context.Context, fullMethod string) (context.Context, error) { // bypass auth for token endpoint if fullMethod == "/proto.WoodpeckerAuth/Auth" { diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index c32693de1..5ff78e275 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -53,6 +53,7 @@ type RPC struct { } // Next blocks until it provides the next workflow to execute. +// TODO (6038): Server does not release waiting agents on graceful shutdown. func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) { if hostname, err := s.getHostnameFromContext(c); err == nil { log.Debug().Msgf("agent connected: %s: polling", hostname) @@ -100,18 +101,29 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er } } -// Wait blocks until the workflow with the given ID is done. -func (s *RPC) Wait(c context.Context, workflowID string) error { +// Wait blocks until the workflow with the given ID is completed or got canceled. +// Used to let agents wait for cancel signals from server side. 
+func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err error) {
 	agent, err := s.getAgentFromContext(c)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil {
-		return err
+		return false, err
 	}
 
-	return s.queue.Wait(c, workflowID)
+	if err := s.queue.Wait(c, workflowID); err != nil {
+		if errors.Is(err, queue.ErrCancel) {
+			// we explicitly sent a cancel signal
+			return true, nil
+		}
+		// an unknown error happened
+		return false, err
+	}
+
+	// workflow finished and no issues appeared
+	return false, nil
 }
 
 // Extend extends the lease for the workflow with the given ID.
@@ -133,7 +145,7 @@ func (s *RPC) Extend(c context.Context, workflowID string) error {
 	return s.queue.Extend(c, agent.ID, workflowID)
 }
 
-// Update updates the state of a step.
+// Update lets the agent update the step state on the server.
 func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepState) error {
 	workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
 	if err != nil {
@@ -213,7 +225,7 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat
 	return nil
 }
 
-// Init implements the rpc.Init function.
+// Init signals the workflow is initialized.
 func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
 	workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
 	if err != nil {
@@ -286,7 +298,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
 	return s.updateAgentLastWork(agent)
 }
 
-// Done marks the workflow with the given ID as done.
+// Done marks the workflow with the given ID as stopped.
 func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
 	workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
 	if err != nil {
@@ -331,20 +343,23 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
 		Str("pipeline_id", fmt.Sprint(currentPipeline.ID)).
 		Str("workflow_id", strWorkflowID).Logger()
 
-	logger.Trace().Msgf("gRPC Done with state: %#v", state)
+	logger.Debug().Msgf("workflow state in store: %#v", workflow)
+	logger.Debug().Msgf("gRPC Done with state: %#v", state)
 
 	if workflow, err = pipeline.UpdateWorkflowStatusToDone(s.store, *workflow, state); err != nil {
 		logger.Error().Err(err).Msgf("pipeline.UpdateWorkflowStatusToDone: cannot update workflow state: %s", err)
 	}
 
-	var queueErr error
-	if workflow.Failing() {
-		queueErr = s.queue.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error))
-	} else {
-		queueErr = s.queue.Done(c, strWorkflowID, workflow.State)
-	}
-	if queueErr != nil {
-		logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")
+	if !state.Canceled {
+		var queueErr error
+		if workflow.Failing() {
+			queueErr = s.queue.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error))
+		} else {
+			queueErr = s.queue.Done(c, strWorkflowID, workflow.State)
+		}
+		if queueErr != nil {
+			logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")
+		}
 	}
 
 	currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline)
diff --git a/server/rpc/server.go b/server/rpc/server.go
index b5414db62..c9115ede5 100644
--- a/server/rpc/server.go
+++ b/server/rpc/server.go
@@ -59,6 +59,7 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P
 	return &WoodpeckerServer{peer: peer}
 }
 
+// Version returns the server and gRPC version.
 func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) {
 	return &proto.VersionResponse{
 		GrpcVersion: proto.Version,
@@ -66,6 +67,7 @@ func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.Ve
 	}, nil
 }
 
+// Next blocks until it provides the next workflow to execute from the queue.
 func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*proto.NextResponse, error) {
 	filter := rpc.Filter{
 		Labels: req.GetFilter().GetLabels(),
@@ -85,6 +87,7 @@ func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*pro
 	return res, err
 }
 
+// Init lets the agent signal to the server that the workflow is initialized.
 func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*proto.Empty, error) {
 	state := rpc.WorkflowState{
 		Started: req.GetState().GetStarted(),
@@ -96,6 +99,7 @@ func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*pro
 	return res, err
 }
 
+// Update lets the agent update the step state on the server.
 func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
 	state := rpc.StepState{
 		StepUUID: req.GetState().GetStepUuid(),
@@ -104,29 +108,36 @@ func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (
 		Exited:   req.GetState().GetExited(),
 		Error:    req.GetState().GetError(),
 		ExitCode: int(req.GetState().GetExitCode()),
+		Canceled: req.GetState().GetCanceled(),
 	}
 	res := new(proto.Empty)
 	err := s.peer.Update(c, req.GetId(), state)
 	return res, err
 }
 
+// Done lets the agent signal to the server that the workflow has stopped.
 func (s *WoodpeckerServer) Done(c context.Context, req *proto.DoneRequest) (*proto.Empty, error) {
 	state := rpc.WorkflowState{
 		Started:  req.GetState().GetStarted(),
 		Finished: req.GetState().GetFinished(),
 		Error:    req.GetState().GetError(),
+		Canceled: req.GetState().GetCanceled(),
 	}
 	res := new(proto.Empty)
 	err := s.peer.Done(c, req.GetId(), state)
 	return res, err
 }
 
-func (s *WoodpeckerServer) Wait(c context.Context, req *proto.WaitRequest) (*proto.Empty, error) {
-	res := new(proto.Empty)
-	err := s.peer.Wait(c, req.GetId())
+// Wait blocks until the workflow is complete.
+// Also reports whether the workflow was canceled.
+func (s *WoodpeckerServer) Wait(c context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) {
+	res := new(proto.WaitResponse)
+	canceled, err := s.peer.Wait(c, req.GetId())
+	res.Canceled = canceled
 	return res, err
 }
 
+// Extend extends the workflow deadline.
 func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (*proto.Empty, error) {
 	res := new(proto.Empty)
 	err := s.peer.Extend(c, req.GetId())
@@ -170,6 +181,7 @@ func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto
 	return res, err
 }
 
+// RegisterAgent registers the agent with the server.
 func (s *WoodpeckerServer) RegisterAgent(c context.Context, req *proto.RegisterAgentRequest) (*proto.RegisterAgentResponse, error) {
 	res := new(proto.RegisterAgentResponse)
 	agentInfo := req.GetInfo()
@@ -184,11 +196,13 @@ func (s *WoodpeckerServer) RegisterAgent(c context.Context, req *proto.RegisterA
 	return res, err
 }
 
+// UnregisterAgent unregisters the agent from the server.
 func (s *WoodpeckerServer) UnregisterAgent(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
 	err := s.peer.UnregisterAgent(ctx)
 	return new(proto.Empty), err
 }
 
+// ReportHealth reports the health status of the agent to the server.
 func (s *WoodpeckerServer) ReportHealth(c context.Context, req *proto.ReportHealthRequest) (*proto.Empty, error) {
 	res := new(proto.Empty)
 	err := s.peer.ReportHealth(c, req.GetStatus())
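
The rewritten UpdateStepStatus above is easiest to read as a small state machine: pending steps either start running or jump straight to a terminal state on setup errors, running steps finish as success or failure once exited, and a cancel always ends in killed. A minimal standalone sketch of those same transition rules follows; step, stepState, and applyUpdate are illustrative stand-ins, not the actual model and rpc types.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for model.StatusValue and rpc.StepState, for illustration only.
type status string

const (
	statusPending status = "pending"
	statusRunning status = "running"
	statusSuccess status = "success"
	statusFailure status = "failure"
	statusKilled  status = "killed"
)

type stepState struct {
	Started, Finished int64
	Exited            bool
	ExitCode          int
	Error             string
	Canceled          bool
}

type step struct {
	State             status
	Started, Finished int64
	ExitCode          int
	Error             string
}

// applyUpdate mirrors the transition rules of the new UpdateStepStatus.
func applyUpdate(s *step, in stepState) error {
	finish := func() {
		s.Finished = in.Finished
		if s.Finished == 0 {
			s.Finished = time.Now().Unix()
		}
		s.ExitCode, s.Error = in.ExitCode, in.Error
		if in.ExitCode == 0 && in.Error == "" {
			s.State = statusSuccess
		} else {
			s.State = statusFailure
		}
	}

	switch s.State {
	case statusPending:
		if in.Finished == 0 {
			s.State = statusRunning
		}
		s.Started = in.Started
		if s.Started == 0 {
			s.Started = time.Now().Unix()
		}
		if in.Exited || in.Error != "" { // setup error: skip straight to a terminal state
			finish()
		}
	case statusRunning:
		if in.Exited || in.Error != "" {
			finish()
		}
	default:
		return fmt.Errorf("step has state %s and does not expect rpc state updates", s.State)
	}

	// A cancel always wins: the step ends up killed with a finish timestamp.
	if in.Canceled && s.State != statusKilled {
		s.State = statusKilled
		if s.Finished == 0 {
			s.Finished = time.Now().Unix()
		}
	}
	return nil
}

func main() {
	st := &step{State: statusPending}
	_ = applyUpdate(st, stepState{Started: 42})                  // pending -> running
	_ = applyUpdate(st, stepState{Exited: true, Canceled: true}) // running -> killed
	fmt.Println(st.State)
}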
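
The persistent queue's ErrorAtOnce now collects per-task delete failures instead of stopping at the first one, ignoring records that are already gone. A minimal sketch of that aggregation pattern, assuming a hypothetical del callback in place of store.TaskDelete and a local errNotExist in place of types.RecordNotExist:

package main

import (
	"errors"
	"fmt"
)

// errNotExist stands in for the store's "record does not exist" sentinel.
var errNotExist = errors.New("record does not exist")

// deleteTasks deletes every id, ignores already-missing records, and
// reports all remaining failures in a single joined error.
func deleteTasks(ids []string, del func(id string) error) error {
	var errs []error
	for _, id := range ids {
		if err := del(id); err != nil && !errors.Is(err, errNotExist) {
			errs = append(errs, fmt.Errorf("task id [%s]: %w", id, err))
		}
	}
	if len(errs) != 0 {
		return fmt.Errorf("failed to delete tasks from persistent store: %w", errors.Join(errs...))
	}
	return nil
}

func main() {
	del := func(id string) error {
		switch id {
		case "1":
			return errNotExist // already gone: ignored
		case "2":
			return errors.New("db locked") // real failure: collected
		default:
			return nil
		}
	}
	fmt.Println(deleteTasks([]string{"1", "2", "3"}, del))
}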