Files
kubevela/pkg/workflow/workflow.go
Jian.Li b66f2995a5 Fix: Bugs (as follows) (#2278)
* Fix: suspend step id

* Feat: add test cases

* Fix: reslove conflict

* Fix: reduce reconclie duration for test
2021-09-13 20:03:49 +08:00

309 lines
7.1 KiB
Go

/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"context"
"fmt"
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
oamcore "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/controller/utils"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/oam/util"
wfContext "github.com/oam-dev/kubevela/pkg/workflow/context"
wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types"
)
type workflow struct {
app *oamcore.Application
cli client.Client
dagMode bool
}
// NewWorkflow returns a Workflow implementation.
func NewWorkflow(app *oamcore.Application, cli client.Client, mode common.WorkflowMode) Workflow {
dagMode := false
if mode == common.WorkflowModeDAG {
dagMode = true
}
return &workflow{
app: app,
cli: cli,
dagMode: dagMode,
}
}
// ExecuteSteps process workflow step in order.
func (w *workflow) ExecuteSteps(ctx context.Context, appRev *oamcore.ApplicationRevision, taskRunners []wfTypes.TaskRunner) (common.WorkflowState, error) {
revAndSpecHash, err := computeAppRevisionHash(appRev.Name, w.app)
if err != nil {
return common.WorkflowStateExecuting, err
}
if len(taskRunners) == 0 {
return common.WorkflowStateFinished, nil
}
if w.app.Status.Workflow == nil || w.app.Status.Workflow.AppRevision != revAndSpecHash {
w.app.Status.Workflow = &common.WorkflowStatus{
AppRevision: revAndSpecHash,
Mode: common.WorkflowModeStep,
}
if w.dagMode {
w.app.Status.Workflow.Mode = common.WorkflowModeDAG
}
// clean recorded resources info.
w.app.Status.Services = nil
w.app.Status.AppliedResources = nil
}
wfStatus := w.app.Status.Workflow
allTasksDone := w.allDone(taskRunners)
if wfStatus.Terminated {
return common.WorkflowStateTerminated, nil
}
if wfStatus.Suspend {
return common.WorkflowStateSuspended, nil
}
if allTasksDone {
return common.WorkflowStateFinished, nil
}
var (
wfCtx wfContext.Context
)
wfCtx, err = w.makeContext(w.app.Name)
if err != nil {
return common.WorkflowStateExecuting, err
}
e := &engine{
status: wfStatus,
dagMode: w.dagMode,
}
err = e.run(wfCtx, taskRunners)
if err != nil {
return common.WorkflowStateExecuting, err
}
if wfStatus.Terminated {
return common.WorkflowStateTerminated, nil
}
if wfStatus.Suspend {
return common.WorkflowStateSuspended, nil
}
if w.allDone(taskRunners) {
return common.WorkflowStateFinished, nil
}
return common.WorkflowStateExecuting, nil
}
func (w *workflow) allDone(taskRunners []wfTypes.TaskRunner) bool {
status := w.app.Status.Workflow
for _, t := range taskRunners {
done := false
for _, ss := range status.Steps {
if ss.Name == t.Name() {
done = ss.Phase == common.WorkflowStepPhaseSucceeded
break
}
}
if !done {
return false
}
}
return true
}
func (w *workflow) makeContext(appName string) (wfCtx wfContext.Context, err error) {
wfStatus := w.app.Status.Workflow
if wfStatus.ContextBackend != nil {
wfCtx, err = wfContext.LoadContext(w.cli, w.app.Namespace, appName)
if err != nil {
err = errors.WithMessage(err, "load context")
}
return
}
wfCtx, err = wfContext.NewEmptyContext(w.cli, w.app.Namespace, appName)
if err != nil {
err = errors.WithMessage(err, "new context")
return
}
if err = w.setMetadataToContext(wfCtx); err != nil {
return
}
if err = wfCtx.Commit(); err != nil {
return
}
wfStatus.ContextBackend = wfCtx.StoreRef()
return
}
func (w *workflow) setMetadataToContext(wfCtx wfContext.Context) error {
copierMeta := w.app.ObjectMeta.DeepCopy()
copierMeta.ManagedFields = nil
copierMeta.Finalizers = nil
copierMeta.OwnerReferences = nil
metadata, err := value.NewValue(string(util.MustJSONMarshal(copierMeta)), nil, "")
if err != nil {
return err
}
return wfCtx.SetVar(metadata, wfTypes.ContextKeyMetadata)
}
func (e *engine) runAsDAG(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner) error {
var (
todoTasks []wfTypes.TaskRunner
pendingTasks []wfTypes.TaskRunner
)
done := true
for _, tRunner := range taskRunners {
ready := false
for _, ss := range e.status.Steps {
if ss.Name == tRunner.Name() {
ready = ss.Phase == common.WorkflowStepPhaseSucceeded
break
}
}
if !ready {
done = false
if tRunner.Pending(wfCtx) {
pendingTasks = append(pendingTasks, tRunner)
continue
}
todoTasks = append(todoTasks, tRunner)
}
}
if done {
return nil
}
if len(todoTasks) > 0 {
err := e.steps(wfCtx, todoTasks)
if err != nil {
return err
}
if e.needStop() {
return nil
}
if len(pendingTasks) > 0 {
return e.runAsDAG(wfCtx, pendingTasks)
}
}
return nil
}
func (e *engine) run(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner) error {
if e.dagMode {
return e.runAsDAG(wfCtx, taskRunners)
}
return e.steps(wfCtx, e.todoByIndex(taskRunners))
}
func (e *engine) todoByIndex(taskRunners []wfTypes.TaskRunner) []wfTypes.TaskRunner {
index := 0
for _, t := range taskRunners {
for _, ss := range e.status.Steps {
if ss.Name == t.Name() {
if ss.Phase == common.WorkflowStepPhaseSucceeded {
index++
}
break
}
}
}
return taskRunners[index:]
}
func (e *engine) steps(wfCtx wfContext.Context, taskRunners []wfTypes.TaskRunner) error {
for _, runner := range taskRunners {
status, operation, err := runner.Run(wfCtx, &wfTypes.TaskRunOptions{})
if err != nil {
return err
}
e.updateStepStatus(status)
if status.Phase != common.WorkflowStepPhaseSucceeded {
if e.isDag() {
continue
}
return nil
}
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
e.finishStep(operation)
if e.needStop() {
return nil
}
}
return nil
}
type engine struct {
dagMode bool
status *common.WorkflowStatus
}
func (e *engine) isDag() bool {
return e.dagMode
}
func (e *engine) finishStep(operation *wfTypes.Operation) {
if operation != nil {
e.status.Suspend = operation.Suspend
e.status.Terminated = operation.Terminated
}
}
func (e *engine) updateStepStatus(status common.WorkflowStepStatus) {
var conditionUpdated bool
for i := range e.status.Steps {
if e.status.Steps[i].Name == status.Name {
e.status.Steps[i] = status
conditionUpdated = true
break
}
}
if !conditionUpdated {
e.status.Steps = append(e.status.Steps, status)
}
}
func (e *engine) needStop() bool {
return e.status.Suspend || e.status.Terminated
}
func computeAppRevisionHash(rev string, app *oamcore.Application) (string, error) {
specHash, err := utils.ComputeSpecHash(app.Spec)
return fmt.Sprintf("%s:%s", rev, specHash), err
}