Files
woodpecker/pipeline/runtime/executor.go
Harri Avellan 25842498c6 Prevent leaking goroutines on cancelled steps (#6186)
I've been debugging a case where Kubernetes backend agents are occasionally OOMKilled. During the investigation I noticed an increasing number of goroutines on the agent processes, which turned out to be multiple goroutines hanging on:
```
runtime.gopark(proc.go:461)
runtime.chanrecv(chan.go:667)
runtime.chanrecv1(chan.go:509)
go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/kubernetes.(*kube).WaitStep(kubernetes.go:325)
go.woodpecker-ci.org/woodpecker/v3/pipeline/runtime.(*Runtime).exec(executor.go:261)
go.woodpecker-ci.org/woodpecker/v3/pipeline/runtime.(*Runtime).execAll.func1.1(executor.go:174)
go.woodpecker-ci.org/woodpecker/v3/pipeline/runtime.(*Runtime).execAll.func1.2(executor.go:200)
runtime.goexit(asm_arm64.s:1268)
go.woodpecker-ci.org/woodpecker/v3/pipeline/runtime.(*Runtime).execAll.func1(executor.go:199)
```

My analysis is that on cancel nothing fires the `finished` channel, as the pod is deleted, i.e. not updated (there's a registered handler for pod update events). There was already a comment about adding a cancellation handler for ctx.Done, so I think this is the proper way (instead of adding an event handler for pod deletion).
2026-03-02 20:14:56 +00:00

301 lines
7.4 KiB
Go

// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime
import (
"context"
"errors"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/state"
)
// Run executes the stages of the workflow one after another and blocks until
// the last stage has reported, the setup failed, or r.ctx is done.
//
// The engine's DestroyWorkflow is always called when Run returns. Run returns
// the SetupWorkflow error if setup fails, pipeline_errors.ErrCancel if r.ctx is
// done while waiting for a stage, and otherwise the last non-nil error
// reported by a stage (r.err).
func (r *Runtime) Run(runnerCtx context.Context) error {
	logger := r.MakeLogger()

	// Log the planned execution order for debugging.
	logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
	for pos, stage := range r.spec.Stages {
		names := make([]string, 0, len(stage.Steps))
		for _, s := range stage.Steps {
			names = append(names, s.Name)
		}
		logger.Debug().
			Int("StagePos", pos).
			Str("Steps", strings.Join(names, ",")).
			Msg("stage")
	}

	// Tear the workflow down on every exit path. If runnerCtx already carries
	// an error, the context returned by GetShutdownCtx is used instead.
	defer func() {
		destroyCtx := runnerCtx //nolint:contextcheck
		if destroyCtx.Err() != nil {
			destroyCtx = GetShutdownCtx()
		}
		destroyErr := r.engine.DestroyWorkflow(destroyCtx, r.spec, r.taskUUID)
		if destroyErr != nil {
			logger.Error().Err(destroyErr).Msg("could not destroy engine")
		}
	}()

	r.started = time.Now().Unix()

	setupErr := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID)
	if setupErr != nil {
		// An invalid-setup error names the offending step; report that step
		// as exited with code 1 if we have a tracer.
		var invalid *pipeline_errors.ErrInvalidWorkflowSetup
		if errors.As(setupErr, &invalid) && r.tracer != nil {
			st := new(state.State)
			st.Pipeline.Step = invalid.Step
			st.Pipeline.Error = invalid.Err
			st.Process = backend.State{
				Error:    invalid.Err,
				Exited:   true,
				ExitCode: 1,
			}
			if traceErr := r.tracer.Trace(st); traceErr != nil {
				logger.Error().Err(traceErr).Msg("failed to trace step error")
			}
		}
		return setupErr
	}

	// Stages run sequentially; the steps inside a stage run in parallel.
	for _, stage := range r.spec.Stages {
		select {
		case <-r.ctx.Done():
			return pipeline_errors.ErrCancel
		case stageErr := <-r.execAll(runnerCtx, stage.Steps):
			// Remember the failure but keep going; later stages decide for
			// themselves whether to run based on r.err.
			if stageErr != nil {
				r.err = stageErr
			}
		}
	}

	return r.err
}
// traceStep reports the current status of a step to the tracer.
//
// If processState is nil, the step is assumed not to have started. If the
// step did not start and err is non-nil, starting the step failed and the
// step is reported as exited with that error. If processState is non-nil it
// is reported as is.
//
// It returns the tracer's error if tracing fails, and otherwise returns err
// unchanged so that callers can write `err = r.traceStep(state, err, step)`.
// Without a tracer nothing is reported and err is still passed through.
func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error {
	if r.tracer == nil {
		// No tracer, nothing to trace. The step error must still reach the
		// caller, otherwise failures would be silently turned into success.
		return err
	}

	st := new(state.State)
	st.Pipeline.Started = r.started
	st.Pipeline.Step = step
	st.Pipeline.Error = r.err

	switch {
	case processState != nil:
		st.Process = *processState
	case err != nil:
		// We have an error while starting the step.
		st.Process = backend.State{
			Error:  err,
			Exited: true,
		}
	}

	if traceErr := r.tracer.Trace(st); traceErr != nil {
		return traceErr
	}
	return err
}
// execAll runs the given steps in parallel and returns a channel that
// receives the combined result (the value of errgroup's Wait) exactly once
// and is then closed.
//
// A step is skipped when the pipeline already failed (r.err != nil) and the
// step is not marked OnFailure, or when the pipeline is healthy and the step
// is not marked OnSuccess. Detached steps only block until their setup
// signalled completion; everything else blocks until the step finished.
func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error {
	var g errgroup.Group
	// Buffered with capacity one: Run stops receiving from this channel as
	// soon as r.ctx is done. With an unbuffered channel the sender goroutine
	// below would then block forever on the send and leak.
	done := make(chan error, 1)
	logger := r.MakeLogger()

	for _, step := range steps {
		// Required since otherwise the loop variable
		// will be captured by the function. This will
		// recreate the step "variable"
		step := step
		g.Go(func() error {
			// Case the pipeline was already complete.
			logger.Debug().
				Str("step", step.Name).
				Msg("prepare")

			switch {
			case r.err != nil && !step.OnFailure:
				logger.Debug().
					Str("step", step.Name).
					Err(r.err).
					Msgf("skipped due to OnFailure=%t", step.OnFailure)
				return nil
			case r.err == nil && !step.OnSuccess:
				logger.Debug().
					Str("step", step.Name).
					Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
				return nil
			}

			// Trace started.
			err := r.traceStep(nil, nil, step)
			if err != nil {
				return err
			}

			// add compatibility for drone-ci plugins
			metadata.SetDroneEnviron(step.Environment)

			logger.Debug().
				Str("step", step.Name).
				Msg("executing")

			// setup exec func in a way it can be detached if needed
			// wg will signal once
			execAndTrace := func(wg *sync.WaitGroup) error {
				processState, err := r.exec(runnerCtx, step, wg)

				logger.Debug().
					Str("step", step.Name).
					Msg("complete")

				// normalize context cancel error
				if errors.Is(err, context.Canceled) {
					err = pipeline_errors.ErrCancel
				}

				// Return the error after tracing it.
				err = r.traceStep(processState, err, step)
				if err != nil && step.Failure == metadata.FailureIgnore {
					return nil
				}
				return err
			}

			// Report all errors until the setup happened.
			// Afterwards errors will be dropped.
			if step.Detached {
				var wg sync.WaitGroup
				wg.Add(1)
				var setupErr error
				// NOTE(review): exec signals wg before execAndTrace returns,
				// so setupErr may be read below before (or while) it is
				// written here. Fixing that needs exec to hand the setup
				// error over before it calls Done - TODO confirm and address.
				go func() {
					setupErr = execAndTrace(&wg)
				}()
				wg.Wait()
				return setupErr
			}

			// run blocking
			return execAndTrace(nil)
		})
	}

	go func() {
		done <- g.Wait()
		close(done)
	}()
	return done
}
// exec starts the step, streams its logs, waits for it to finish, destroys it
// and returns the resulting state together with an error.
//
// setupWg may be nil. When it is non-nil, Done is called on it exactly once:
// either as soon as the step is started and log tailing is set up, or when
// exec returns early because of a start or tail error.
//
// The returned error is an *OomError if the state reports OOMKilled, an
// *ExitError if the exit code is non-zero, or the error of the failing engine
// call (in which case the returned state is nil).
func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) {
	// signalSetup releases the waiter at most once; clearing setupWg makes
	// the deferred call a no-op after the explicit one further down.
	signalSetup := func() {
		if setupWg != nil {
			setupWg.Done()
			setupWg = nil
		}
	}
	defer signalSetup()

	startErr := r.engine.StartStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
	if startErr != nil {
		return nil, startErr
	}
	startedAt := time.Now().Unix()

	logger := r.MakeLogger()

	var logWg sync.WaitGroup
	if r.logger != nil {
		stream, tailErr := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
		if tailErr != nil {
			return nil, tailErr
		}

		logWg.Add(1)
		go func() {
			defer logWg.Done()

			if logErr := r.logger(step, stream); logErr != nil {
				logger.Error().Err(logErr).Msg("process logging failed")
			}
			_ = stream.Close()
		}()
	}

	// nothing else to block for detached process.
	signalSetup()

	// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
	logWg.Wait()

	result, waitErr := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
	if waitErr != nil {
		if !errors.Is(waitErr, context.Canceled) {
			return nil, waitErr
		}
		// A canceled wait is not a failure of the engine call itself; record
		// the cancellation on the state and continue with the cleanup.
		if result == nil {
			result = &backend.State{}
		}
		result.Error = pipeline_errors.ErrCancel
	}

	// It is important to use the runnerCtx here because
	// in case the workflow was canceled we still have the docker daemon to stop the container.
	if destroyErr := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); destroyErr != nil {
		return nil, destroyErr
	}

	// we update with our start time here
	result.Started = startedAt

	// we handle cancel case
	if errors.Is(r.ctx.Err(), context.Canceled) {
		result.Error = pipeline_errors.ErrCancel
	}

	switch {
	case result.OOMKilled:
		return result, &pipeline_errors.OomError{
			UUID: step.UUID,
			Code: result.ExitCode,
		}
	case result.ExitCode != 0:
		return result, &pipeline_errors.ExitError{
			UUID: step.UUID,
			Code: result.ExitCode,
		}
	}

	return result, nil
}