sanitize agent should allow done updates to canceled pipelines (#6394)

This commit is contained in:
6543
2026-04-08 17:23:33 +02:00
committed by GitHub
parent a68860058c
commit 7efe8bf510
6 changed files with 394 additions and 605 deletions

View File

@@ -28,9 +28,8 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/store"
)
// UpdateStepStatus updates step status based on agent reports via RPC.
func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error {
log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state)
func CalcStepStatus(step model.Step, state rpc.StepState) (_ *model.Step, cancelPipelineFromStep bool, _ error) {
log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", step, state)
switch step.State {
case model.StatusPending:
@@ -41,7 +40,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
if state.Finished != 0 {
step.Finished = state.Finished
}
return store.StepUpdate(step)
return &step, false, nil
}
// Transition from pending to running when started
@@ -68,10 +67,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
step.State = model.StatusFailure
if step.Failure == model.FailureCancel {
err := cancelPipelineFromStep(ctx, store, step)
if err != nil {
return err
}
cancelPipelineFromStep = true
}
}
}
@@ -92,16 +88,13 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
step.State = model.StatusFailure
if step.Failure == model.FailureCancel {
err := cancelPipelineFromStep(ctx, store, step)
if err != nil {
return err
}
cancelPipelineFromStep = true
}
}
}
default:
return fmt.Errorf("step has state %s and does not expect rpc state updates", step.State)
return nil, false, fmt.Errorf("step has state %s and does not expect rpc state updates", step.State)
}
// Handle cancellation across both cases
@@ -112,6 +105,24 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
}
}
return &step, cancelPipelineFromStep, nil
}
// UpdateStepStatus updates step status based on agent reports via RPC.
func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error {
log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state)
updatedStep, shouldCancelPipelineFromStep, err := CalcStepStatus(*step, state)
if err != nil {
return err
}
*step = *updatedStep // update step for external callers
if shouldCancelPipelineFromStep {
if err := cancelPipelineFromStep(ctx, store, step); err != nil {
return err
}
}
return store.StepUpdate(step)
}

View File

@@ -17,14 +17,10 @@ package rpc
import "errors"
var (
ErrAgentIllegalPipelineWorkflowReRunStateChange = errors.New("workflow has parent pipeline marked as finished")
ErrAgentIllegalPipelineWorkflowRun = errors.New("workflow has parent pipeline in blocked state")
ErrAgentIllegalWorkflowReRunStateChange = errors.New("workflow was already marked as finished")
ErrAgentIllegalWorkflowRun = errors.New("workflow is currently in blocked state")
ErrAgentIllegalStepReRunStateChange = errors.New("step was already marked as finished")
ErrAgentIllegalStepRun = errors.New("step is currently in blocked state")
ErrAgentIllegalLogStreaming = errors.New("agent can not append logs to a step that is marked not running")
ErrAgentIllegalRepo = errors.New("agent is not allowed to interact with repo")
)

View File

