Files
kubevela/pkg/workflow/operation/operation.go
Ai Ranthem a5606b7808
Some checks failed
CodeQL / Analyze (go) (push) Failing after 6m23s
Definition-Lint / definition-doc (push) Failing after 3m8s
E2E MultiCluster Test / detect-noop (push) Successful in 6s
E2E Test / detect-noop (push) Successful in 3s
Go / detect-noop (push) Successful in 2s
license / Check for unapproved licenses (push) Failing after 18s
Registry / publish-core-images (push) Failing after 1m4s
Unit-Test / detect-noop (push) Successful in 20s
Sync SDK / sync_sdk (push) Failing after 3m9s
Go / staticcheck (push) Successful in 6m17s
Go / check-diff (push) Failing after 19m4s
Go / check-core-image-build (push) Failing after 5m44s
Go / check-cli-image-build (push) Failing after 3m31s
Unit-Test / unit-tests (push) Failing after 13m54s
Go / lint (push) Failing after 1h53m27s
Scorecards supply-chain security / Scorecards analysis (push) Failing after 1m27s
E2E MultiCluster Test / e2e-multi-cluster-tests (v1.29) (push) Has been cancelled
E2E Test / e2e-tests (v1.29) (push) Has been cancelled
Go / check-windows (push) Has been cancelled
Chore: (deps): Update k8s to 1.29 (#6654)
* chore: update k8s to 1.29

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* fix: unit test

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* fix: lint

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* fix: lint

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* fix: e2e

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* fix: lint and e2e test

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* test(e2e): increase timeout

Signed-off-by: phantomnat <w.nattadej@gmail.com>

* fix e2e and scripts

Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>

* make reviewable

Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>

* rollback a unnecessary ut change

Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>

* update go.mod to import merged workflow

Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>

---------

Signed-off-by: phantomnat <w.nattadej@gmail.com>
Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>
Co-authored-by: phantomnat <w.nattadej@gmail.com>
2025-01-03 07:54:42 +08:00

524 lines
17 KiB
Go

/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operation
import (
"context"
"fmt"
"io"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
wfTypes "github.com/kubevela/workflow/pkg/types"
wfUtils "github.com/kubevela/workflow/pkg/utils"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/appfile"
"github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1beta1/application"
"github.com/oam-dev/kubevela/pkg/controller/utils"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/resourcetracker"
"github.com/oam-dev/kubevela/pkg/rollout"
kubevelaapp "github.com/oam-dev/kubevela/pkg/utils/app"
errors3 "github.com/oam-dev/kubevela/pkg/utils/errors"
)
// NewApplicationWorkflowOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and application
func NewApplicationWorkflowOperator(cli client.Client, w io.Writer, app *v1beta1.Application) wfUtils.WorkflowOperator {
return appWorkflowOperator{
cli: cli,
outputWriter: w,
application: app,
}
}
// NewApplicationWorkflowStepOperator get an workflow step operator with k8sClient, ioWriter(optional, useful for cli) and application
func NewApplicationWorkflowStepOperator(cli client.Client, w io.Writer, app *v1beta1.Application) wfUtils.WorkflowStepOperator {
return appWorkflowStepOperator{
cli: cli,
outputWriter: w,
application: app,
}
}
type appWorkflowOperator struct {
cli client.Client
outputWriter io.Writer
application *v1beta1.Application
}
type appWorkflowStepOperator struct {
cli client.Client
outputWriter io.Writer
application *v1beta1.Application
}
// Suspend a running workflow
func (wo appWorkflowOperator) Suspend(ctx context.Context) error {
app := wo.application
if app.Status.Workflow == nil {
return fmt.Errorf("the workflow in application is not running")
}
var err error
if err = rollout.SuspendRollout(ctx, wo.cli, app, wo.outputWriter); err != nil {
return err
}
if err := SuspendWorkflow(ctx, wo.cli, app, ""); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", app.Name)
}
// Suspend a suspending workflow
func (wo appWorkflowStepOperator) Suspend(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
app := wo.application
if app.Status.Workflow == nil {
return fmt.Errorf("the workflow in application is not running")
}
if app.Status.Workflow.Terminated {
return fmt.Errorf("can not suspend a terminated workflow")
}
if err := SuspendWorkflow(ctx, wo.cli, app, step); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully suspend workflow %s from step %s \n", app.Name, step)
}
// SuspendWorkflow suspend workflow
func SuspendWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.Application, stepName string) error {
app.Status.Workflow.Suspend = true
steps := app.Status.Workflow.Steps
found := stepName == ""
for i, step := range steps {
for j, sub := range step.SubStepsStatus {
if sub.Phase != workflowv1alpha1.WorkflowStepPhaseRunning {
continue
}
if stepName == "" {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseSuspending)
} else if stepName == sub.Name {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseSuspending)
found = true
break
}
}
if step.Phase != workflowv1alpha1.WorkflowStepPhaseRunning {
continue
}
if stepName == "" {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseSuspending)
} else if stepName == step.Name {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseSuspending)
found = true
break
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return kubecli.Status().Patch(ctx, app, client.Merge)
})
}
// Resume a suspending workflow
func (wo appWorkflowOperator) Resume(ctx context.Context) error {
app := wo.application
if app.Status.Workflow == nil {
return fmt.Errorf("the workflow in application is not running")
}
if app.Status.Workflow.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
}
var rolloutResumed bool
var err error
if rolloutResumed, err = rollout.ResumeRollout(ctx, wo.cli, app, wo.outputWriter); err != nil {
return err
}
if !rolloutResumed && !app.Status.Workflow.Suspend {
return writeOutputF(wo.outputWriter, "workflow %s is not suspended.\n", app.Name)
}
if app.Status.Workflow.Suspend {
if err = ResumeWorkflow(ctx, wo.cli, app, ""); err != nil {
return err
}
}
return writeOutputF(wo.outputWriter, "Successfully resume workflow: %s\n", app.Name)
}
// Resume a suspending workflow
func (wo appWorkflowStepOperator) Resume(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
app := wo.application
if app.Status.Workflow == nil {
return fmt.Errorf("the workflow in application is not running")
}
if app.Status.Workflow.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
}
if !app.Status.Workflow.Suspend {
return writeOutputF(wo.outputWriter, "workflow %s is not suspended.\n", app.Name)
}
if app.Status.Workflow.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, app, step); err != nil {
return err
}
}
return writeOutputF(wo.outputWriter, "Successfully resume workflow %s from step %s \n", app.Name, step)
}
// ResumeWorkflow resume workflow
func ResumeWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.Application, stepName string) error {
app.Status.Workflow.Suspend = false
steps := app.Status.Workflow.Steps
found := stepName == ""
for i, step := range steps {
for j, sub := range step.SubStepsStatus {
if sub.Phase != workflowv1alpha1.WorkflowStepPhaseSuspending {
continue
}
if stepName == "" {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseRunning)
} else if stepName == sub.Name {
wfUtils.OperateSteps(steps, i, j, workflowv1alpha1.WorkflowStepPhaseRunning)
found = true
break
}
}
if step.Phase != workflowv1alpha1.WorkflowStepPhaseSuspending {
continue
}
if stepName == "" {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseRunning)
} else if stepName == step.Name {
wfUtils.OperateSteps(steps, i, -1, workflowv1alpha1.WorkflowStepPhaseRunning)
found = true
break
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}
return kubecli.Status().Patch(ctx, app, client.Merge)
}
// Rollback a running in middle state workflow.
// nolint
func (wo appWorkflowOperator) Rollback(ctx context.Context) error {
app := wo.application
if app.Status.Workflow != nil && !app.Status.Workflow.Terminated && !app.Status.Workflow.Suspend && !app.Status.Workflow.Finished {
return fmt.Errorf("can not rollback a running workflow")
}
if oam.GetPublishVersion(app) == "" {
if app.Status.LatestRevision == nil || app.Status.LatestRevision.Name == "" {
return fmt.Errorf("the latest revision is not set: %s", app.Name)
}
// get the last revision
revision := &v1beta1.ApplicationRevision{}
if err := wo.cli.Get(ctx, types.NamespacedName{Name: app.Status.LatestRevision.Name, Namespace: app.Namespace}, revision); err != nil {
return fmt.Errorf("failed to get the latest revision: %w", err)
}
app.Spec = revision.Spec.Application.Spec
if err := wo.cli.Update(ctx, app); err != nil {
return err
}
fmt.Printf("Successfully rollback workflow to the latest revision: %s\n", app.Name)
return nil
}
appRevs, err := application.GetSortedAppRevisions(ctx, wo.cli, app.Name, app.Namespace)
if err != nil {
return errors.Wrapf(err, "failed to list revisions for application %s/%s", app.Namespace, app.Name)
}
// find succeeded revision to rollback
var rev *v1beta1.ApplicationRevision
var outdatedRev []*v1beta1.ApplicationRevision
for i := range appRevs {
candidate := appRevs[len(appRevs)-i-1]
_rev := candidate.DeepCopy()
if !candidate.Status.Succeeded || oam.GetPublishVersion(_rev) == "" {
outdatedRev = append(outdatedRev, _rev)
continue
}
rev = _rev
break
}
if rev == nil {
return errors.Errorf("failed to find previous succeeded revision for application %s/%s", app.Namespace, app.Name)
}
publishVersion := oam.GetPublishVersion(rev)
revisionNumber, err := utils.ExtractRevision(rev.Name)
if err != nil {
return errors.Wrapf(err, "failed to extract revision number from revision %s", rev.Name)
}
_, currentRT, historyRTs, _, err := resourcetracker.ListApplicationResourceTrackers(ctx, wo.cli, app)
if err != nil {
return errors.Wrapf(err, "failed to list resource trackers for application %s/%s", app.Namespace, app.Name)
}
var matchRT *v1beta1.ResourceTracker
for _, rt := range append(historyRTs, currentRT) {
if rt == nil {
continue
}
labels := rt.GetLabels()
if labels != nil && labels[oam.LabelAppRevision] == rev.Name {
matchRT = rt.DeepCopy()
}
}
if matchRT == nil {
return errors.Errorf("cannot find resource tracker for previous revision %s, unable to rollback", rev.Name)
}
if matchRT.DeletionTimestamp != nil {
return errors.Errorf("previous revision %s is being recycled, unable to rollback", rev.Name)
}
err = wo.writeOutput("Find succeeded application revision %s (PublishVersion: %s) to rollback.\n")
if err != nil {
return err
}
appKey := client.ObjectKeyFromObject(app)
// rollback application spec and freeze
controllerRequirement, err := kubevelaapp.FreezeApplication(ctx, wo.cli, app, func() {
app.Spec = rev.Spec.Application.Spec
oam.SetPublishVersion(app, publishVersion)
})
if err != nil {
return errors.Wrapf(err, "failed to rollback application spec to revision %s (PublishVersion: %s)", rev.Name, publishVersion)
}
err = writeOutputF(wo.outputWriter, "Application spec rollback successfully.\n")
if err != nil {
return err
}
// rollback application status
if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err = wo.cli.Get(ctx, appKey, app); err != nil {
return err
}
app.Status.Workflow = rev.Status.Workflow
app.Status.Services = []common.ApplicationComponentStatus{}
app.Status.AppliedResources = []common.ClusterObjectReference{}
for _, rsc := range matchRT.Spec.ManagedResources {
app.Status.AppliedResources = append(app.Status.AppliedResources, rsc.ClusterObjectReference)
}
app.Status.LatestRevision = &common.Revision{
Name: rev.Name,
Revision: int64(revisionNumber),
RevisionHash: rev.GetLabels()[oam.LabelAppRevisionHash],
}
return wo.cli.Status().Update(ctx, app)
}); err != nil {
return errors.Wrapf(err, "failed to rollback application status to revision %s (PublishVersion: %s)", rev.Name, publishVersion)
}
err = writeOutputF(wo.outputWriter, "Application status rollback successfully.\n")
if err != nil {
return err
}
// update resource tracker generation
matchRTKey := client.ObjectKeyFromObject(matchRT)
if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err = wo.cli.Get(ctx, matchRTKey, matchRT); err != nil {
return err
}
matchRT.Spec.ApplicationGeneration = app.Generation
return wo.cli.Update(ctx, matchRT)
}); err != nil {
return errors.Wrapf(err, "failed to update application generation in resource tracker")
}
// unfreeze application
if err = kubevelaapp.UnfreezeApplication(ctx, wo.cli, app, nil, controllerRequirement); err != nil {
return errors.Wrapf(err, "failed to resume application to restart")
}
rollback, err := rollout.RollbackRollout(ctx, wo.cli, app, wo.outputWriter)
if err != nil {
return err
}
if rollback {
err = writeOutputF(wo.outputWriter, "Successfully rollback app.\n")
if err != nil {
return err
}
}
// clean up outdated revisions
var errs errors3.ErrorList
for _, _rev := range outdatedRev {
if err = wo.cli.Delete(ctx, _rev); err != nil {
errs = append(errs, err)
}
}
if errs.HasError() {
return errors.Wrapf(errs, "failed to clean up outdated revisions")
}
err = writeOutputF(wo.outputWriter, "Application outdated revision cleaned up.\n")
if err != nil {
return err
}
return nil
}
// Restart a terminated or finished workflow.
func (wo appWorkflowOperator) Restart(ctx context.Context) error {
app := wo.application
status := app.Status.Workflow
if status == nil {
return fmt.Errorf("the workflow in application is not running")
}
// reset the workflow status to restart the workflow
app.Status.Workflow = nil
if err := wo.cli.Status().Update(ctx, app); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully restart workflow: %s\n", app.Name)
}
// Restart a terminated or finished workflow.
func (wo appWorkflowStepOperator) Restart(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
app := wo.application
status := app.Status.Workflow
if status == nil {
return fmt.Errorf("the workflow in application is not running")
}
status.Terminated = false
status.Suspend = false
status.Finished = false
if !status.EndTime.IsZero() {
status.EndTime = metav1.Time{}
}
var cm *corev1.ConfigMap
if status.ContextBackend != nil {
if err := wo.cli.Get(ctx, client.ObjectKey{Namespace: app.Namespace, Name: status.ContextBackend.Name}, cm); err != nil {
return err
}
}
appParser := appfile.NewApplicationParser(wo.cli)
appFile, err := appParser.GenerateAppFile(ctx, app)
if err != nil {
return fmt.Errorf("failed to parse appfile: %w", err)
}
stepStatus, cm, err := wfUtils.CleanStatusFromStep(appFile.WorkflowSteps, status.Steps, *appFile.WorkflowMode, cm, step)
if err != nil {
return err
}
status.Steps = stepStatus
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return wo.cli.Status().Update(ctx, app)
}); err != nil {
return err
}
if cm != nil {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return wo.cli.Update(ctx, cm)
}); err != nil {
return err
}
}
return writeOutputF(wo.outputWriter, "Successfully restart workflow %s from step %s\n", app.Name, step)
}
func (wo appWorkflowOperator) Terminate(ctx context.Context) error {
app := wo.application
if err := TerminateWorkflow(ctx, wo.cli, app); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully terminate workflow: %s\n", app.Name)
}
// TerminateWorkflow terminate workflow
func TerminateWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.Application) error {
// set the workflow terminated to true
app.Status.Workflow.Terminated = true
// set the workflow suspend to false
app.Status.Workflow.Suspend = false
steps := app.Status.Workflow.Steps
for i, step := range steps {
switch step.Phase {
case workflowv1alpha1.WorkflowStepPhaseFailed:
if step.Reason != wfTypes.StatusReasonFailedAfterRetries && step.Reason != wfTypes.StatusReasonTimeout {
steps[i].Reason = wfTypes.StatusReasonTerminate
}
case workflowv1alpha1.WorkflowStepPhaseRunning, workflowv1alpha1.WorkflowStepPhaseSuspending:
steps[i].Phase = workflowv1alpha1.WorkflowStepPhaseFailed
steps[i].Reason = wfTypes.StatusReasonTerminate
default:
}
for j, sub := range step.SubStepsStatus {
switch sub.Phase {
case workflowv1alpha1.WorkflowStepPhaseFailed:
if sub.Reason != wfTypes.StatusReasonFailedAfterRetries && sub.Reason != wfTypes.StatusReasonTimeout {
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
}
case workflowv1alpha1.WorkflowStepPhaseRunning, workflowv1alpha1.WorkflowStepPhaseSuspending:
steps[i].SubStepsStatus[j].Phase = workflowv1alpha1.WorkflowStepPhaseFailed
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
default:
}
}
}
return kubecli.Status().Patch(ctx, app, client.Merge)
}
func (wo appWorkflowOperator) writeOutput(str string) error {
if wo.outputWriter == nil {
return nil
}
_, err := wo.outputWriter.Write([]byte(str))
return err
}
func writeOutputF(outputWriter io.Writer, format string, a ...interface{}) error {
if outputWriter == nil {
return nil
}
_, err := fmt.Fprintf(outputWriter, format, a...)
return err
}