Files
act_runner/pkg/common/executor_test.go
silverwind 4fdf9ab904 Fix executor_test.go: ErrorIs arg order, wrong target, and data races
- TestNewParallelExecutorFailed: fix assert.ErrorIs argument order
- TestNewParallelExecutorCanceled: check for context.Canceled (not the
  executor error) since NewParallelExecutor returns ctx.Err() when
  context is cancelled; use atomic counter to fix data race
- TestNewParallelExecutor: use atomic counters to fix data race with
  concurrent goroutines

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 08:06:05 +01:00

156 lines
3.5 KiB
Go

package common
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewWorkflow(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
// empty
emptyWorkflow := NewPipelineExecutor()
require.NoError(t, emptyWorkflow(ctx))
// error case
errorWorkflow := NewErrorExecutor(errors.New("test error"))
require.Error(t, errorWorkflow(ctx))
// multiple success case
runcount := 0
successWorkflow := NewPipelineExecutor(
func(_ context.Context) error {
runcount++
return nil
},
func(_ context.Context) error {
runcount++
return nil
})
require.NoError(t, successWorkflow(ctx))
assert.Equal(2, runcount)
}
func TestNewConditionalExecutor(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
trueCount := 0
falseCount := 0
err := NewConditionalExecutor(func(_ context.Context) bool {
return false
}, func(_ context.Context) error {
trueCount++
return nil
}, func(_ context.Context) error {
falseCount++
return nil
})(ctx)
require.NoError(t, err)
assert.Equal(0, trueCount)
assert.Equal(1, falseCount)
err = NewConditionalExecutor(func(_ context.Context) bool {
return true
}, func(_ context.Context) error {
trueCount++
return nil
}, func(_ context.Context) error {
falseCount++
return nil
})(ctx)
require.NoError(t, err)
assert.Equal(1, trueCount)
assert.Equal(1, falseCount)
}
func TestNewParallelExecutor(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
var count atomic.Int32
var activeCount atomic.Int32
var maxCount atomic.Int32
emptyWorkflow := NewPipelineExecutor(func(_ context.Context) error {
count.Add(1)
cur := activeCount.Add(1)
for {
old := maxCount.Load()
if cur <= old || maxCount.CompareAndSwap(old, cur) {
break
}
}
time.Sleep(2 * time.Second)
activeCount.Add(-1)
return nil
})
err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
assert.Equal(int32(3), count.Load(), "should run all 3 executors")
assert.Equal(int32(2), maxCount.Load(), "should run at most 2 executors in parallel")
require.NoError(t, err)
// Reset to test running the executor with 0 parallelism
count.Store(0)
activeCount.Store(0)
maxCount.Store(0)
errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
assert.Equal(int32(3), count.Load(), "should run all 3 executors")
assert.Equal(int32(1), maxCount.Load(), "should run at most 1 executors in parallel")
require.NoError(t, errSingle)
}
func TestNewParallelExecutorFailed(t *testing.T) {
assert := assert.New(t)
ctx, cancel := context.WithCancel(context.Background())
cancel()
count := 0
errorWorkflow := NewPipelineExecutor(func(_ context.Context) error {
count++
return errors.New("fake error")
})
err := NewParallelExecutor(1, errorWorkflow)(ctx)
assert.Equal(1, count)
assert.ErrorIs(err, context.Canceled)
}
func TestNewParallelExecutorCanceled(t *testing.T) {
assert := assert.New(t)
ctx, cancel := context.WithCancel(context.Background())
cancel()
var count atomic.Int32
successWorkflow := NewPipelineExecutor(func(_ context.Context) error {
count.Add(1)
return nil
})
errorWorkflow := NewPipelineExecutor(func(_ context.Context) error {
count.Add(1)
return errors.New("fake error")
})
err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx)
assert.Equal(int32(3), count.Load())
assert.ErrorIs(err, context.Canceled)
}