Merge pull request #1750 from hongchaodeng/workflow

add workflow package
This commit is contained in:
Hongchao Deng
2021-06-02 09:52:07 -07:00
committed by GitHub
5 changed files with 412 additions and 2 deletions

View File

@@ -642,12 +642,21 @@ func Object2Map(obj interface{}) (map[string]interface{}, error) {
// Object2RawExtension converts an object to a rawExtension
func Object2RawExtension(obj interface{}) runtime.RawExtension {
bts, _ := json.Marshal(obj)
bts := MustJSONMarshal(obj)
return runtime.RawExtension{
Raw: bts,
}
}
// MustJSONMarshal json-marshals an object into bytes. It panics on err.
func MustJSONMarshal(obj interface{}) []byte {
b, err := json.Marshal(obj)
if err != nil {
panic(err)
}
return b
}
// RawExtension2Map will convert rawExtension to map
func RawExtension2Map(raw *runtime.RawExtension) (map[string]interface{}, error) {
if raw == nil {

36
pkg/workflow/interface.go Normal file
View File

@@ -0,0 +1,36 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// Workflow is used to execute the workflow steps of Application.
type Workflow interface {
// ExecuteSteps executes the steps of an Application with given steps of rendered resources.
// It returns done=true only if all steps are executed and succeeded.
ExecuteSteps(ctx context.Context, appRevName string, steps []*unstructured.Unstructured) (done bool, err error)
}
// SucceededMessage is the data json-marshalled into the message of `workflow-progress` condition
// when its reason is `succeeded`.
type SucceededMessage struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

178
pkg/workflow/workflow.go Normal file
View File

@@ -0,0 +1,178 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"context"
"encoding/json"
runtimev1alpha1 "github.com/crossplane/crossplane-runtime/apis/core/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
oamcore "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/controller/utils"
"github.com/oam-dev/kubevela/pkg/oam"
oamutil "github.com/oam-dev/kubevela/pkg/oam/util"
"github.com/oam-dev/kubevela/pkg/utils/apply"
)
type workflow struct {
app *oamcore.Application
applicator apply.Applicator
}
// NewWorkflow returns a Workflow implementation.
func NewWorkflow(app *oamcore.Application, applicator apply.Applicator) Workflow {
return &workflow{
app: app,
applicator: applicator,
}
}
func (w *workflow) ExecuteSteps(ctx context.Context, rev string, objects []*unstructured.Unstructured) (bool, error) {
steps := w.app.Spec.Workflow
if len(steps) == 0 {
return true, nil
}
w.app.Status.Phase = common.ApplicationRunningWorkflow
w.app.Status.Workflow = []common.WorkflowStepStatus{}
for i, step := range steps {
obj := objects[i].DeepCopy()
obj.SetName(step.Name)
obj.SetNamespace(w.app.Namespace)
obj.SetOwnerReferences([]metav1.OwnerReference{
*metav1.NewControllerRef(w.app, oamcore.ApplicationKindVersionKind),
})
err := w.applyWorkflowStep(ctx, obj, &types.WorkflowContext{
AppName: w.app.Name,
AppRevision: rev,
WorkflowIndex: i,
ResourceConfigMap: corev1.LocalObjectReference{
Name: rev,
},
})
if err != nil {
return false, err
}
status, err := w.syncWorkflowStatus(step, obj)
if err != nil {
return false, err
}
w.app.Status.Workflow = append(w.app.Status.Workflow, *status)
switch status.Phase {
case common.WorkflowStepPhaseSucceeded: // This one is done. Continue
case common.WorkflowStepPhaseRunning: // Need to retry shortly.
return false, nil
default:
return true, nil
}
}
return true, nil // all steps done
}
func (w *workflow) applyWorkflowStep(ctx context.Context, obj *unstructured.Unstructured, wctx *types.WorkflowContext) error {
if err := addWorkflowContextToAnnotation(obj, wctx); err != nil {
return err
}
return w.applicator.Apply(ctx, obj)
}
func addWorkflowContextToAnnotation(obj *unstructured.Unstructured, wc *types.WorkflowContext) error {
b, err := json.Marshal(wc)
if err != nil {
return err
}
m := map[string]string{
oam.AnnotationWorkflowContext: string(b),
}
obj.SetAnnotations(oamutil.MergeMapOverrideWithDst(m, obj.GetAnnotations()))
return nil
}
const (
// CondTypeWorkflowFinish is the type of the Condition indicating workflow progress
CondTypeWorkflowFinish = "workflow-progress"
// CondReasonSucceeded is the reason of the workflow progress condition which is succeeded
CondReasonSucceeded = "Succeeded"
// CondReasonStopped is the reason of the workflow progress condition which is stopped
CondReasonStopped = "Stopped"
// CondReasonFailed is the reason of the workflow progress condition which is failed
CondReasonFailed = "Failed"
// CondStatusTrue is the status of the workflow progress condition which is True
CondStatusTrue = "True"
)
func (w *workflow) syncWorkflowStatus(step oamcore.WorkflowStep, obj *unstructured.Unstructured) (*common.WorkflowStepStatus, error) {
status := &common.WorkflowStepStatus{
Name: step.Name,
Type: step.Type,
ResourceRef: runtimev1alpha1.TypedReference{
APIVersion: obj.GetAPIVersion(),
Kind: obj.GetKind(),
Name: obj.GetName(),
UID: obj.GetUID(),
},
}
cond, found, err := utils.GetUnstructuredObjectStatusCondition(obj, CondTypeWorkflowFinish)
if err != nil {
return nil, err
}
if !found || cond.Status != CondStatusTrue {
status.Phase = common.WorkflowStepPhaseRunning
return status, nil
}
switch cond.Reason {
case CondReasonSucceeded:
observedG, err := parseGeneration(cond.Message)
if err != nil {
return nil, err
}
if observedG != obj.GetGeneration() {
status.Phase = common.WorkflowStepPhaseRunning
} else {
status.Phase = common.WorkflowStepPhaseSucceeded
}
case CondReasonFailed:
status.Phase = common.WorkflowStepPhaseFailed
case CondReasonStopped:
status.Phase = common.WorkflowStepPhaseStopped
default:
status.Phase = common.WorkflowStepPhaseRunning
}
return status, nil
}
func parseGeneration(message string) (int64, error) {
m := &SucceededMessage{}
err := json.Unmarshal([]byte(message), m)
return m.ObservedGeneration, err
}

View File

@@ -0,0 +1,187 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
oamcore "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/utils/apply"
)
func TestExecuteSteps(t *testing.T) {
zerostepApp := &oamcore.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: oamcore.ApplicationSpec{
Workflow: []oamcore.WorkflowStep{},
},
}
onestepApp := zerostepApp.DeepCopy()
onestepApp.Spec.Workflow = []oamcore.WorkflowStep{{
Name: "test",
Type: "test",
}}
twostepsApp := onestepApp.DeepCopy()
twostepsApp.Spec.Workflow = append(twostepsApp.Spec.Workflow, oamcore.WorkflowStep{
Name: "test2",
Type: "test2",
})
succeededMessage, err := json.Marshal(&SucceededMessage{ObservedGeneration: 1})
if err != nil {
panic(err)
}
succeededStep := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"generation": int64(1),
},
"status": map[string]interface{}{
"conditions": []interface{}{map[string]interface{}{
"type": CondTypeWorkflowFinish,
"reason": CondReasonSucceeded,
"message": string(succeededMessage),
"status": CondStatusTrue,
}},
},
},
}
succeededStepUnmatchedGen := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"generation": int64(2),
},
"status": map[string]interface{}{
"conditions": []interface{}{map[string]interface{}{
"type": CondTypeWorkflowFinish,
"reason": CondReasonSucceeded,
"message": string(succeededMessage),
"status": CondStatusTrue,
}},
},
},
}
runningStep := &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"conditions": []interface{}{map[string]interface{}{
"type": CondTypeWorkflowFinish,
"status": CondStatusTrue,
}},
},
},
}
stoppedStep := &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"conditions": []interface{}{map[string]interface{}{
"type": CondTypeWorkflowFinish,
"reason": CondReasonStopped,
"status": CondStatusTrue,
}},
},
},
}
type want struct {
done bool
err error
}
testcases := []struct {
desc string
app *oamcore.Application
steps []*unstructured.Unstructured
want want
}{{
desc: "zero steps should return true",
app: zerostepApp.DeepCopy(),
want: want{
done: true,
},
}, {
desc: "one succeeded step should return true",
app: onestepApp.DeepCopy(),
steps: []*unstructured.Unstructured{succeededStep.DeepCopy()},
want: want{
done: true,
},
}, {
desc: "one succeeded step with unmatched generation should return false",
app: onestepApp.DeepCopy(),
steps: []*unstructured.Unstructured{succeededStepUnmatchedGen.DeepCopy()},
want: want{
done: false,
},
}, {
desc: "one running step should return false",
app: onestepApp.DeepCopy(),
steps: []*unstructured.Unstructured{runningStep.DeepCopy()},
want: want{
done: false,
},
}, {
desc: "one stopped step should return true",
app: onestepApp.DeepCopy(),
steps: []*unstructured.Unstructured{stoppedStep.DeepCopy()},
want: want{
done: true,
},
}, {
desc: "one succeeded step and one running step should return false",
app: twostepsApp.DeepCopy(),
steps: []*unstructured.Unstructured{succeededStep.DeepCopy(), runningStep.DeepCopy()},
want: want{
done: false,
},
}}
for _, tc := range testcases {
t.Logf("%s", tc.desc)
done, err := NewWorkflow(tc.app, mockApplicator()).ExecuteSteps(context.Background(), "app-v1", tc.steps)
if err != nil {
assert.Equal(t, tc.want.err, err)
continue
}
assert.Equal(t, tc.want.done, done)
}
}
type testmockApplicator struct {
}
func (t *testmockApplicator) Apply(ctx context.Context, object runtime.Object, option ...apply.ApplyOption) error {
return nil
}
func mockApplicator() apply.Applicator {
return &testmockApplicator{}
}