mirror of
https://github.com/fluxcd/flagger.git
synced 2026-02-27 00:03:49 +00:00
83 lines
1.9 KiB
Go
83 lines
1.9 KiB
Go
package loadtester
|
|
|
|
import (
|
|
"context"
|
|
"go.uber.org/zap"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type TaskRunner struct {
|
|
logger *zap.SugaredLogger
|
|
timeout time.Duration
|
|
todoTasks *sync.Map
|
|
runningTasks *sync.Map
|
|
totalExecs uint64
|
|
logCmdOutput bool
|
|
}
|
|
|
|
func NewTaskRunner(logger *zap.SugaredLogger, timeout time.Duration) *TaskRunner {
|
|
return &TaskRunner{
|
|
logger: logger,
|
|
todoTasks: new(sync.Map),
|
|
runningTasks: new(sync.Map),
|
|
timeout: timeout,
|
|
}
|
|
}
|
|
|
|
func (tr *TaskRunner) Add(task Task) {
|
|
tr.todoTasks.Store(task.Hash(), task)
|
|
}
|
|
|
|
func (tr *TaskRunner) GetTotalExecs() uint64 {
|
|
return atomic.LoadUint64(&tr.totalExecs)
|
|
}
|
|
|
|
func (tr *TaskRunner) runAll() {
|
|
tr.todoTasks.Range(func(key interface{}, value interface{}) bool {
|
|
task := value.(Task)
|
|
go func(t Task) {
|
|
// remove task from the to do list
|
|
tr.todoTasks.Delete(t.Hash())
|
|
|
|
// check if task is already running, if not run the task's command
|
|
if _, exists := tr.runningTasks.Load(t.Hash()); !exists {
|
|
// save the task in the running list
|
|
tr.runningTasks.Store(t.Hash(), t)
|
|
|
|
// create timeout context
|
|
ctx, cancel := context.WithTimeout(context.Background(), tr.timeout)
|
|
defer cancel()
|
|
|
|
// increment the total exec counter
|
|
atomic.AddUint64(&tr.totalExecs, 1)
|
|
|
|
tr.logger.With("canary", t.Canary()).Infof("task starting %s", t)
|
|
|
|
// run task with the timeout context
|
|
t.Run(ctx)
|
|
|
|
// remove task from the running list
|
|
tr.runningTasks.Delete(t.Hash())
|
|
} else {
|
|
tr.logger.With("canary", t.Canary()).Infof("command skipped %s is already running", t)
|
|
}
|
|
}(task)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (tr *TaskRunner) Start(interval time.Duration, stopCh <-chan struct{}) {
|
|
tickChan := time.NewTicker(interval).C
|
|
for {
|
|
select {
|
|
case <-tickChan:
|
|
tr.runAll()
|
|
case <-stopCh:
|
|
tr.logger.Info("shutting down the task runner")
|
|
return
|
|
}
|
|
}
|
|
}
|