diff --git a/design/vela-core/workflow_policy.md b/design/vela-core/workflow_policy.md index 76cdbcb0c..f2f295714 100644 --- a/design/vela-core/workflow_policy.md +++ b/design/vela-core/workflow_policy.md @@ -154,7 +154,7 @@ Each workflow step has the following interactions with the app controller: ```yaml conditions: - - type: workflow-finish + - type: workflow-progress status: 'True' reason: 'Succeeded' message: '{"observedGeneration":1}' diff --git a/pkg/oam/util/helper.go b/pkg/oam/util/helper.go index 76b83088c..158c70706 100644 --- a/pkg/oam/util/helper.go +++ b/pkg/oam/util/helper.go @@ -642,12 +642,21 @@ func Object2Map(obj interface{}) (map[string]interface{}, error) { // Object2RawExtension converts an object to a rawExtension func Object2RawExtension(obj interface{}) runtime.RawExtension { - bts, _ := json.Marshal(obj) + bts := MustJSONMarshal(obj) return runtime.RawExtension{ Raw: bts, } } +// MustJSONMarshal json-marshals an object into bytes. It panics on err. +func MustJSONMarshal(obj interface{}) []byte { + b, err := json.Marshal(obj) + if err != nil { + panic(err) + } + return b +} + // RawExtension2Map will convert rawExtension to map func RawExtension2Map(raw *runtime.RawExtension) (map[string]interface{}, error) { if raw == nil { diff --git a/pkg/workflow/interface.go b/pkg/workflow/interface.go new file mode 100644 index 000000000..043b3b8d6 --- /dev/null +++ b/pkg/workflow/interface.go @@ -0,0 +1,36 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// Workflow is used to execute the workflow steps of Application. +type Workflow interface { + // ExecuteSteps executes the steps of an Application with given steps of rendered resources. + // It returns done=true only if all steps are executed and succeeded. + ExecuteSteps(ctx context.Context, appRevName string, steps []*unstructured.Unstructured) (done bool, err error) +} + +// SucceededMessage is the data json-marshalled into the message of `workflow-progress` condition +// when its reason is `succeeded`. +type SucceededMessage struct { + ObservedGeneration int64 `json:"observedGeneration,omitempty"` +} diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go new file mode 100644 index 000000000..c3d959e82 --- /dev/null +++ b/pkg/workflow/workflow.go @@ -0,0 +1,178 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "encoding/json" + + runtimev1alpha1 "github.com/crossplane/crossplane-runtime/apis/core/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/common" + oamcore "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/apis/types" + "github.com/oam-dev/kubevela/pkg/controller/utils" + "github.com/oam-dev/kubevela/pkg/oam" + oamutil "github.com/oam-dev/kubevela/pkg/oam/util" + "github.com/oam-dev/kubevela/pkg/utils/apply" +) + +type workflow struct { + app *oamcore.Application + applicator apply.Applicator +} + +// NewWorkflow returns a Workflow implementation. +func NewWorkflow(app *oamcore.Application, applicator apply.Applicator) Workflow { + return &workflow{ + app: app, + applicator: applicator, + } +} + +func (w *workflow) ExecuteSteps(ctx context.Context, rev string, objects []*unstructured.Unstructured) (bool, error) { + steps := w.app.Spec.Workflow + + if len(steps) == 0 { + return true, nil + } + + w.app.Status.Phase = common.ApplicationRunningWorkflow + + w.app.Status.Workflow = []common.WorkflowStepStatus{} + for i, step := range steps { + obj := objects[i].DeepCopy() + obj.SetName(step.Name) + obj.SetNamespace(w.app.Namespace) + obj.SetOwnerReferences([]metav1.OwnerReference{ + *metav1.NewControllerRef(w.app, oamcore.ApplicationKindVersionKind), + }) + err := w.applyWorkflowStep(ctx, obj, &types.WorkflowContext{ + AppName: w.app.Name, + AppRevision: rev, + WorkflowIndex: i, + ResourceConfigMap: corev1.LocalObjectReference{ + Name: rev, + }, + }) + if err != nil { + return false, err + } + + status, err := w.syncWorkflowStatus(step, obj) + if err != nil { + return false, err + } + + w.app.Status.Workflow = append(w.app.Status.Workflow, *status) + switch status.Phase { + case common.WorkflowStepPhaseSucceeded: // This one is done. Continue + case common.WorkflowStepPhaseRunning: // Need to retry shortly. + return false, nil + default: + return true, nil + } + } + return true, nil // all steps done +} + +func (w *workflow) applyWorkflowStep(ctx context.Context, obj *unstructured.Unstructured, wctx *types.WorkflowContext) error { + if err := addWorkflowContextToAnnotation(obj, wctx); err != nil { + return err + } + + return w.applicator.Apply(ctx, obj) +} + +func addWorkflowContextToAnnotation(obj *unstructured.Unstructured, wc *types.WorkflowContext) error { + b, err := json.Marshal(wc) + if err != nil { + return err + } + m := map[string]string{ + oam.AnnotationWorkflowContext: string(b), + } + obj.SetAnnotations(oamutil.MergeMapOverrideWithDst(m, obj.GetAnnotations())) + return nil +} + +const ( + // CondTypeWorkflowFinish is the type of the Condition indicating workflow progress + CondTypeWorkflowFinish = "workflow-progress" + + // CondReasonSucceeded is the reason of the workflow progress condition which is succeeded + CondReasonSucceeded = "Succeeded" + // CondReasonStopped is the reason of the workflow progress condition which is stopped + CondReasonStopped = "Stopped" + // CondReasonFailed is the reason of the workflow progress condition which is failed + CondReasonFailed = "Failed" + + // CondStatusTrue is the status of the workflow progress condition which is True + CondStatusTrue = "True" +) + +func (w *workflow) syncWorkflowStatus(step oamcore.WorkflowStep, obj *unstructured.Unstructured) (*common.WorkflowStepStatus, error) { + status := &common.WorkflowStepStatus{ + Name: step.Name, + Type: step.Type, + ResourceRef: runtimev1alpha1.TypedReference{ + APIVersion: obj.GetAPIVersion(), + Kind: obj.GetKind(), + Name: obj.GetName(), + UID: obj.GetUID(), + }, + } + + cond, found, err := utils.GetUnstructuredObjectStatusCondition(obj, CondTypeWorkflowFinish) + if err != nil { + return nil, err + } + + if !found || cond.Status != CondStatusTrue { + status.Phase = common.WorkflowStepPhaseRunning + return status, nil + } + + switch cond.Reason { + case CondReasonSucceeded: + observedG, err := parseGeneration(cond.Message) + if err != nil { + return nil, err + } + if observedG != obj.GetGeneration() { + status.Phase = common.WorkflowStepPhaseRunning + } else { + status.Phase = common.WorkflowStepPhaseSucceeded + } + case CondReasonFailed: + status.Phase = common.WorkflowStepPhaseFailed + case CondReasonStopped: + status.Phase = common.WorkflowStepPhaseStopped + default: + status.Phase = common.WorkflowStepPhaseRunning + } + return status, nil +} + +func parseGeneration(message string) (int64, error) { + m := &SucceededMessage{} + err := json.Unmarshal([]byte(message), m) + return m.ObservedGeneration, err +} diff --git a/pkg/workflow/workflow_test.go b/pkg/workflow/workflow_test.go new file mode 100644 index 000000000..9dcf8a446 --- /dev/null +++ b/pkg/workflow/workflow_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + + oamcore "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/utils/apply" +) + +func TestExecuteSteps(t *testing.T) { + + zerostepApp := &oamcore.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + Spec: oamcore.ApplicationSpec{ + Workflow: []oamcore.WorkflowStep{}, + }, + } + + onestepApp := zerostepApp.DeepCopy() + onestepApp.Spec.Workflow = []oamcore.WorkflowStep{{ + Name: "test", + Type: "test", + }} + + twostepsApp := onestepApp.DeepCopy() + twostepsApp.Spec.Workflow = append(twostepsApp.Spec.Workflow, oamcore.WorkflowStep{ + Name: "test2", + Type: "test2", + }) + + succeededMessage, err := json.Marshal(&SucceededMessage{ObservedGeneration: 1}) + if err != nil { + panic(err) + } + + succeededStep := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "generation": int64(1), + }, + "status": map[string]interface{}{ + "conditions": []interface{}{map[string]interface{}{ + "type": CondTypeWorkflowFinish, + "reason": CondReasonSucceeded, + "message": string(succeededMessage), + "status": CondStatusTrue, + }}, + }, + }, + } + succeededStepUnmatchedGen := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "generation": int64(2), + }, + "status": map[string]interface{}{ + "conditions": []interface{}{map[string]interface{}{ + "type": CondTypeWorkflowFinish, + "reason": CondReasonSucceeded, + "message": string(succeededMessage), + "status": CondStatusTrue, + }}, + }, + }, + } + + runningStep := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "status": map[string]interface{}{ + "conditions": []interface{}{map[string]interface{}{ + "type": CondTypeWorkflowFinish, + "status": CondStatusTrue, + }}, + }, + }, + } + stoppedStep := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "status": map[string]interface{}{ + "conditions": []interface{}{map[string]interface{}{ + "type": CondTypeWorkflowFinish, + "reason": CondReasonStopped, + "status": CondStatusTrue, + }}, + }, + }, + } + + type want struct { + done bool + err error + } + + testcases := []struct { + desc string + app *oamcore.Application + steps []*unstructured.Unstructured + want want + }{{ + desc: "zero steps should return true", + app: zerostepApp.DeepCopy(), + want: want{ + done: true, + }, + }, { + desc: "one succeeded step should return true", + app: onestepApp.DeepCopy(), + steps: []*unstructured.Unstructured{succeededStep.DeepCopy()}, + want: want{ + done: true, + }, + }, { + desc: "one succeeded step with unmatched generation should return false", + app: onestepApp.DeepCopy(), + steps: []*unstructured.Unstructured{succeededStepUnmatchedGen.DeepCopy()}, + want: want{ + done: false, + }, + }, { + desc: "one running step should return false", + app: onestepApp.DeepCopy(), + steps: []*unstructured.Unstructured{runningStep.DeepCopy()}, + want: want{ + done: false, + }, + }, { + desc: "one stopped step should return true", + app: onestepApp.DeepCopy(), + steps: []*unstructured.Unstructured{stoppedStep.DeepCopy()}, + want: want{ + done: true, + }, + }, { + desc: "one succeeded step and one running step should return false", + app: twostepsApp.DeepCopy(), + steps: []*unstructured.Unstructured{succeededStep.DeepCopy(), runningStep.DeepCopy()}, + want: want{ + done: false, + }, + }} + for _, tc := range testcases { + t.Logf("%s", tc.desc) + done, err := NewWorkflow(tc.app, mockApplicator()).ExecuteSteps(context.Background(), "app-v1", tc.steps) + if err != nil { + assert.Equal(t, tc.want.err, err) + continue + } + assert.Equal(t, tc.want.done, done) + } +} + +type testmockApplicator struct { +} + +func (t *testmockApplicator) Apply(ctx context.Context, object runtime.Object, option ...apply.ApplyOption) error { + return nil +} + +func mockApplicator() apply.Applicator { + return &testmockApplicator{} +}