mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-02-13 21:00:00 +00:00
Merge branch 'origin/main' into 'next-release/main'
This commit is contained in:
@@ -16,6 +16,8 @@ package docker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -35,12 +37,15 @@ import (
|
||||
"github.com/moby/term"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v3"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/shared/httputil"
|
||||
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
|
||||
)
|
||||
|
||||
var containerKillTimeout = 5 // seconds
|
||||
|
||||
type docker struct {
|
||||
client client.APIClient
|
||||
info system.Info
|
||||
@@ -304,13 +309,24 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)
|
||||
|
||||
containerName := toContainerName(step)
|
||||
var stopErr error
|
||||
|
||||
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
|
||||
return err
|
||||
// we first signal to the container to stop ...
|
||||
if err := e.client.ContainerStop(ctx, containerName, container.StopOptions{
|
||||
Timeout: &containerKillTimeout,
|
||||
}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
|
||||
// we do not return error yet as we try to kill it first
|
||||
stopErr = fmt.Errorf("could not stop container '%s': %w", step.Name, err)
|
||||
}
|
||||
|
||||
// ... and if stop does not work just force kill it
|
||||
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
|
||||
return errors.Join(stopErr, fmt.Errorf("could not kill container '%s': %w", step.Name, err))
|
||||
}
|
||||
|
||||
// now we clean up files left
|
||||
if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
|
||||
return err
|
||||
return fmt.Errorf("could not remove container '%s': %w", step.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -319,17 +335,20 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s
|
||||
func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error {
|
||||
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")
|
||||
|
||||
errWG := errgroup.Group{}
|
||||
|
||||
for _, stage := range conf.Stages {
|
||||
for _, step := range stage.Steps {
|
||||
containerName := toContainerName(step)
|
||||
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
|
||||
log.Error().Err(err).Msgf("could not kill container '%s'", step.Name)
|
||||
}
|
||||
if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
|
||||
log.Error().Err(err).Msgf("could not remove container '%s'", step.Name)
|
||||
}
|
||||
errWG.Go(func() error {
|
||||
return e.DestroyStep(ctx, step, taskUUID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if err := errWG.Wait(); err != nil {
|
||||
log.Error().Err(err).Msgf("could not destroy all containers")
|
||||
}
|
||||
|
||||
if err := e.client.VolumeRemove(ctx, conf.Volume, true); err != nil {
|
||||
log.Error().Err(err).Msgf("could not remove volume '%s'", conf.Volume)
|
||||
}
|
||||
@@ -349,8 +368,13 @@ func isErrContainerNotFoundOrNotRunning(err error) bool {
|
||||
// Error response from daemon: Cannot kill container: ...: No such container: ...
|
||||
// Error response from daemon: Cannot kill container: ...: Container ... is not running"
|
||||
// Error response from podman daemon: can only kill running containers. ... is in state exited
|
||||
// Error response from daemon: removal of container ... is already in progress
|
||||
// Error: No such container: ...
|
||||
return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
|
||||
return err != nil &&
|
||||
(strings.Contains(err.Error(), "No such container") ||
|
||||
strings.Contains(err.Error(), "is not running") ||
|
||||
strings.Contains(err.Error(), "can only kill running containers") ||
|
||||
(strings.Contains(err.Error(), "removal of container") && strings.Contains(err.Error(), "is already in progress")))
|
||||
}
|
||||
|
||||
// normalizeArchType converts the arch type reported by docker info into
|
||||
|
||||
@@ -200,7 +200,7 @@ func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID stri
|
||||
|
||||
_, exist := e.kv.Load("task_" + taskUUID)
|
||||
if !exist {
|
||||
return fmt.Errorf("expect env of workflow %s to exist but found none to destroy", taskUUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// check state
|
||||
|
||||
@@ -47,9 +47,6 @@ func TestSmalPipelineDummyRun(t *testing.T) {
|
||||
|
||||
_, err = dummyEngine.WaitStep(ctx, step, nonExistWorkflowID)
|
||||
assert.Error(t, err)
|
||||
|
||||
err = dummyEngine.DestroyStep(ctx, step, nonExistWorkflowID)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("step exec successfully", func(t *testing.T) {
|
||||
|
||||
@@ -231,6 +231,9 @@ func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (
|
||||
func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error {
|
||||
state, err := e.getStepState(taskUUID, step.UUID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrStepStateNotFound) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -142,6 +142,7 @@ type Backend interface {
|
||||
// - Clean up step-specific resources (containers, processes)
|
||||
// - Close any open log streams
|
||||
// - Not affect other steps in the same or other workflows
|
||||
// - Must not fail if already invoked once
|
||||
//
|
||||
// Must be safe to call even if StartStep failed or the step was never started.
|
||||
// This function must be thread-safe for concurrent calls.
|
||||
|
||||
@@ -233,23 +233,43 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch
|
||||
Str("step", step.Name).
|
||||
Msg("executing")
|
||||
|
||||
processState, err := r.exec(runnerCtx, step)
|
||||
// setup exec func in a way it can be detached if needed
|
||||
// wg will signal once
|
||||
execAndTrace := func(wg *sync.WaitGroup) error {
|
||||
processState, err := r.exec(runnerCtx, step, wg)
|
||||
|
||||
logger.Debug().
|
||||
Str("step", step.Name).
|
||||
Msg("complete")
|
||||
logger.Debug().
|
||||
Str("step", step.Name).
|
||||
Msg("complete")
|
||||
|
||||
// normalize context cancel error
|
||||
if errors.Is(err, context.Canceled) {
|
||||
err = ErrCancel
|
||||
// normalize context cancel error
|
||||
if errors.Is(err, context.Canceled) {
|
||||
err = ErrCancel
|
||||
}
|
||||
|
||||
// Return the error after tracing it.
|
||||
err = r.traceStep(processState, err, step)
|
||||
if err != nil && step.Failure == metadata.FailureIgnore {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Return the error after tracing it.
|
||||
err = r.traceStep(processState, err, step)
|
||||
if err != nil && step.Failure == metadata.FailureIgnore {
|
||||
return nil
|
||||
// we report all errors till setup happened
|
||||
// afterwards they just ged dropped
|
||||
if step.Detached {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
var setupErr error
|
||||
go func() {
|
||||
setupErr = execAndTrace(&wg)
|
||||
}()
|
||||
wg.Wait()
|
||||
return setupErr
|
||||
}
|
||||
return err
|
||||
|
||||
// run blocking
|
||||
return execAndTrace(nil)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -262,7 +282,13 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch
|
||||
}
|
||||
|
||||
// Executes the step and returns the state and error.
|
||||
func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.State, error) {
|
||||
func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) {
|
||||
defer func() {
|
||||
if setupWg != nil {
|
||||
setupWg.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck
|
||||
return nil, err
|
||||
}
|
||||
@@ -287,9 +313,11 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.
|
||||
}()
|
||||
}
|
||||
|
||||
// nothing else to do, this is a detached process.
|
||||
if step.Detached {
|
||||
return nil, nil
|
||||
// nothing else to block for detached process.
|
||||
if setupWg != nil {
|
||||
setupWg.Done()
|
||||
// set to nil so the setupWg.Done in defer does not call it a second time
|
||||
setupWg = nil
|
||||
}
|
||||
|
||||
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
|
||||
|
||||
Reference in New Issue
Block a user