From 7efe8bf510cd4587b1492ee260b56a0f5e24bb84 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 17:23:33 +0200 Subject: [PATCH] sanitize agent should allow done updates to canceled pipelines (#6394) --- server/pipeline/step_status.go | 37 +- server/rpc/errors.go | 8 +- server/rpc/rpc.go | 23 +- server/rpc/rpc_integration_test.go | 199 +++------- server/rpc/sanitize.go | 116 +++--- server/rpc/sanitize_test.go | 616 ++++++++++++----------------- 6 files changed, 394 insertions(+), 605 deletions(-) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index 7adbee00b..100a8a00a 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -28,9 +28,8 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/store" ) -// UpdateStepStatus updates step status based on agent reports via RPC. -func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error { - log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) +func CalcStepStatus(step model.Step, state rpc.StepState) (_ *model.Step, cancelPipelineFromStep bool, _ error) { + log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", step, state) switch step.State { case model.StatusPending: @@ -41,7 +40,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, if state.Finished != 0 { step.Finished = state.Finished } - return store.StepUpdate(step) + return &step, false, nil } // Transition from pending to running when started @@ -68,10 +67,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, step.State = model.StatusFailure if step.Failure == model.FailureCancel { - err := cancelPipelineFromStep(ctx, store, step) - if err != nil { - return err - } + cancelPipelineFromStep = true } } } @@ -92,16 +88,13 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, step.State = model.StatusFailure if step.Failure == model.FailureCancel { - err := cancelPipelineFromStep(ctx, store, step) - if err != nil { - return err - } + cancelPipelineFromStep = true } } } default: - return fmt.Errorf("step has state %s and does not expect rpc state updates", step.State) + return nil, false, fmt.Errorf("step has state %s and does not expect rpc state updates", step.State) } // Handle cancellation across both cases @@ -112,6 +105,24 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, } } + return &step, cancelPipelineFromStep, nil +} + +// UpdateStepStatus updates step status based on agent reports via RPC. +func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error { + log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) + + updatedStep, shouldCancelPipelineFromStep, err := CalcStepStatus(*step, state) + if err != nil { + return err + } + *step = *updatedStep // update step for external callers + + if shouldCancelPipelineFromStep { + if err := cancelPipelineFromStep(ctx, store, step); err != nil { + return err + } + } return store.StepUpdate(step) } diff --git a/server/rpc/errors.go b/server/rpc/errors.go index 2065904a7..9955d437d 100644 --- a/server/rpc/errors.go +++ b/server/rpc/errors.go @@ -17,14 +17,10 @@ package rpc import "errors" var ( - ErrAgentIllegalPipelineWorkflowReRunStateChange = errors.New("workflow has parent pipeline marked as finished") - ErrAgentIllegalPipelineWorkflowRun = errors.New("workflow has parent pipeline in blocked state") - ErrAgentIllegalWorkflowReRunStateChange = errors.New("workflow was already marked as finished") ErrAgentIllegalWorkflowRun = errors.New("workflow is currently in blocked state") - ErrAgentIllegalStepReRunStateChange = errors.New("step was already marked as finished") - ErrAgentIllegalStepRun = errors.New("step is currently in blocked state") - ErrAgentIllegalLogStreaming = errors.New("agent can not append logs to a step that is marked not running") + + ErrAgentIllegalRepo = errors.New("agent is not allowed to interact with repo") ) diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 0bf0e2c63..d3f09082c 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -199,11 +199,8 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat return err } - // sanitize agent input - if err := checkPipelineState(currentPipeline); err != nil { - return err - } - if err := checkWorkflowStepStates(workflow, step); err != nil { + // sanitize agent input: only allow step updates that the workflow state permits + if err := checkWorkflowAllowsStepUpdate(workflow.State, step, state); err != nil { return err } @@ -260,11 +257,8 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } - // sanitize agent input - if err := checkPipelineState(currentPipeline); err != nil { - return err - } - if err := checkWorkflowStepStates(workflow, nil); err != nil { + // check workflow's own state to prevent re-initializing a finished or blocked workflow + if err := checkWorkflowState(workflow.State); err != nil { return err } @@ -293,7 +287,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt return s.updateAgentLastWork(agent) } -// Done marks the workflow with the given ID as stope. +// Done marks the workflow with the given ID as stopped. func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error { workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) if err != nil { @@ -333,11 +327,8 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } - // sanitize agent input - if err := checkPipelineState(currentPipeline); err != nil { - return err - } - if err := checkWorkflowStepStates(workflow, nil); err != nil { + // check workflow's own state to prevent finishing an already-finished or blocked workflow + if err := checkWorkflowState(workflow.State); err != nil { return err } diff --git a/server/rpc/rpc_integration_test.go b/server/rpc/rpc_integration_test.go index f05204eca..26ec74589 100644 --- a/server/rpc/rpc_integration_test.go +++ b/server/rpc/rpc_integration_test.go @@ -144,11 +144,18 @@ func TestRPCUpdate(t *testing.T) { assert.NoError(t, err) }) - t.Run("reject pipeline already succeeded", func(t *testing.T) { + t.Run("allow terminal step update when workflow already finished", func(t *testing.T) { + // When the workflow is already finished, a step update that moves the + // step to a terminal state (e.g. reporting exit code) should be allowed. mockStore := store_mocks.NewMockStore(t) + mockLogStore := log_mocks.NewMockService(t) + origLogStore := server.Config.Services.LogStore + server.Config.Services.LogStore = mockLogStore + t.Cleanup(func() { server.Config.Services.LogStore = origLogStore }) + agent := defaultAgent() - pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusRunning) + pipeline := defaultPipeline(model.StatusRunning) + workflow := defaultWorkflow(model.StatusSuccess) // finished step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) @@ -156,55 +163,25 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) + mockStore.On("StepUpdate", mock.Anything).Return(nil) + mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) + mockLogStore.On("StepFinished", mock.Anything).Return() rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) + // Step reports exit → it will transition to success/failure (terminal) + err := rpcInst.Update(ctx, "30", rpc.StepState{ + StepUUID: "step-uuid-123", + Exited: true, + ExitCode: 0, + }) + assert.NoError(t, err) }) - t.Run("reject pipeline already failed", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusFailure) - workflow := defaultWorkflow(model.StatusRunning) - step := defaultStep(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline blocked", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusBlocked) - workflow := defaultWorkflow(model.StatusRunning) - step := defaultStep(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun) - }) - - t.Run("reject workflow already finished", func(t *testing.T) { + t.Run("reject non-terminal step update when workflow already finished", func(t *testing.T) { + // When the workflow is already finished, a step update that would keep + // the step in a non-terminal state (e.g. just started, no exit) is rejected. mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := defaultPipeline(model.StatusRunning) @@ -220,16 +197,17 @@ func TestRPCUpdate(t *testing.T) { rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) + // Step reports started but not exited → still running (non-terminal) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) - t.Run("reject step already finished", func(t *testing.T) { + t.Run("reject step update when workflow blocked", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() - pipeline := defaultPipeline(model.StatusRunning) - workflow := defaultWorkflow(model.StatusRunning) - step := defaultStep(model.StatusSuccess) // finished + pipeline := defaultPipeline(model.StatusBlocked) + workflow := defaultWorkflow(model.StatusBlocked) + step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) @@ -241,7 +219,7 @@ func TestRPCUpdate(t *testing.T) { ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) t.Run("reject step belongs to different pipeline", func(t *testing.T) { @@ -400,42 +378,6 @@ func TestRPCInit(t *testing.T) { assert.NoError(t, err) }) - t.Run("reject pipeline already succeeded", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusPending) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline blocked", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusBlocked) - workflow := defaultWorkflow(model.StatusPending) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun) - }) - t.Run("reject workflow already finished", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() @@ -536,63 +478,6 @@ func TestRPCDone(t *testing.T) { assert.NoError(t, err) }) - t.Run("reject pipeline already succeeded", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline killed", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusKilled) - workflow := defaultWorkflow(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline blocked", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusBlocked) - workflow := defaultWorkflow(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun) - }) - t.Run("reject workflow already finished", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() @@ -612,6 +497,25 @@ func TestRPCDone(t *testing.T) { assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) + t.Run("reject workflow blocked", func(t *testing.T) { + mockStore := store_mocks.NewMockStore(t) + agent := defaultAgent() + pipeline := defaultPipeline(model.StatusRunning) + workflow := defaultWorkflow(model.StatusBlocked) + + mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) + mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) + mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) + mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) + mockStore.On("AgentFind", int64(1)).Return(agent, nil) + + rpcInst := newTestRPC(t, mockStore) + ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) + + err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun) + }) + t.Run("reject agent wrong org", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := orgAgent999() @@ -769,10 +673,6 @@ func TestRPCLog(t *testing.T) { }) t.Run("reject: pipeline finished stale and step not running", func(t *testing.T) { - // This replaces the old "reject pipeline already finished" test. - // Previously the rejection came from checkPipelineState returning - // ErrAgentIllegalPipelineWorkflowReRunStateChange. - // Now it comes from allowAppendingLogs returning ErrAgentIllegalLogStreaming. mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := stalePipeline(model.StatusSuccess) @@ -792,9 +692,6 @@ func TestRPCLog(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "can not alter logs") assert.ErrorIs(t, err, ErrAgentIllegalLogStreaming) - // The old error is no longer returned from Log() — allowAppendingLogs - // now handles the pipeline-finished case itself. - assert.False(t, errors.Is(err, ErrAgentIllegalPipelineWorkflowReRunStateChange)) }) t.Run("reject: pipeline failed stale and step not running", func(t *testing.T) { diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index 84af183a8..e99775c25 100644 --- a/server/rpc/sanitize.go +++ b/server/rpc/sanitize.go @@ -16,21 +16,22 @@ package rpc import ( "context" - "errors" "fmt" "strconv" "time" "github.com/rs/zerolog/log" + "go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/server/model" + "go.woodpecker-ci.org/woodpecker/v3/server/pipeline" ) const logStreamDelayAllowed = 5 * time.Minute -func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, pipeline *model.Pipeline, repo *model.Repo) error { +func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, p *model.Pipeline, repo *model.Repo) error { var err error - if repo == nil && pipeline == nil { + if repo == nil && p == nil { workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) if err != nil { return err @@ -42,7 +43,7 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age return err } - pipeline, err = s.store.GetPipeline(workflow.PipelineID) + p, err = s.store.GetPipeline(workflow.PipelineID) if err != nil { log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID) return err @@ -50,9 +51,9 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age } if repo == nil { - repo, err = s.store.GetRepo(pipeline.RepoID) + repo, err = s.store.GetRepo(p.RepoID) if err != nil { - log.Error().Err(err).Msgf("cannot find repo with id %d", pipeline.RepoID) + log.Error().Err(err).Msgf("cannot find repo with id %d", p.RepoID) return err } } @@ -61,65 +62,78 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age return nil } - msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo[%d] '%s'", agent.ID, repo.ID, repo.FullName) - log.Error().Int64("repoId", repo.ID).Msg(msg) - return errors.New(msg) + log.Error().Err(ErrAgentIllegalRepo).Int64("agentID", agent.ID).Int64("repoId", repo.ID).Send() + return fmt.Errorf("%w: agentId=%d repoID=%d", ErrAgentIllegalRepo, agent.ID, repo.ID) } -// checkPipelineState checks if an agent is allowed to change/update a workflow/pipeline state -// by the state the parent pipeline is in. -func checkPipelineState(currPipeline *model.Pipeline) (err error) { - // check if pipeline was already run and marked finished or is blocked - switch currPipeline.Status { +// isActiveState returns true for states where work is in progress or not yet started. +func isActiveState(state model.StatusValue) bool { + switch state { case model.StatusCreated, model.StatusPending, model.StatusRunning: - break - - case model.StatusBlocked: - err = ErrAgentIllegalPipelineWorkflowRun - + return true default: - err = ErrAgentIllegalPipelineWorkflowReRunStateChange + return false } - - if err != nil { - log.Error().Err(err).Msg("caught agent performing illegal instruction") - } - return err } -// checkWorkflowStepStates checks if a workflow/step state or its logs can be altered -// depending on what state the workflow and step currently is in. -func checkWorkflowStepStates(currWorkflow *model.Workflow, currStep *model.Step) (err error) { - if currWorkflow != nil { - switch currWorkflow.State { - case model.StatusCreated, - model.StatusPending, - model.StatusRunning: - break +// isDoneState returns true for terminal states where no further work will happen. +func isDoneState(state model.StatusValue) bool { + switch state { + case model.StatusSuccess, + model.StatusFailure, + model.StatusKilled, + model.StatusCanceled, + model.StatusSkipped, + model.StatusError, + model.StatusDeclined: + return true + default: + return false + } +} - case model.StatusBlocked: - err = ErrAgentIllegalWorkflowRun - - default: - err = ErrAgentIllegalWorkflowReRunStateChange - } +// checkWorkflowAllowsStepUpdate validates whether the workflow state permits +// the given step state update. If the workflow is active (created/pending/running), +// any step update is allowed. If the workflow is in a terminal state, only +// updates that would move the step into a terminal state are permitted — this +// lets the agent report final results for steps that completed after the +// workflow was already marked done. +func checkWorkflowAllowsStepUpdate(workflowState model.StatusValue, step *model.Step, state rpc.StepState) error { + if isActiveState(workflowState) { + return nil } - if currStep != nil { - switch currStep.State { - case model.StatusCreated, - model.StatusPending, - model.StatusRunning: - break + newStep, _, err := pipeline.CalcStepStatus(*step, state) + if err != nil { + return err + } + if isDoneState(newStep.State) { + return nil + } - case model.StatusBlocked: - err = errors.Join(err, ErrAgentIllegalStepRun) + retErr := ErrAgentIllegalWorkflowReRunStateChange + log.Error().Err(retErr).Msg("caught agent performing illegal instruction") + return retErr +} - default: - err = errors.Join(err, ErrAgentIllegalStepReRunStateChange) - } +// checkWorkflowState checks if a workflow's own state allows it to be +// initialized or marked as done. A workflow that is already in a terminal +// state (success, failure, killed, …) must not be re-run, and a blocked +// workflow must not be started by an agent. +func checkWorkflowState(state model.StatusValue) (err error) { + switch state { + case model.StatusCreated, + model.StatusPending, + model.StatusRunning: + return nil + + case model.StatusBlocked: + err = ErrAgentIllegalWorkflowRun + + default: + err = ErrAgentIllegalWorkflowReRunStateChange } if err != nil { diff --git a/server/rpc/sanitize_test.go b/server/rpc/sanitize_test.go index 6791bbf25..03a2ceda8 100644 --- a/server/rpc/sanitize_test.go +++ b/server/rpc/sanitize_test.go @@ -15,419 +15,299 @@ package rpc import ( - "errors" + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/server/model" ) -func TestCheckPipelineState(t *testing.T) { +func TestCheckWorkflowAllowsStepUpdate(t *testing.T) { t.Parallel() - tests := []struct { - name string - status model.StatusValue - wantErr error - expectNil bool - }{ - { - name: "created is allowed", - status: model.StatusCreated, - expectNil: true, - }, - { - name: "pending is allowed", - status: model.StatusPending, - expectNil: true, - }, - { - name: "running is allowed", - status: model.StatusRunning, - expectNil: true, - }, - { - name: "blocked is rejected", - status: model.StatusBlocked, - wantErr: ErrAgentIllegalPipelineWorkflowRun, - }, - { - name: "success is rejected as re-run", - status: model.StatusSuccess, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "failure is rejected as re-run", - status: model.StatusFailure, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "killed is rejected as re-run", - status: model.StatusKilled, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "error is rejected as re-run", - status: model.StatusError, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "skipped is rejected as re-run", - status: model.StatusSkipped, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "declined is rejected as re-run", - status: model.StatusDeclined, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, + t.Run("workflow running allows any step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + // Non-terminal update (step stays running) + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusRunning, step, rpc.StepState{})) + }) + + t.Run("workflow pending allows any step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusPending, step, rpc.StepState{})) + }) + + t.Run("workflow created allows any step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusCreated, step, rpc.StepState{})) + }) + + t.Run("workflow finished allows terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + // Step exits with code 0 → CalcStepStatus produces StatusSuccess (terminal) + state := rpc.StepState{Exited: true, ExitCode: 0} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state)) + }) + + t.Run("workflow finished allows failed step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{Exited: true, ExitCode: 1} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusFailure, step, state)) + }) + + t.Run("workflow finished allows canceled step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{Canceled: true} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusKilled, step, state)) + }) + + t.Run("workflow finished allows skipped step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Skipped: true} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state)) + }) + + t.Run("workflow finished rejects non-terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + // No exit, no cancel → step stays Running (non-terminal) + state := rpc.StepState{} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) + + t.Run("workflow killed rejects non-terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusKilled, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) + + t.Run("workflow blocked rejects non-terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusBlocked, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) + + t.Run("workflow finished rejects pending-to-running transition", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + // No skip, no exit → CalcStepStatus produces Running (non-terminal) + state := rpc.StepState{Started: 100} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) +} + +func TestCheckWorkflowState(t *testing.T) { + t.Parallel() + + t.Run("allowed states", func(t *testing.T) { + t.Parallel() + for _, s := range []model.StatusValue{ + model.StatusCreated, + model.StatusPending, + model.StatusRunning, + } { + t.Run(string(s), func(t *testing.T) { + t.Parallel() + assert.NoError(t, checkWorkflowState(s)) + }) + } + }) + + t.Run("blocked rejects", func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkWorkflowState(model.StatusBlocked), ErrAgentIllegalWorkflowRun) + }) + + t.Run("terminal states reject", func(t *testing.T) { + t.Parallel() + for _, s := range []model.StatusValue{ + model.StatusSuccess, + model.StatusFailure, + model.StatusKilled, + model.StatusError, + model.StatusSkipped, + model.StatusCanceled, + model.StatusDeclined, + } { + t.Run(string(s), func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkWorkflowState(s), ErrAgentIllegalWorkflowReRunStateChange) + }) + } + }) +} + +func TestIsActiveState(t *testing.T) { + t.Parallel() + + active := []model.StatusValue{model.StatusCreated, model.StatusPending, model.StatusRunning} + inactive := []model.StatusValue{ + model.StatusSuccess, model.StatusFailure, model.StatusKilled, + model.StatusBlocked, model.StatusCanceled, model.StatusSkipped, + model.StatusError, model.StatusDeclined, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, s := range active { + t.Run(fmt.Sprintf("%s is active", s), func(t *testing.T) { t.Parallel() - - pipeline := &model.Pipeline{Status: tt.status} - err := checkPipelineState(pipeline) - - if tt.expectNil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } + assert.True(t, isActiveState(s)) + }) + } + for _, s := range inactive { + t.Run(fmt.Sprintf("%s is not active", s), func(t *testing.T) { + t.Parallel() + assert.False(t, isActiveState(s)) }) } } -func TestCheckWorkflowStepStates(t *testing.T) { +func TestIsDoneState(t *testing.T) { t.Parallel() - t.Run("workflow only", func(t *testing.T) { - t.Parallel() + done := []model.StatusValue{ + model.StatusSuccess, model.StatusFailure, model.StatusKilled, + model.StatusCanceled, model.StatusSkipped, model.StatusError, + model.StatusDeclined, + } + notDone := []model.StatusValue{ + model.StatusCreated, model.StatusPending, model.StatusRunning, + model.StatusBlocked, + } - tests := []struct { - name string - state model.StatusValue - wantErr error - }{ - {"created allows", model.StatusCreated, nil}, - {"pending allows", model.StatusPending, nil}, - {"running allows", model.StatusRunning, nil}, - {"blocked rejects", model.StatusBlocked, ErrAgentIllegalWorkflowRun}, - {"success rejects", model.StatusSuccess, ErrAgentIllegalWorkflowReRunStateChange}, - {"failure rejects", model.StatusFailure, ErrAgentIllegalWorkflowReRunStateChange}, - {"killed rejects", model.StatusKilled, ErrAgentIllegalWorkflowReRunStateChange}, - {"error rejects", model.StatusError, ErrAgentIllegalWorkflowReRunStateChange}, - {"skipped rejects", model.StatusSkipped, ErrAgentIllegalWorkflowReRunStateChange}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: tt.state} - err := checkWorkflowStepStates(workflow, nil) - - if tt.wantErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } - }) - } - }) - - t.Run("step only (nil workflow)", func(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - state model.StatusValue - wantErr error - }{ - {"created allows", model.StatusCreated, nil}, - {"pending allows", model.StatusPending, nil}, - {"running allows", model.StatusRunning, nil}, - {"blocked rejects", model.StatusBlocked, ErrAgentIllegalStepRun}, - {"success rejects", model.StatusSuccess, ErrAgentIllegalStepReRunStateChange}, - {"failure rejects", model.StatusFailure, ErrAgentIllegalStepReRunStateChange}, - {"killed rejects", model.StatusKilled, ErrAgentIllegalStepReRunStateChange}, - {"error rejects", model.StatusError, ErrAgentIllegalStepReRunStateChange}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - step := &model.Step{State: tt.state} - err := checkWorkflowStepStates(nil, step) - - if tt.wantErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } - }) - } - }) - - t.Run("nil workflow and nil step", func(t *testing.T) { - t.Parallel() - - assert.NoError(t, checkWorkflowStepStates(nil, nil)) - }) - - t.Run("workflow running, step running", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusRunning} - step := &model.Step{State: model.StatusRunning} - assert.NoError(t, checkWorkflowStepStates(workflow, step)) - }) - - t.Run("workflow running, step finished", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusRunning} - step := &model.Step{State: model.StatusSuccess} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - // should not contain workflow error - assert.False(t, errors.Is(err, ErrAgentIllegalWorkflowReRunStateChange)) - }) - - t.Run("workflow running, step blocked", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusRunning} - step := &model.Step{State: model.StatusBlocked} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalStepRun) - }) - - t.Run("both finished - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusSuccess} - step := &model.Step{State: model.StatusSuccess} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - }) - - t.Run("both blocked - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusBlocked} - step := &model.Step{State: model.StatusBlocked} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun) - assert.ErrorIs(t, err, ErrAgentIllegalStepRun) - }) - - t.Run("workflow finished, step blocked - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusKilled} - step := &model.Step{State: model.StatusBlocked} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) - assert.ErrorIs(t, err, ErrAgentIllegalStepRun) - }) - - t.Run("workflow finished (failure), step finished (failure) - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusFailure} - step := &model.Step{State: model.StatusFailure} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - }) + for _, s := range done { + t.Run(fmt.Sprintf("%s is done", s), func(t *testing.T) { + t.Parallel() + assert.True(t, isDoneState(s)) + }) + } + for _, s := range notDone { + t.Run(fmt.Sprintf("%s is not done", s), func(t *testing.T) { + t.Parallel() + assert.False(t, isDoneState(s)) + }) + } } -// AllowAppendingLogs — updated for the new (pipeline, step) signature -// -// New logic: -// Allow if step.State == Running (step is actively running) -// Allow if pipeline.Status == Running (pipeline still running, step may -// have just finished but pipeline hasn't caught up yet) -// Allow if pipeline.Finished is within the last logStreamDelayAllowed -// (drain window after a server restart / network blip) -// Reject otherwise. - func TestAllowAppendingLogs(t *testing.T) { t.Parallel() - // recentFinish is a pipeline.Finished timestamp just 30 seconds ago — - // well within the 5-minute drain window. recentFinish := time.Now().Add(-30 * time.Second).Unix() - - // staleFinish is a pipeline.Finished timestamp 10 minutes ago — - // outside the drain window. staleFinish := time.Now().Add(-10 * time.Minute).Unix() - tests := []struct { - name string - pipelineStatus model.StatusValue - pipelineFinish int64 - stepState model.StatusValue - wantErr error - }{ - // --- step is running: always allowed regardless of pipeline state ---- - { - name: "step running, pipeline running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusRunning, - }, - { - name: "step running, pipeline success → allow (step takes priority)", - pipelineStatus: model.StatusSuccess, - pipelineFinish: staleFinish, - stepState: model.StatusRunning, - }, - { - name: "step running, pipeline failure → allow", - pipelineStatus: model.StatusFailure, - pipelineFinish: staleFinish, - stepState: model.StatusRunning, - }, - { - name: "step running, pipeline killed → allow", - pipelineStatus: model.StatusKilled, - pipelineFinish: staleFinish, - stepState: model.StatusRunning, - }, + // Step running always allows logs, regardless of pipeline state or age. + t.Run("step running always allowed", func(t *testing.T) { + t.Parallel() - // --- pipeline still running: allow even if step finished ------------ - { - name: "step success, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusSuccess, - }, - { - name: "step failure, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusFailure, - }, - { - name: "step pending, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusPending, - }, - { - name: "step killed, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusKilled, - }, + for _, tc := range []struct { + name string + status model.StatusValue + finish int64 + }{ + {"pipeline running", model.StatusRunning, 0}, + {"pipeline success stale", model.StatusSuccess, staleFinish}, + {"pipeline failure stale", model.StatusFailure, staleFinish}, + {"pipeline killed stale", model.StatusKilled, staleFinish}, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + p := &model.Pipeline{Status: tc.status, Finished: tc.finish} + assert.NoError(t, allowAppendingLogs(p, &model.Step{State: model.StatusRunning})) + }) + } + }) - // --- pipeline finished recently: drain window allows logs ----------- - { - name: "step success, pipeline finished recently → allow (drain window)", - pipelineStatus: model.StatusSuccess, - pipelineFinish: recentFinish, - stepState: model.StatusSuccess, - }, - { - name: "step failure, pipeline failed recently → allow (drain window)", - pipelineStatus: model.StatusFailure, - pipelineFinish: recentFinish, - stepState: model.StatusFailure, - }, - { - name: "step pending, pipeline killed recently → allow (drain window)", - pipelineStatus: model.StatusKilled, - pipelineFinish: recentFinish, - stepState: model.StatusPending, - }, + // Pipeline running allows logs for any step state. + t.Run("pipeline running any step allowed", func(t *testing.T) { + t.Parallel() - // --- pipeline finished and drain window expired: reject ------------- - { - name: "step success, pipeline success, stale finish → reject", - pipelineStatus: model.StatusSuccess, - pipelineFinish: staleFinish, - stepState: model.StatusSuccess, - wantErr: ErrAgentIllegalLogStreaming, - }, - { - name: "step failure, pipeline failure, stale finish → reject", - pipelineStatus: model.StatusFailure, - pipelineFinish: staleFinish, - stepState: model.StatusFailure, - wantErr: ErrAgentIllegalLogStreaming, - }, - { - name: "step pending, pipeline killed, stale finish → reject", - pipelineStatus: model.StatusKilled, - pipelineFinish: staleFinish, - stepState: model.StatusPending, - wantErr: ErrAgentIllegalLogStreaming, - }, - { - name: "step created, pipeline error, stale finish → reject", - pipelineStatus: model.StatusError, - pipelineFinish: staleFinish, - stepState: model.StatusCreated, - wantErr: ErrAgentIllegalLogStreaming, - }, + for _, ss := range []model.StatusValue{ + model.StatusSuccess, model.StatusFailure, model.StatusPending, model.StatusKilled, + } { + t.Run(string(ss), func(t *testing.T) { + t.Parallel() + p := &model.Pipeline{Status: model.StatusRunning} + assert.NoError(t, allowAppendingLogs(p, &model.Step{State: ss})) + }) + } + }) - // --- zero Finished timestamp (never recorded): outside drain window - - { - name: "step success, pipeline success, Finished=0 → reject", - pipelineStatus: model.StatusSuccess, - pipelineFinish: 0, - stepState: model.StatusSuccess, - wantErr: ErrAgentIllegalLogStreaming, - }, - } + // Recent finish → drain window allows logs. + t.Run("recent finish drain allowed", func(t *testing.T) { + t.Parallel() - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() + for _, tc := range []struct { + pStatus model.StatusValue + sState model.StatusValue + }{ + {model.StatusSuccess, model.StatusSuccess}, + {model.StatusFailure, model.StatusFailure}, + {model.StatusKilled, model.StatusPending}, + } { + t.Run(fmt.Sprintf("%s/%s", tc.pStatus, tc.sState), func(t *testing.T) { + t.Parallel() + p := &model.Pipeline{Status: tc.pStatus, Finished: recentFinish} + assert.NoError(t, allowAppendingLogs(p, &model.Step{State: tc.sState})) + }) + } + }) - pipeline := &model.Pipeline{ - Status: tt.pipelineStatus, - Finished: tt.pipelineFinish, - } - step := &model.Step{State: tt.stepState} + // Stale finish → drain window expired → reject. + t.Run("stale finish drain rejected", func(t *testing.T) { + t.Parallel() - err := allowAppendingLogs(pipeline, step) - - if tt.wantErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } - }) - } + for _, tc := range []struct { + pStatus model.StatusValue + sState model.StatusValue + finish int64 + }{ + {model.StatusSuccess, model.StatusSuccess, staleFinish}, + {model.StatusFailure, model.StatusFailure, staleFinish}, + {model.StatusKilled, model.StatusPending, staleFinish}, + {model.StatusError, model.StatusCreated, staleFinish}, + {model.StatusSuccess, model.StatusSuccess, 0}, // zero = never recorded + } { + t.Run(fmt.Sprintf("%s/%s/fin=%d", tc.pStatus, tc.sState, tc.finish), func(t *testing.T) { + t.Parallel() + p := &model.Pipeline{Status: tc.pStatus, Finished: tc.finish} + assert.ErrorIs(t, allowAppendingLogs(p, &model.Step{State: tc.sState}), ErrAgentIllegalLogStreaming) + }) + } + }) } -// TestAllowAppendingLogsDrainBoundary checks the exact boundary of the -// 5-minute drain window to guard against off-by-one errors. +// TestAllowAppendingLogsDrainBoundary guards the exact edge of the 5-minute +// drain window against off-by-one errors. func TestAllowAppendingLogsDrainBoundary(t *testing.T) { t.Parallel() step := &model.Step{State: model.StatusSuccess} - t.Run("finished exactly at drain window boundary is allowed", func(t *testing.T) { + t.Run("just inside drain window allowed", func(t *testing.T) { t.Parallel() - - // Finished just barely inside the window (1 second of headroom). - finishedAt := time.Now().Add(-(logStreamDelayAllowed - time.Second)).Unix() - pipeline := &model.Pipeline{Status: model.StatusSuccess, Finished: finishedAt} - - assert.NoError(t, allowAppendingLogs(pipeline, step)) + p := &model.Pipeline{ + Status: model.StatusSuccess, + Finished: time.Now().Add(-(logStreamDelayAllowed - time.Second)).Unix(), + } + assert.NoError(t, allowAppendingLogs(p, step)) }) - t.Run("finished just outside drain window is rejected", func(t *testing.T) { + t.Run("just outside drain window rejected", func(t *testing.T) { t.Parallel() - - // Finished 1 second past the allowed window. - finishedAt := time.Now().Add(-(logStreamDelayAllowed + time.Second)).Unix() - pipeline := &model.Pipeline{Status: model.StatusSuccess, Finished: finishedAt} - - assert.ErrorIs(t, allowAppendingLogs(pipeline, step), ErrAgentIllegalLogStreaming) + p := &model.Pipeline{ + Status: model.StatusSuccess, + Finished: time.Now().Add(-(logStreamDelayAllowed + time.Second)).Unix(), + } + assert.ErrorIs(t, allowAppendingLogs(p, step), ErrAgentIllegalLogStreaming) }) }