mirror of
https://github.com/fluxcd/flagger.git
synced 2026-02-27 16:23:53 +00:00
108 lines
2.5 KiB
Go
108 lines
2.5 KiB
Go
package loadtester
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"go.uber.org/zap"
|
|
"hash/fnv"
|
|
"os/exec"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type TaskRunner struct {
|
|
logger *zap.SugaredLogger
|
|
timeout time.Duration
|
|
todoTasks *sync.Map
|
|
runningTasks *sync.Map
|
|
totalExecs uint64
|
|
logCmdOutput bool
|
|
}
|
|
|
|
// Task describes a command to be executed on behalf of a canary.
type Task struct {
	// Canary identifies the canary the command belongs to.
	Canary string
	// Command is the command line to execute.
	Command string
}

// Hash returns the hex-encoded key that identifies the task, derived from
// the concatenation of Canary and Command.
//
// NOTE(review): nothing is ever written to the FNV hasher, so Sum only
// appends the digest of empty input to the raw bytes. The result is the hex
// encoding of Canary+Command followed by a constant 4-byte suffix, not a
// fixed-size hash. Kept as is because the returned values are used as keys.
func (t Task) Hash() string {
	payload := []byte(t.Canary + t.Command)
	digest := fnv.New32().Sum(payload)
	return hex.EncodeToString(digest)
}
|
|
|
|
func NewTaskRunner(logger *zap.SugaredLogger, timeout time.Duration, logCmdOutput bool) *TaskRunner {
|
|
return &TaskRunner{
|
|
logger: logger,
|
|
todoTasks: new(sync.Map),
|
|
runningTasks: new(sync.Map),
|
|
timeout: timeout,
|
|
logCmdOutput: logCmdOutput,
|
|
}
|
|
}
|
|
|
|
func (tr *TaskRunner) Add(task Task) {
|
|
tr.todoTasks.Store(task.Hash(), task)
|
|
}
|
|
|
|
func (tr *TaskRunner) GetTotalExecs() uint64 {
|
|
return atomic.LoadUint64(&tr.totalExecs)
|
|
}
|
|
|
|
func (tr *TaskRunner) runAll() {
|
|
tr.todoTasks.Range(func(key interface{}, value interface{}) bool {
|
|
task := value.(Task)
|
|
go func(t Task) {
|
|
// remove task from the to do list
|
|
tr.todoTasks.Delete(t.Hash())
|
|
|
|
// check if task is already running, if not run the task's command
|
|
if _, exists := tr.runningTasks.Load(t.Hash()); !exists {
|
|
// save the task in the running list
|
|
tr.runningTasks.Store(t.Hash(), t)
|
|
|
|
// create timeout context
|
|
ctx, cancel := context.WithTimeout(context.Background(), tr.timeout)
|
|
defer cancel()
|
|
|
|
// increment the total exec counter
|
|
atomic.AddUint64(&tr.totalExecs, 1)
|
|
|
|
tr.logger.With("canary", t.Canary).Infof("command starting %s", t.Command)
|
|
cmd := exec.CommandContext(ctx, "sh", "-c", t.Command)
|
|
|
|
// execute task
|
|
out, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
tr.logger.With("canary", t.Canary).Errorf("command failed %s %v %s", t.Command, err, out)
|
|
} else {
|
|
if tr.logCmdOutput {
|
|
fmt.Printf("%s\n", out)
|
|
}
|
|
tr.logger.With("canary", t.Canary).Infof("command finished %s", t.Command)
|
|
}
|
|
|
|
// remove task from the running list
|
|
tr.runningTasks.Delete(t.Hash())
|
|
} else {
|
|
tr.logger.With("canary", t.Canary).Infof("command skipped %s is already running", t.Command)
|
|
}
|
|
}(task)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (tr *TaskRunner) Start(interval time.Duration, stopCh <-chan struct{}) {
|
|
tickChan := time.NewTicker(interval).C
|
|
for {
|
|
select {
|
|
case <-tickChan:
|
|
tr.runAll()
|
|
case <-stopCh:
|
|
tr.logger.Info("shutting down the task runner")
|
|
return
|
|
}
|
|
}
|
|
}
|