@@ -199,11 +199,8 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat
return err
}
// sanitize agent input
if err := checkPipelineState(currentPipeline); err != nil {
return err
}
if err := checkWorkflowStepStates(workflow, step); err != nil {
// sanitize agent input: only allow step updates that the workflow state permits
if err := checkWorkflowAllowsStepUpdate(workflow.State, step, state); err != nil {
return err
}
@@ -260,11 +257,8 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}
// sanitize agent input
if err := checkPipelineState(currentPipeline); err != nil {
return err
}
if err := checkWorkflowStepStates(workflow, nil); err != nil {
// check workflow's own state to prevent re-initializing a finished or blocked workflow
if err := checkWorkflowState(workflow.State); err != nil {
return err
}
@@ -293,7 +287,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return s.updateAgentLastWork(agent)
}
// Done marks the workflow with the given ID as stope.
// Done marks the workflow with the given ID as stopped.
func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil {
@@ -333,11 +327,8 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}
// sanitize agent input
if err := checkPipelineState(currentPipeline); err != nil {
return err
}
if err := checkWorkflowStepStates(workflow, nil); err != nil {
// check workflow's own state to prevent finishing an already-finished or blocked workflow
if err := checkWorkflowState(workflow.State); err != nil {
return err
}

View File

@@ -144,11 +144,18 @@ func TestRPCUpdate(t *testing.T) {
assert.NoError(t, err)
})
t.Run("reject pipeline already succeeded", func(t *testing.T) {
t.Run("allow terminal step update when workflow already finished", func(t *testing.T) {
// When the workflow is already finished, a step update that moves the
// step to a terminal state (e.g. reporting exit code) should be allowed.
mockStore := store_mocks.NewMockStore(t)
mockLogStore := log_mocks.NewMockService(t)
origLogStore := server.Config.Services.LogStore
server.Config.Services.LogStore = mockLogStore
t.Cleanup(func() { server.Config.Services.LogStore = origLogStore })
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusSuccess)
workflow := defaultWorkflow(model.StatusRunning)
pipeline := defaultPipeline(model.StatusRunning)
workflow := defaultWorkflow(model.StatusSuccess) // finished
step := defaultStep(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
@@ -156,55 +163,25 @@ func TestRPCUpdate(t *testing.T) {
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
mockStore.On("StepUpdate", mock.Anything).Return(nil)
mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil)
mockLogStore.On("StepFinished", mock.Anything).Return()
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange)
// Step reports exit → it will transition to success/failure (terminal)
err := rpcInst.Update(ctx, "30", rpc.StepState{
StepUUID: "step-uuid-123",
Exited: true,
ExitCode: 0,
})
assert.NoError(t, err)
})
t.Run("reject pipeline already failed", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusFailure)
workflow := defaultWorkflow(model.StatusRunning)
step := defaultStep(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange)
})
t.Run("reject pipeline blocked", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusBlocked)
workflow := defaultWorkflow(model.StatusRunning)
step := defaultStep(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun)
})
t.Run("reject workflow already finished", func(t *testing.T) {
t.Run("reject non-terminal step update when workflow already finished", func(t *testing.T) {
// When the workflow is already finished, a step update that would keep
// the step in a non-terminal state (e.g. just started, no exit) is rejected.
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusRunning)
@@ -220,16 +197,17 @@ func TestRPCUpdate(t *testing.T) {
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
// Step reports started but not exited → still running (non-terminal)
err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"})
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange)
})
t.Run("reject step already finished", func(t *testing.T) {
t.Run("reject step update when workflow blocked", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusRunning)
workflow := defaultWorkflow(model.StatusRunning)
step := defaultStep(model.StatusSuccess) // finished
pipeline := defaultPipeline(model.StatusBlocked)
workflow := defaultWorkflow(model.StatusBlocked)
step := defaultStep(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
@@ -241,7 +219,7 @@ func TestRPCUpdate(t *testing.T) {
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"})
assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange)
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange)
})
t.Run("reject step belongs to different pipeline", func(t *testing.T) {
@@ -400,42 +378,6 @@ func TestRPCInit(t *testing.T) {
assert.NoError(t, err)
})
t.Run("reject pipeline already succeeded", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusSuccess)
workflow := defaultWorkflow(model.StatusPending)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange)
})
t.Run("reject pipeline blocked", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusBlocked)
workflow := defaultWorkflow(model.StatusPending)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun)
})
t.Run("reject workflow already finished", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
@@ -536,63 +478,6 @@ func TestRPCDone(t *testing.T) {
assert.NoError(t, err)
})
t.Run("reject pipeline already succeeded", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusSuccess)
workflow := defaultWorkflow(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange)
})
t.Run("reject pipeline killed", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusKilled)
workflow := defaultWorkflow(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange)
})
t.Run("reject pipeline blocked", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusBlocked)
workflow := defaultWorkflow(model.StatusRunning)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200})
assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun)
})
t.Run("reject workflow already finished", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
@@ -612,6 +497,25 @@ func TestRPCDone(t *testing.T) {
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange)
})
t.Run("reject workflow blocked", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := defaultPipeline(model.StatusRunning)
workflow := defaultWorkflow(model.StatusBlocked)
mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil)
mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil)
mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil)
mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil)
mockStore.On("AgentFind", int64(1)).Return(agent, nil)
rpcInst := newTestRPC(t, mockStore)
ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1"))
err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200})
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun)
})
t.Run("reject agent wrong org", func(t *testing.T) {
mockStore := store_mocks.NewMockStore(t)
agent := orgAgent999()
@@ -769,10 +673,6 @@ func TestRPCLog(t *testing.T) {
})
t.Run("reject: pipeline finished stale and step not running", func(t *testing.T) {
// This replaces the old "reject pipeline already finished" test.
// Previously the rejection came from checkPipelineState returning
// ErrAgentIllegalPipelineWorkflowReRunStateChange.
// Now it comes from allowAppendingLogs returning ErrAgentIllegalLogStreaming.
mockStore := store_mocks.NewMockStore(t)
agent := defaultAgent()
pipeline := stalePipeline(model.StatusSuccess)
@@ -792,9 +692,6 @@ func TestRPCLog(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "can not alter logs")
assert.ErrorIs(t, err, ErrAgentIllegalLogStreaming)
// The old error is no longer returned from Log() — allowAppendingLogs
// now handles the pipeline-finished case itself.
assert.False(t, errors.Is(err, ErrAgentIllegalPipelineWorkflowReRunStateChange))
})
t.Run("reject: pipeline failed stale and step not running", func(t *testing.T) {

View File

@@ -16,21 +16,22 @@ package rpc
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v3/rpc"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
)
const logStreamDelayAllowed = 5 * time.Minute
func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, pipeline *model.Pipeline, repo *model.Repo) error {
func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, p *model.Pipeline, repo *model.Repo) error {
var err error
if repo == nil && pipeline == nil {
if repo == nil && p == nil {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil {
return err
@@ -42,7 +43,7 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age
return err
}
pipeline, err = s.store.GetPipeline(workflow.PipelineID)
p, err = s.store.GetPipeline(workflow.PipelineID)
if err != nil {
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
return err
@@ -50,9 +51,9 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age
}
if repo == nil {
repo, err = s.store.GetRepo(pipeline.RepoID)
repo, err = s.store.GetRepo(p.RepoID)
if err != nil {
log.Error().Err(err).Msgf("cannot find repo with id %d", pipeline.RepoID)
log.Error().Err(err).Msgf("cannot find repo with id %d", p.RepoID)
return err
}
}
@@ -61,65 +62,78 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age
return nil
}
msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo[%d] '%s'", agent.ID, repo.ID, repo.FullName)
log.Error().Int64("repoId", repo.ID).Msg(msg)
return errors.New(msg)
log.Error().Err(ErrAgentIllegalRepo).Int64("agentID", agent.ID).Int64("repoId", repo.ID).Send()
return fmt.Errorf("%w: agentId=%d repoID=%d", ErrAgentIllegalRepo, agent.ID, repo.ID)
}
// checkPipelineState checks if an agent is allowed to change/update a workflow/pipeline state
// by the state the parent pipeline is in.
func checkPipelineState(currPipeline *model.Pipeline) (err error) {
// check if pipeline was already run and marked finished or is blocked
switch currPipeline.Status {
// isActiveState returns true for states where work is in progress or not yet started.
func isActiveState(state model.StatusValue) bool {
switch state {
case model.StatusCreated,
model.StatusPending,
model.StatusRunning:
break
case model.StatusBlocked:
err = ErrAgentIllegalPipelineWorkflowRun
return true
default:
err = ErrAgentIllegalPipelineWorkflowReRunStateChange
return false
}
if err != nil {
log.Error().Err(err).Msg("caught agent performing illegal instruction")
}
return err
}
// checkWorkflowStepStates checks if a workflow/step state or its logs can be altered
// depending on what state the workflow and step currently is in.
func checkWorkflowStepStates(currWorkflow *model.Workflow, currStep *model.Step) (err error) {
if currWorkflow != nil {
switch currWorkflow.State {
case model.StatusCreated,
model.StatusPending,
model.StatusRunning:
break
// isDoneState returns true for terminal states where no further work will happen.
func isDoneState(state model.StatusValue) bool {
switch state {
case model.StatusSuccess,
model.StatusFailure,
model.StatusKilled,
model.StatusCanceled,
model.StatusSkipped,
model.StatusError,
model.StatusDeclined:
return true
default:
return false
}
}
case model.StatusBlocked:
err = ErrAgentIllegalWorkflowRun
default:
err = ErrAgentIllegalWorkflowReRunStateChange
}
// checkWorkflowAllowsStepUpdate validates whether the workflow state permits
// the given step state update. If the workflow is active (created/pending/running),
// any step update is allowed. If the workflow is in a terminal state, only
// updates that would move the step into a terminal state are permitted — this
// lets the agent report final results for steps that completed after the
// workflow was already marked done.
func checkWorkflowAllowsStepUpdate(workflowState model.StatusValue, step *model.Step, state rpc.StepState) error {
if isActiveState(workflowState) {
return nil
}
if currStep != nil {
switch currStep.State {
case model.StatusCreated,
model.StatusPending,
model.StatusRunning:
break
newStep, _, err := pipeline.CalcStepStatus(*step, state)
if err != nil {
return err
}
if isDoneState(newStep.State) {
return nil
}
case model.StatusBlocked:
err = errors.Join(err, ErrAgentIllegalStepRun)
retErr := ErrAgentIllegalWorkflowReRunStateChange
log.Error().Err(retErr).Msg("caught agent performing illegal instruction")
return retErr
}
default:
err = errors.Join(err, ErrAgentIllegalStepReRunStateChange)
}
// checkWorkflowState checks if a workflow's own state allows it to be
// initialized or marked as done. A workflow that is already in a terminal
// state (success, failure, killed, …) must not be re-run, and a blocked
// workflow must not be started by an agent.
func checkWorkflowState(state model.StatusValue) (err error) {
switch state {
case model.StatusCreated,
model.StatusPending,
model.StatusRunning:
return nil
case model.StatusBlocked:
err = ErrAgentIllegalWorkflowRun
default:
err = ErrAgentIllegalWorkflowReRunStateChange
}
if err != nil {

View File

@@ -15,419 +15,299 @@
package rpc
import (
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.woodpecker-ci.org/woodpecker/v3/rpc"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
)
func TestCheckPipelineState(t *testing.T) {
func TestCheckWorkflowAllowsStepUpdate(t *testing.T) {
t.Parallel()
tests := []struct {
name string
status model.StatusValue
wantErr error
expectNil bool
}{
{
name: "created is allowed",
status: model.StatusCreated,
expectNil: true,
},
{
name: "pending is allowed",
status: model.StatusPending,
expectNil: true,
},
{
name: "running is allowed",
status: model.StatusRunning,
expectNil: true,
},
{
name: "blocked is rejected",
status: model.StatusBlocked,
wantErr: ErrAgentIllegalPipelineWorkflowRun,
},
{
name: "success is rejected as re-run",
status: model.StatusSuccess,
wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange,
},
{
name: "failure is rejected as re-run",
status: model.StatusFailure,
wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange,
},
{
name: "killed is rejected as re-run",
status: model.StatusKilled,
wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange,
},
{
name: "error is rejected as re-run",
status: model.StatusError,
wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange,
},
{
name: "skipped is rejected as re-run",
status: model.StatusSkipped,
wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange,
},
{
name: "declined is rejected as re-run",
status: model.StatusDeclined,
wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange,
},
t.Run("workflow running allows any step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
// Non-terminal update (step stays running)
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusRunning, step, rpc.StepState{}))
})
t.Run("workflow pending allows any step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusPending}
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusPending, step, rpc.StepState{}))
})
t.Run("workflow created allows any step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusPending}
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusCreated, step, rpc.StepState{}))
})
t.Run("workflow finished allows terminal step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
// Step exits with code 0 → CalcStepStatus produces StatusSuccess (terminal)
state := rpc.StepState{Exited: true, ExitCode: 0}
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state))
})
t.Run("workflow finished allows failed step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
state := rpc.StepState{Exited: true, ExitCode: 1}
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusFailure, step, state))
})
t.Run("workflow finished allows canceled step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
state := rpc.StepState{Canceled: true}
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusKilled, step, state))
})
t.Run("workflow finished allows skipped step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusPending}
state := rpc.StepState{Skipped: true}
assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state))
})
t.Run("workflow finished rejects non-terminal step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
// No exit, no cancel → step stays Running (non-terminal)
state := rpc.StepState{}
assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state), ErrAgentIllegalWorkflowReRunStateChange)
})
t.Run("workflow killed rejects non-terminal step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
state := rpc.StepState{}
assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusKilled, step, state), ErrAgentIllegalWorkflowReRunStateChange)
})
t.Run("workflow blocked rejects non-terminal step update", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusRunning}
state := rpc.StepState{}
assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusBlocked, step, state), ErrAgentIllegalWorkflowReRunStateChange)
})
t.Run("workflow finished rejects pending-to-running transition", func(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusPending}
// No skip, no exit → CalcStepStatus produces Running (non-terminal)
state := rpc.StepState{Started: 100}
assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state), ErrAgentIllegalWorkflowReRunStateChange)
})
}
func TestCheckWorkflowState(t *testing.T) {
t.Parallel()
t.Run("allowed states", func(t *testing.T) {
t.Parallel()
for _, s := range []model.StatusValue{
model.StatusCreated,
model.StatusPending,
model.StatusRunning,
} {
t.Run(string(s), func(t *testing.T) {
t.Parallel()
assert.NoError(t, checkWorkflowState(s))
})
}
})
t.Run("blocked rejects", func(t *testing.T) {
t.Parallel()
assert.ErrorIs(t, checkWorkflowState(model.StatusBlocked), ErrAgentIllegalWorkflowRun)
})
t.Run("terminal states reject", func(t *testing.T) {
t.Parallel()
for _, s := range []model.StatusValue{
model.StatusSuccess,
model.StatusFailure,
model.StatusKilled,
model.StatusError,
model.StatusSkipped,
model.StatusCanceled,
model.StatusDeclined,
} {
t.Run(string(s), func(t *testing.T) {
t.Parallel()
assert.ErrorIs(t, checkWorkflowState(s), ErrAgentIllegalWorkflowReRunStateChange)
})
}
})
}
func TestIsActiveState(t *testing.T) {
t.Parallel()
active := []model.StatusValue{model.StatusCreated, model.StatusPending, model.StatusRunning}
inactive := []model.StatusValue{
model.StatusSuccess, model.StatusFailure, model.StatusKilled,
model.StatusBlocked, model.StatusCanceled, model.StatusSkipped,
model.StatusError, model.StatusDeclined,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, s := range active {
t.Run(fmt.Sprintf("%s is active", s), func(t *testing.T) {
t.Parallel()
pipeline := &model.Pipeline{Status: tt.status}
err := checkPipelineState(pipeline)
if tt.expectNil {
assert.NoError(t, err)
} else {
assert.ErrorIs(t, err, tt.wantErr)
}
assert.True(t, isActiveState(s))
})
}
for _, s := range inactive {
t.Run(fmt.Sprintf("%s is not active", s), func(t *testing.T) {
t.Parallel()
assert.False(t, isActiveState(s))
})
}
}
func TestCheckWorkflowStepStates(t *testing.T) {
func TestIsDoneState(t *testing.T) {
t.Parallel()
t.Run("workflow only", func(t *testing.T) {
t.Parallel()
done := []model.StatusValue{
model.StatusSuccess, model.StatusFailure, model.StatusKilled,
model.StatusCanceled, model.StatusSkipped, model.StatusError,
model.StatusDeclined,
}
notDone := []model.StatusValue{
model.StatusCreated, model.StatusPending, model.StatusRunning,
model.StatusBlocked,
}
tests := []struct {
name string
state model.StatusValue
wantErr error
}{
{"created allows", model.StatusCreated, nil},
{"pending allows", model.StatusPending, nil},
{"running allows", model.StatusRunning, nil},
{"blocked rejects", model.StatusBlocked, ErrAgentIllegalWorkflowRun},
{"success rejects", model.StatusSuccess, ErrAgentIllegalWorkflowReRunStateChange},
{"failure rejects", model.StatusFailure, ErrAgentIllegalWorkflowReRunStateChange},
{"killed rejects", model.StatusKilled, ErrAgentIllegalWorkflowReRunStateChange},
{"error rejects", model.StatusError, ErrAgentIllegalWorkflowReRunStateChange},
{"skipped rejects", model.StatusSkipped, ErrAgentIllegalWorkflowReRunStateChange},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: tt.state}
err := checkWorkflowStepStates(workflow, nil)
if tt.wantErr == nil {
assert.NoError(t, err)
} else {
assert.ErrorIs(t, err, tt.wantErr)
}
})
}
})
t.Run("step only (nil workflow)", func(t *testing.T) {
t.Parallel()
tests := []struct {
name string
state model.StatusValue
wantErr error
}{
{"created allows", model.StatusCreated, nil},
{"pending allows", model.StatusPending, nil},
{"running allows", model.StatusRunning, nil},
{"blocked rejects", model.StatusBlocked, ErrAgentIllegalStepRun},
{"success rejects", model.StatusSuccess, ErrAgentIllegalStepReRunStateChange},
{"failure rejects", model.StatusFailure, ErrAgentIllegalStepReRunStateChange},
{"killed rejects", model.StatusKilled, ErrAgentIllegalStepReRunStateChange},
{"error rejects", model.StatusError, ErrAgentIllegalStepReRunStateChange},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
step := &model.Step{State: tt.state}
err := checkWorkflowStepStates(nil, step)
if tt.wantErr == nil {
assert.NoError(t, err)
} else {
assert.ErrorIs(t, err, tt.wantErr)
}
})
}
})
t.Run("nil workflow and nil step", func(t *testing.T) {
t.Parallel()
assert.NoError(t, checkWorkflowStepStates(nil, nil))
})
t.Run("workflow running, step running", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusRunning}
step := &model.Step{State: model.StatusRunning}
assert.NoError(t, checkWorkflowStepStates(workflow, step))
})
t.Run("workflow running, step finished", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusRunning}
step := &model.Step{State: model.StatusSuccess}
err := checkWorkflowStepStates(workflow, step)
assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange)
// should not contain workflow error
assert.False(t, errors.Is(err, ErrAgentIllegalWorkflowReRunStateChange))
})
t.Run("workflow running, step blocked", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusRunning}
step := &model.Step{State: model.StatusBlocked}
err := checkWorkflowStepStates(workflow, step)
assert.ErrorIs(t, err, ErrAgentIllegalStepRun)
})
t.Run("both finished - joined errors", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusSuccess}
step := &model.Step{State: model.StatusSuccess}
err := checkWorkflowStepStates(workflow, step)
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange)
assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange)
})
t.Run("both blocked - joined errors", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusBlocked}
step := &model.Step{State: model.StatusBlocked}
err := checkWorkflowStepStates(workflow, step)
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun)
assert.ErrorIs(t, err, ErrAgentIllegalStepRun)
})
t.Run("workflow finished, step blocked - joined errors", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusKilled}
step := &model.Step{State: model.StatusBlocked}
err := checkWorkflowStepStates(workflow, step)
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange)
assert.ErrorIs(t, err, ErrAgentIllegalStepRun)
})
t.Run("workflow finished (failure), step finished (failure) - joined errors", func(t *testing.T) {
t.Parallel()
workflow := &model.Workflow{State: model.StatusFailure}
step := &model.Step{State: model.StatusFailure}
err := checkWorkflowStepStates(workflow, step)
assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange)
assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange)
})
for _, s := range done {
t.Run(fmt.Sprintf("%s is done", s), func(t *testing.T) {
t.Parallel()
assert.True(t, isDoneState(s))
})
}
for _, s := range notDone {
t.Run(fmt.Sprintf("%s is not done", s), func(t *testing.T) {
t.Parallel()
assert.False(t, isDoneState(s))
})
}
}
// AllowAppendingLogs — updated for the new (pipeline, step) signature
//
// New logic:
// Allow if step.State == Running (step is actively running)
// Allow if pipeline.Status == Running (pipeline still running, step may
// have just finished but pipeline hasn't caught up yet)
// Allow if pipeline.Finished is within the last logStreamDelayAllowed
// (drain window after a server restart / network blip)
// Reject otherwise.
func TestAllowAppendingLogs(t *testing.T) {
t.Parallel()
// recentFinish is a pipeline.Finished timestamp just 30 seconds ago —
// well within the 5-minute drain window.
recentFinish := time.Now().Add(-30 * time.Second).Unix()
// staleFinish is a pipeline.Finished timestamp 10 minutes ago —
// outside the drain window.
staleFinish := time.Now().Add(-10 * time.Minute).Unix()
tests := []struct {
name string
pipelineStatus model.StatusValue
pipelineFinish int64
stepState model.StatusValue
wantErr error
}{
// --- step is running: always allowed regardless of pipeline state ----
{
name: "step running, pipeline running → allow",
pipelineStatus: model.StatusRunning,
stepState: model.StatusRunning,
},
{
name: "step running, pipeline success → allow (step takes priority)",
pipelineStatus: model.StatusSuccess,
pipelineFinish: staleFinish,
stepState: model.StatusRunning,
},
{
name: "step running, pipeline failure → allow",
pipelineStatus: model.StatusFailure,
pipelineFinish: staleFinish,
stepState: model.StatusRunning,
},
{
name: "step running, pipeline killed → allow",
pipelineStatus: model.StatusKilled,
pipelineFinish: staleFinish,
stepState: model.StatusRunning,
},
// Step running always allows logs, regardless of pipeline state or age.
t.Run("step running always allowed", func(t *testing.T) {
t.Parallel()
// --- pipeline still running: allow even if step finished ------------
{
name: "step success, pipeline still running → allow",
pipelineStatus: model.StatusRunning,
stepState: model.StatusSuccess,
},
{
name: "step failure, pipeline still running → allow",
pipelineStatus: model.StatusRunning,
stepState: model.StatusFailure,
},
{
name: "step pending, pipeline still running → allow",
pipelineStatus: model.StatusRunning,
stepState: model.StatusPending,
},
{
name: "step killed, pipeline still running → allow",
pipelineStatus: model.StatusRunning,
stepState: model.StatusKilled,
},
for _, tc := range []struct {
name string
status model.StatusValue
finish int64
}{
{"pipeline running", model.StatusRunning, 0},
{"pipeline success stale", model.StatusSuccess, staleFinish},
{"pipeline failure stale", model.StatusFailure, staleFinish},
{"pipeline killed stale", model.StatusKilled, staleFinish},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
p := &model.Pipeline{Status: tc.status, Finished: tc.finish}
assert.NoError(t, allowAppendingLogs(p, &model.Step{State: model.StatusRunning}))
})
}
})
// --- pipeline finished recently: drain window allows logs -----------
{
name: "step success, pipeline finished recently → allow (drain window)",
pipelineStatus: model.StatusSuccess,
pipelineFinish: recentFinish,
stepState: model.StatusSuccess,
},
{
name: "step failure, pipeline failed recently → allow (drain window)",
pipelineStatus: model.StatusFailure,
pipelineFinish: recentFinish,
stepState: model.StatusFailure,
},
{
name: "step pending, pipeline killed recently → allow (drain window)",
pipelineStatus: model.StatusKilled,
pipelineFinish: recentFinish,
stepState: model.StatusPending,
},
// Pipeline running allows logs for any step state.
t.Run("pipeline running any step allowed", func(t *testing.T) {
t.Parallel()
// --- pipeline finished and drain window expired: reject -------------
{
name: "step success, pipeline success, stale finish → reject",
pipelineStatus: model.StatusSuccess,
pipelineFinish: staleFinish,
stepState: model.StatusSuccess,
wantErr: ErrAgentIllegalLogStreaming,
},
{
name: "step failure, pipeline failure, stale finish → reject",
pipelineStatus: model.StatusFailure,
pipelineFinish: staleFinish,
stepState: model.StatusFailure,
wantErr: ErrAgentIllegalLogStreaming,
},
{
name: "step pending, pipeline killed, stale finish → reject",
pipelineStatus: model.StatusKilled,
pipelineFinish: staleFinish,
stepState: model.StatusPending,
wantErr: ErrAgentIllegalLogStreaming,
},
{
name: "step created, pipeline error, stale finish → reject",
pipelineStatus: model.StatusError,
pipelineFinish: staleFinish,
stepState: model.StatusCreated,
wantErr: ErrAgentIllegalLogStreaming,
},
for _, ss := range []model.StatusValue{
model.StatusSuccess, model.StatusFailure, model.StatusPending, model.StatusKilled,
} {
t.Run(string(ss), func(t *testing.T) {
t.Parallel()
p := &model.Pipeline{Status: model.StatusRunning}
assert.NoError(t, allowAppendingLogs(p, &model.Step{State: ss}))
})
}
})
// --- zero Finished timestamp (never recorded): outside drain window -
{
name: "step success, pipeline success, Finished=0 → reject",
pipelineStatus: model.StatusSuccess,
pipelineFinish: 0,
stepState: model.StatusSuccess,
wantErr: ErrAgentIllegalLogStreaming,
},
}
// Recent finish → drain window allows logs.
t.Run("recent finish drain allowed", func(t *testing.T) {
t.Parallel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
pStatus model.StatusValue
sState model.StatusValue
}{
{model.StatusSuccess, model.StatusSuccess},
{model.StatusFailure, model.StatusFailure},
{model.StatusKilled, model.StatusPending},
} {
t.Run(fmt.Sprintf("%s/%s", tc.pStatus, tc.sState), func(t *testing.T) {
t.Parallel()
p := &model.Pipeline{Status: tc.pStatus, Finished: recentFinish}
assert.NoError(t, allowAppendingLogs(p, &model.Step{State: tc.sState}))
})
}
})
pipeline := &model.Pipeline{
Status: tt.pipelineStatus,
Finished: tt.pipelineFinish,
}
step := &model.Step{State: tt.stepState}
// Stale finish → drain window expired → reject.
t.Run("stale finish drain rejected", func(t *testing.T) {
t.Parallel()
err := allowAppendingLogs(pipeline, step)
if tt.wantErr == nil {
assert.NoError(t, err)
} else {
assert.ErrorIs(t, err, tt.wantErr)
}
})
}
for _, tc := range []struct {
pStatus model.StatusValue
sState model.StatusValue
finish int64
}{
{model.StatusSuccess, model.StatusSuccess, staleFinish},
{model.StatusFailure, model.StatusFailure, staleFinish},
{model.StatusKilled, model.StatusPending, staleFinish},
{model.StatusError, model.StatusCreated, staleFinish},
{model.StatusSuccess, model.StatusSuccess, 0}, // zero = never recorded
} {
t.Run(fmt.Sprintf("%s/%s/fin=%d", tc.pStatus, tc.sState, tc.finish), func(t *testing.T) {
t.Parallel()
p := &model.Pipeline{Status: tc.pStatus, Finished: tc.finish}
assert.ErrorIs(t, allowAppendingLogs(p, &model.Step{State: tc.sState}), ErrAgentIllegalLogStreaming)
})
}
})
}
// TestAllowAppendingLogsDrainBoundary checks the exact boundary of the
// 5-minute drain window to guard against off-by-one errors.
// TestAllowAppendingLogsDrainBoundary guards the exact edge of the 5-minute
// drain window against off-by-one errors.
func TestAllowAppendingLogsDrainBoundary(t *testing.T) {
t.Parallel()
step := &model.Step{State: model.StatusSuccess}
t.Run("finished exactly at drain window boundary is allowed", func(t *testing.T) {
t.Run("just inside drain window allowed", func(t *testing.T) {
t.Parallel()
// Finished just barely inside the window (1 second of headroom).
finishedAt := time.Now().Add(-(logStreamDelayAllowed - time.Second)).Unix()
pipeline := &model.Pipeline{Status: model.StatusSuccess, Finished: finishedAt}
assert.NoError(t, allowAppendingLogs(pipeline, step))
p := &model.Pipeline{
Status: model.StatusSuccess,
Finished: time.Now().Add(-(logStreamDelayAllowed - time.Second)).Unix(),
}
assert.NoError(t, allowAppendingLogs(p, step))
})
t.Run("finished just outside drain window is rejected", func(t *testing.T) {
t.Run("just outside drain window rejected", func(t *testing.T) {
t.Parallel()
// Finished 1 second past the allowed window.
finishedAt := time.Now().Add(-(logStreamDelayAllowed + time.Second)).Unix()
pipeline := &model.Pipeline{Status: model.StatusSuccess, Finished: finishedAt}
assert.ErrorIs(t, allowAppendingLogs(pipeline, step), ErrAgentIllegalLogStreaming)
p := &model.Pipeline{
Status: model.StatusSuccess,
Finished: time.Now().Add(-(logStreamDelayAllowed + time.Second)).Unix(),
}
assert.ErrorIs(t, allowAppendingLogs(p, step), ErrAgentIllegalLogStreaming)
})
}