Feat: support dependsOn in deploy workflowstep (#3750)

Signed-off-by: Somefive <yd219913@alibaba-inc.com>
This commit is contained in:
Somefive
2022-04-28 16:29:34 +08:00
committed by GitHub
parent 7935447d44
commit 512664b9b3
26 changed files with 702 additions and 664 deletions

View File

@@ -191,10 +191,9 @@ func (in *ResourceTracker) findMangedResourceIndex(mr ManagedResource) int {
return -1
}
// AddManagedResource add object to managed resources, if exists, update
func (in *ResourceTracker) AddManagedResource(rsc client.Object, metaOnly bool, creator common.ResourceCreatorRole) (updated bool) {
func newManagedResourceFromResource(rsc client.Object) ManagedResource {
gvk := rsc.GetObjectKind().GroupVersionKind()
mr := ManagedResource{
return ManagedResource{
ClusterObjectReference: common.ClusterObjectReference{
ObjectReference: v1.ObjectReference{
APIVersion: gvk.GroupVersion().String(),
@@ -207,6 +206,17 @@ func (in *ResourceTracker) AddManagedResource(rsc client.Object, metaOnly bool,
OAMObjectReference: common.NewOAMObjectReferenceFromObject(rsc),
Deleted: false,
}
}
// ContainsManagedResource check if resource exists in ResourceTracker
func (in *ResourceTracker) ContainsManagedResource(rsc client.Object) bool {
mr := newManagedResourceFromResource(rsc)
return in.findMangedResourceIndex(mr) >= 0
}
// AddManagedResource add object to managed resources, if exists, update
func (in *ResourceTracker) AddManagedResource(rsc client.Object, metaOnly bool, creator common.ResourceCreatorRole) (updated bool) {
mr := newManagedResourceFromResource(rsc)
if !metaOnly {
mr.Data = &runtime.RawExtension{Object: rsc}
}

View File

@@ -15,41 +15,9 @@ spec:
"vela/op"
)
deploy: op.#Steps & {
load: op.#Load @step(1)
_components: [ for k, v in load.value {v}]
loadPoliciesInOrder: op.#LoadPoliciesInOrder & {
if parameter.policies != _|_ {
input: parameter.policies
}
} @step(2)
_policies: loadPoliciesInOrder.output
handleDeployPolicies: op.#HandleDeployPolicies & {
inputs: {
components: _components
policies: _policies
}
} @step(3)
_decisions: handleDeployPolicies.outputs.decisions
_patchedComponents: handleDeployPolicies.outputs.components
deploy: op.#ApplyComponents & {
parallelism: parameter.parallelism
components: {
for decision in _decisions {
for key, comp in _patchedComponents {
"\(decision.cluster)-\(decision.namespace)-\(key)": {
value: comp
if decision.cluster != _|_ {
cluster: decision.cluster
}
if decision.namespace != _|_ {
namespace: decision.namespace
}
}
}
}
}
} @step(4)
deploy: op.#Deploy & {
policies: parameter.policies
parallelism: parameter.parallelism
}
parameter: {
auto: *true | bool

View File

@@ -15,41 +15,9 @@ spec:
"vela/op"
)
deploy: op.#Steps & {
load: op.#Load @step(1)
_components: [ for k, v in load.value {v}]
loadPoliciesInOrder: op.#LoadPoliciesInOrder & {
if parameter.policies != _|_ {
input: parameter.policies
}
} @step(2)
_policies: loadPoliciesInOrder.output
handleDeployPolicies: op.#HandleDeployPolicies & {
inputs: {
components: _components
policies: _policies
}
} @step(3)
_decisions: handleDeployPolicies.outputs.decisions
_patchedComponents: handleDeployPolicies.outputs.components
deploy: op.#ApplyComponents & {
parallelism: parameter.parallelism
components: {
for decision in _decisions {
for key, comp in _patchedComponents {
"\(decision.cluster)-\(decision.namespace)-\(key)": {
value: comp
if decision.cluster != _|_ {
cluster: decision.cluster
}
if decision.namespace != _|_ {
namespace: decision.namespace
}
}
}
}
}
} @step(4)
deploy: op.#Deploy & {
policies: parameter.policies
parallelism: parameter.parallelism
}
parameter: {
auto: *true | bool

View File

@@ -43,6 +43,7 @@ import (
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/appfile/helm"
velaclient "github.com/oam-dev/kubevela/pkg/client"
"github.com/oam-dev/kubevela/pkg/component"
"github.com/oam-dev/kubevela/pkg/cue/definition"
"github.com/oam-dev/kubevela/pkg/cue/model"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
@@ -917,3 +918,33 @@ func (af *Appfile) PolicyClient(cli client.Client) client.Client {
},
}
}
// LoadDynamicComponent for ref-objects typed components, this function will load referred objects from stored revisions
func (af *Appfile) LoadDynamicComponent(ctx context.Context, cli client.Client, comp *common.ApplicationComponent) (*common.ApplicationComponent, error) {
if comp.Type != v1alpha1.RefObjectsComponentType {
return comp, nil
}
_comp := comp.DeepCopy()
spec := &v1alpha1.RefObjectsComponentSpec{}
if err := json.Unmarshal(comp.Properties.Raw, spec); err != nil {
return nil, errors.Wrapf(err, "invalid ref-objects component properties")
}
var uns []*unstructured.Unstructured
for _, selector := range spec.Objects {
objs, err := component.SelectRefObjectsForDispatch(ctx, component.ReferredObjectsDelegatingClient(cli, af.ReferredObjects), af.Namespace, comp.Name, selector)
if err != nil {
return nil, errors.Wrapf(err, "failed to select objects from referred objects in revision storage")
}
uns = component.AppendUnstructuredObjects(uns, objs...)
}
refObjs, err := component.ConvertUnstructuredsToReferredObjects(uns)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal referred object")
}
bs, err := json.Marshal(&common.ReferredObjectList{Objects: refObjs})
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal loaded ref-objects")
}
_comp.Properties = &runtime.RawExtension{Raw: bs}
return _comp, nil
}

View File

@@ -12,41 +12,9 @@ spec:
"vela/op"
)
deploy: op.#Steps & {
load: op.#Load @step(1)
_components: [ for k, v in load.value {v}]
loadPoliciesInOrder: op.#LoadPoliciesInOrder & {
if parameter.policies != _|_ {
input: parameter.policies
}
} @step(2)
_policies: loadPoliciesInOrder.output
handleDeployPolicies: op.#HandleDeployPolicies & {
inputs: {
components: _components
policies: _policies
}
} @step(3)
_decisions: handleDeployPolicies.outputs.decisions
_patchedComponents: handleDeployPolicies.outputs.components
deploy: op.#ApplyComponents & {
parallelism: parameter.parallelism
components: {
for decision in _decisions {
for key, comp in _patchedComponents {
"\(decision.cluster)-\(decision.namespace)-\(key)": {
value: comp
if decision.cluster != _|_ {
cluster: decision.cluster
}
if decision.namespace != _|_ {
namespace: decision.namespace
}
}
}
}
}
} @step(4)
deploy: op.#Deploy & {
policies: parameter.policies
parallelism: parameter.parallelism
}
parameter: {
auto: *true | bool

View File

@@ -212,6 +212,7 @@ func (h *AppHandler) ProduceArtifacts(ctx context.Context, comps []*types.Compon
return h.createResourcesConfigMap(ctx, h.currentAppRev, comps, policies)
}
// nolint
func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (*common.ApplicationComponentStatus, bool, error) {
namespace := h.app.Namespace
if overrideNamespace != "" {

View File

@@ -70,7 +70,10 @@ func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
http.Install(handlerProviders, h.r.Client, app.Namespace)
pCtx := process.NewContext(generateContextDataFromApp(app, appRev.Name))
taskDiscover := tasks.NewTaskDiscoverFromRevision(ctx, handlerProviders, h.r.pd, appRev, h.r.dm, pCtx)
multiclusterProvider.Install(handlerProviders, h.r.Client, app)
multiclusterProvider.Install(handlerProviders, h.r.Client, app, af,
h.applyComponentFunc(appParser, appRev, af),
h.checkComponentHealth(appParser, appRev, af),
)
terraformProvider.Install(handlerProviders, app, func(comp common.ApplicationComponent) (*appfile.Workload, error) {
return appParser.ParseWorkloadFromRevision(comp, appRev)
})
@@ -173,6 +176,36 @@ func (h *AppHandler) renderComponentFunc(appParser *appfile.Parser, appRev *v1be
}
}
func (h *AppHandler) checkComponentHealth(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentHealthCheck {
return func(comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error) {
ctx := multicluster.ContextWithClusterName(context.Background(), clusterName)
ctx = contextWithComponentRevisionNamespace(ctx, overrideNamespace)
wl, manifest, err := h.prepareWorkloadAndManifests(ctx, appParser, comp, appRev, patcher, af)
if err != nil {
return false, err
}
wl.Ctx.SetCtx(ctx)
readyWorkload, readyTraits, err := renderComponentsAndTraits(h.r.Client, manifest, appRev, clusterName, overrideNamespace, env)
if err != nil {
return false, err
}
checkSkipApplyWorkload(wl)
dispatchResources := readyTraits
if !wl.SkipApplyWorkload {
dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
}
if !h.resourceKeeper.ContainsResources(dispatchResources) {
return false, err
}
_, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, overrideNamespace)
return isHealth, err
}
}
func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentApply {
return func(comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
t := time.Now()

View File

@@ -516,6 +516,17 @@ func (val *Value) GetString(paths ...string) (string, error) {
return v.CueValue().String()
}
// GetStringSlice get string slice from val
func (val *Value) GetStringSlice(paths ...string) ([]string, error) {
v, err := val.LookupValue(paths...)
if err != nil {
return nil, err
}
var s []string
err = v.UnmarshalTo(&s)
return s, err
}
// GetInt64 get the int value at a path starting from v.
func (val *Value) GetInt64(paths ...string) (int64, error) {
v, err := val.LookupValue(paths...)

View File

@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"regexp"
"strings"
"github.com/imdario/mergo"
"github.com/pkg/errors"
@@ -180,7 +181,7 @@ func PatchComponents(baseComponents []common.ApplicationComponent, patchComponen
// 3. if the matched component uses a different type, the matched component will be overridden by the patch
// 4. if no component matches, and the component name is a valid kubernetes name, a new component will be added
addComponent := regexp.MustCompile("[a-z]([a-z-]{0,61}[a-z])?").MatchString(comp.Name)
if re, err := regexp.Compile(comp.Name); err == nil {
if re, err := regexp.Compile(strings.ReplaceAll(comp.Name, "*", ".*")); err == nil {
for compName, baseComp := range compMaps {
if re.MatchString(compName) {
addComponent = false

View File

@@ -42,7 +42,7 @@ func GetClusterLabelSelectorInTopology(topology *v1alpha1.TopologyPolicySpec) ma
}
// GetPlacementsFromTopologyPolicies get placements from topology policies with provided client
func GetPlacementsFromTopologyPolicies(ctx context.Context, cli client.Client, app *v1beta1.Application, policies []v1beta1.AppPolicy, allowCrossNamespace bool) ([]v1alpha1.PlacementDecision, error) {
func GetPlacementsFromTopologyPolicies(ctx context.Context, cli client.Client, appNs string, policies []v1beta1.AppPolicy, allowCrossNamespace bool) ([]v1alpha1.PlacementDecision, error) {
var placements []v1alpha1.PlacementDecision
placementMap := map[string]struct{}{}
addCluster := func(cluster string, ns string, validateCluster bool) error {
@@ -51,7 +51,7 @@ func GetPlacementsFromTopologyPolicies(ctx context.Context, cli client.Client, a
return errors.Wrapf(e, "failed to get cluster %s", cluster)
}
}
if !allowCrossNamespace && (ns != app.GetNamespace() && ns != "") {
if !allowCrossNamespace && (ns != appNs && ns != "") {
return errors.Errorf("cannot cross namespace")
}
placement := v1alpha1.PlacementDecision{Cluster: cluster, Namespace: ns}
@@ -62,8 +62,10 @@ func GetPlacementsFromTopologyPolicies(ctx context.Context, cli client.Client, a
}
return nil
}
hasTopologyPolicy := false
for _, policy := range policies {
if policy.Type == v1alpha1.TopologyPolicyType {
hasTopologyPolicy = true
topologySpec := &v1alpha1.TopologyPolicySpec{}
if err := utils.StrictUnmarshal(policy.Properties.Raw, topologySpec); err != nil {
return nil, errors.Wrapf(err, "failed to parse topology policy %s", policy.Name)
@@ -92,5 +94,8 @@ func GetPlacementsFromTopologyPolicies(ctx context.Context, cli client.Client, a
}
}
}
if !hasTopologyPolicy {
placements = []v1alpha1.PlacementDecision{{Cluster: multicluster.ClusterLocalName}}
}
return placements, nil
}

161
pkg/policy/topology_test.go Normal file
View File

@@ -0,0 +1,161 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"context"
"testing"
clusterv1alpha1 "github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/utils/common"
)
func TestGetClusterLabelSelectorInTopology(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeprecatedPolicySpec, true)()
multicluster.ClusterGatewaySecretNamespace = types.DefaultKubeVelaNS
cli := fake.NewClientBuilder().WithScheme(common.Scheme).WithObjects(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-a",
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(clusterv1alpha1.CredentialTypeX509Certificate),
"key": "value",
},
},
}, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(clusterv1alpha1.CredentialTypeX509Certificate),
"key": "value",
},
},
}, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-c",
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(clusterv1alpha1.CredentialTypeX509Certificate),
"key": "none",
},
},
}).Build()
appNs := "test"
testCases := map[string]struct {
Inputs []v1beta1.AppPolicy
Outputs []v1alpha1.PlacementDecision
Error string
AllowCrossNamespace bool
}{
"invalid-topology-policy": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"cluster":"x"}`)},
}},
Error: "failed to parse topology policy",
},
"cluster-not-found": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusters":["cluster-x"]}`)},
}},
Error: "failed to get cluster",
},
"topology-by-clusters": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusters":["cluster-a"]}`)},
}},
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: ""}},
},
"topology-by-cluster-selector-404": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusterSelector":{"key":"bad-value"}}`)},
}},
Error: "failed to find any cluster matches given labels",
},
"topology-by-cluster-selector": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusterSelector":{"key":"value"}}`)},
}},
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: ""}, {Cluster: "cluster-b", Namespace: ""}},
},
"topology-by-cluster-label-selector": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusterLabelSelector":{"key":"value"}}`)},
}},
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: ""}, {Cluster: "cluster-b", Namespace: ""}},
},
"topology-by-cluster-selector-and-namespace-invalid": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusterSelector":{"key":"value"},"namespace":"override"}`)},
}},
Error: "cannot cross namespace",
},
"topology-by-cluster-selector-and-namespace": {
Inputs: []v1beta1.AppPolicy{{
Name: "topology-policy",
Type: "topology",
Properties: &runtime.RawExtension{Raw: []byte(`{"clusterSelector":{"key":"value"},"namespace":"override"}`)},
}},
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: "override"}, {Cluster: "cluster-b", Namespace: "override"}},
AllowCrossNamespace: true,
},
"no-topology-policy": {
Inputs: []v1beta1.AppPolicy{},
Outputs: []v1alpha1.PlacementDecision{{Cluster: "local", Namespace: ""}},
},
}
for name, tt := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
pds, err := GetPlacementsFromTopologyPolicies(context.Background(), cli, appNs, tt.Inputs, tt.AllowCrossNamespace)
if tt.Error != "" {
r.NotNil(err)
r.Contains(err.Error(), tt.Error)
} else {
r.NoError(err)
r.Equal(tt.Outputs, pds)
}
})
}
}

View File

@@ -0,0 +1,34 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcekeeper
import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
// ContainsResources check if resources all exist
func (h *resourceKeeper) ContainsResources(resources []*unstructured.Unstructured) bool {
for _, rsc := range resources {
if rsc == nil {
continue
}
if (h._currentRT != nil && h._currentRT.ContainsManagedResource(rsc)) ||
(h._rootRT != nil && h._rootRT.ContainsManagedResource(rsc)) {
continue
}
return false
}
return true
}

View File

@@ -41,6 +41,7 @@ type ResourceKeeper interface {
Delete(context.Context, []*unstructured.Unstructured, ...DeleteOption) error
GarbageCollect(context.Context, ...GCOption) (bool, []v1beta1.ManagedResource, error)
StateKeep(context.Context) error
ContainsResources([]*unstructured.Unstructured) bool
DispatchComponentRevision(context.Context, *v1.ControllerRevision) error
DeleteComponentRevision(context.Context, *v1.ControllerRevision) error

View File

@@ -25,6 +25,8 @@ import (
#Delete: kube.#Delete
#Deploy: multicluster.#Deploy
#ApplyApplication: #Steps & {
load: oam.#LoadComponetsInOrder @step(1)
components: #Steps & {
@@ -53,8 +55,6 @@ import (
#ApplyComponent: oam.#ApplyComponent
#ApplyComponents: oam.#ApplyComponents
#RenderComponent: oam.#RenderComponent
#ApplyComponentRemaining: #Steps & {
@@ -140,16 +140,12 @@ import (
#ApplyEnvBindApp: multicluster.#ApplyEnvBindApp
#HandleDeployPolicies: multicluster.#HandleDeployPolicies
#DeployCloudResource: terraform.#DeployCloudResource
#ShareCloudResource: terraform.#ShareCloudResource
#LoadPolicies: oam.#LoadPolicies
#LoadPoliciesInOrder: oam.#LoadPoliciesInOrder
#ListClusters: multicluster.#ListClusters
#MakePlacementDecisions: multicluster.#MakePlacementDecisions

View File

@@ -24,6 +24,7 @@
properties: {...}
}]
externalRevision?: string
dependsOn?: [...string]
}
#ReadPlacementDecisions: {
@@ -217,48 +218,9 @@
}
}
#ExpandTopology: {
#Deploy: {
#provider: "multicluster"
#do: "expand-topology"
inputs: {
policies: [...{...}]
}
outputs: {
decisions: [...#PlacementDecision]
}
}
#OverrideConfiguration: {
#provider: "multicluster"
#do: "override-configuration"
inputs: {
policies: [...{...}]
components: [...#Component]
}
outputs: {
components: [...#Component]
}
}
#HandleDeployPolicies: #Steps & {
inputs: {
policies: [...{...}]
components: [...#Component]
}
_inputs: inputs
expandTopology: #ExpandTopology & {
inputs: {
policies: _inputs.policies
}
} @step(1)
overrideConfiguration: #OverrideConfiguration & {
inputs: {
policies: _inputs.policies
components: _inputs.components
}
} @step(2)
outputs: {
decisions: expandTopology.outputs.decisions
components: overrideConfiguration.outputs.components
}
#do: "deploy"
policies: [...string]
parallelism: int
}

View File

@@ -10,22 +10,6 @@
...
}
#ApplyComponents: {
#provider: "oam"
#do: "components-apply"
parallelism: *5 | int
components: [string]: {
cluster: *"" | string
env: *"" | string
namespace: *"" | string
waitHealthy: *true | bool
value: {...}
patch?: {...}
...
}
...
}
#RenderComponent: {
#provider: "oam"
#do: "component-render"
@@ -52,14 +36,6 @@
...
}
#LoadPoliciesInOrder: {
#provider: "oam"
#do: "load-policies-in-order"
input?: [...string]
output?: [...{...}]
...
}
#LoadComponetsInOrder: {
#provider: "oam"
#do: "load-comps-in-order"

View File

@@ -0,0 +1,210 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"context"
"fmt"
"strings"
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/appfile"
pkgpolicy "github.com/oam-dev/kubevela/pkg/policy"
"github.com/oam-dev/kubevela/pkg/policy/envbinding"
"github.com/oam-dev/kubevela/pkg/resourcekeeper"
"github.com/oam-dev/kubevela/pkg/utils"
velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
"github.com/oam-dev/kubevela/pkg/utils/parallel"
oamProvider "github.com/oam-dev/kubevela/pkg/workflow/providers/oam"
)
// DeployWorkflowStepExecutor executor to run deploy workflow step
type DeployWorkflowStepExecutor interface {
Deploy(ctx context.Context, policyNames []string, parallelism int) (healthy bool, reason string, err error)
}
// NewDeployWorkflowStepExecutor .
func NewDeployWorkflowStepExecutor(cli client.Client, af *appfile.Appfile, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck) DeployWorkflowStepExecutor {
return &deployWorkflowStepExecutor{
cli: cli,
af: af,
apply: apply,
healthCheck: healthCheck,
}
}
type deployWorkflowStepExecutor struct {
cli client.Client
af *appfile.Appfile
apply oamProvider.ComponentApply
healthCheck oamProvider.ComponentHealthCheck
}
// Deploy execute deploy workflow step
func (executor *deployWorkflowStepExecutor) Deploy(ctx context.Context, policyNames []string, parallelism int) (bool, string, error) {
policies, err := selectPolicies(executor.af.Policies, policyNames)
if err != nil {
return false, "", err
}
components, err := loadComponents(ctx, executor.cli, executor.af, executor.af.Components)
if err != nil {
return false, "", err
}
placements, err := pkgpolicy.GetPlacementsFromTopologyPolicies(ctx, executor.cli, executor.af.Namespace, policies, resourcekeeper.AllowCrossNamespaceResource)
if err != nil {
return false, "", err
}
components, err = overrideConfiguration(policies, components)
if err != nil {
return false, "", err
}
return applyComponents(executor.apply, executor.healthCheck, components, placements, parallelism)
}
func selectPolicies(policies []v1beta1.AppPolicy, policyNames []string) ([]v1beta1.AppPolicy, error) {
policyMap := make(map[string]v1beta1.AppPolicy)
for _, policy := range policies {
policyMap[policy.Name] = policy
}
var selectedPolicies []v1beta1.AppPolicy
for _, policyName := range policyNames {
if policy, found := policyMap[policyName]; found {
selectedPolicies = append(selectedPolicies, policy)
} else {
return nil, errors.Errorf("policy %s not found", policyName)
}
}
return selectedPolicies, nil
}
func loadComponents(ctx context.Context, cli client.Client, af *appfile.Appfile, components []common.ApplicationComponent) ([]common.ApplicationComponent, error) {
var loadedComponents []common.ApplicationComponent
for _, comp := range components {
loadedComp, err := af.LoadDynamicComponent(ctx, cli, comp.DeepCopy())
if err != nil {
return nil, err
}
loadedComponents = append(loadedComponents, *loadedComp)
}
return loadedComponents, nil
}
func overrideConfiguration(policies []v1beta1.AppPolicy, components []common.ApplicationComponent) ([]common.ApplicationComponent, error) {
var err error
for _, policy := range policies {
if policy.Type == v1alpha1.OverridePolicyType {
overrideSpec := &v1alpha1.OverridePolicySpec{}
if err := utils.StrictUnmarshal(policy.Properties.Raw, overrideSpec); err != nil {
return nil, errors.Wrapf(err, "failed to parse override policy %s", policy.Name)
}
components, err = envbinding.PatchComponents(components, overrideSpec.Components, overrideSpec.Selector)
if err != nil {
return nil, errors.Wrapf(err, "failed to apply override policy %s", policy.Name)
}
}
}
return components, nil
}
type applyTask struct {
component common.ApplicationComponent
placement v1alpha1.PlacementDecision
}
func (t *applyTask) key() string {
return fmt.Sprintf("%s/%s/%s", t.placement.Cluster, t.placement.Namespace, t.component.Name)
}
func (t *applyTask) dependents() []string {
var dependents []string
for _, dependent := range t.component.DependsOn {
dependents = append(dependents, fmt.Sprintf("%s/%s/%s", t.placement.Cluster, t.placement.Namespace, dependent))
}
return dependents
}
type applyTaskResult struct {
healthy bool
err error
}
func applyComponents(apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, components []common.ApplicationComponent, placements []v1alpha1.PlacementDecision, parallelism int) (bool, string, error) {
var tasks []*applyTask
for _, comp := range components {
for _, pl := range placements {
tasks = append(tasks, &applyTask{component: comp, placement: pl})
}
}
healthCheckResults := parallel.Run(func(task *applyTask) *applyTaskResult {
healthy, err := healthCheck(task.component, nil, task.placement.Cluster, task.placement.Namespace, "")
return &applyTaskResult{healthy: healthy, err: err}
}, tasks, parallelism).([]*applyTaskResult)
taskHealthyMap := map[string]bool{}
for i, res := range healthCheckResults {
taskHealthyMap[tasks[i].key()] = res.healthy
}
var pendingTasks []*applyTask
var todoTasks []*applyTask
for _, task := range tasks {
if healthy, ok := taskHealthyMap[task.key()]; healthy && ok {
continue
}
pending := false
for _, dep := range task.dependents() {
if healthy, ok := taskHealthyMap[dep]; ok && !healthy {
pending = true
break
}
}
if pending {
pendingTasks = append(pendingTasks, task)
} else {
todoTasks = append(todoTasks, task)
}
}
var results []*applyTaskResult
if len(todoTasks) > 0 {
results = parallel.Run(func(task *applyTask) *applyTaskResult {
_, _, healthy, err := apply(task.component, nil, task.placement.Cluster, task.placement.Namespace, "")
return &applyTaskResult{healthy: healthy, err: err}
}, todoTasks, parallelism).([]*applyTaskResult)
}
var errs []error
var allHealthy = true
var reasons []string
for i, res := range results {
if res.err != nil {
errs = append(errs, res.err)
}
if !res.healthy {
allHealthy = false
reasons = append(reasons, fmt.Sprintf("%s is not healthy", todoTasks[i].key()))
}
}
for _, t := range pendingTasks {
reasons = append(reasons, fmt.Sprintf("%s is waiting dependents", t.key()))
}
return allHealthy && len(pendingTasks) == 0, strings.Join(reasons, ","), velaerrors.AggregateErrors(errs)
}

View File

@@ -0,0 +1,138 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
apicommon "github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
)
func TestOverrideConfiguration(t *testing.T) {
testCases := map[string]struct {
Policies []v1beta1.AppPolicy
Components []apicommon.ApplicationComponent
Outputs []apicommon.ApplicationComponent
Error string
}{
"invalid-policies": {
Policies: []v1beta1.AppPolicy{{
Name: "override-policy",
Type: "override",
Properties: &runtime.RawExtension{Raw: []byte(`bad value`)},
}},
Error: "failed to parse override policy",
},
"normal": {
Policies: []v1beta1.AppPolicy{{
Name: "override-policy",
Type: "override",
Properties: &runtime.RawExtension{Raw: []byte(`{"components":[{"name":"comp","properties":{"x":5}}]}`)},
}},
Components: []apicommon.ApplicationComponent{{
Name: "comp",
Traits: []apicommon.ApplicationTrait{},
Properties: &runtime.RawExtension{Raw: []byte(`{"x":1}`)},
}},
Outputs: []apicommon.ApplicationComponent{{
Name: "comp",
Traits: []apicommon.ApplicationTrait{},
Properties: &runtime.RawExtension{Raw: []byte(`{"x":5}`)},
}},
},
}
for name, tt := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
comps, err := overrideConfiguration(tt.Policies, tt.Components)
if tt.Error != "" {
r.NotNil(err)
r.Contains(err.Error(), tt.Error)
} else {
r.NoError(err)
r.Equal(tt.Outputs, comps)
}
})
}
}
func TestApplyComponents(t *testing.T) {
r := require.New(t)
const n, m = 50, 5
var components []apicommon.ApplicationComponent
var placements []v1alpha1.PlacementDecision
for i := 0; i < n*3; i++ {
comp := apicommon.ApplicationComponent{Name: fmt.Sprintf("comp-%d", i)}
if i%3 != 0 {
comp.DependsOn = append(comp.DependsOn, fmt.Sprintf("comp-%d", i-1))
}
if i%3 == 2 {
comp.DependsOn = append(comp.DependsOn, fmt.Sprintf("comp-%d", i-1))
}
components = append(components, comp)
}
for i := 0; i < m; i++ {
placements = append(placements, v1alpha1.PlacementDecision{Cluster: fmt.Sprintf("cluster-%d", i)})
}
applyMap := &sync.Map{}
apply := func(comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
time.Sleep(time.Duration(rand.Intn(200)+25) * time.Millisecond)
applyMap.Store(fmt.Sprintf("%s/%s", clusterName, comp.Name), true)
return nil, nil, true, nil
}
healthCheck := func(comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error) {
_, found := applyMap.Load(fmt.Sprintf("%s/%s", clusterName, comp.Name))
return found, nil
}
parallelism := 10
countMap := func() int {
cnt := 0
applyMap.Range(func(key, value interface{}) bool {
cnt++
return true
})
return cnt
}
healthy, _, err := applyComponents(apply, healthCheck, components, placements, parallelism)
r.NoError(err)
r.False(healthy)
r.Equal(n*m, countMap())
healthy, _, err = applyComponents(apply, healthCheck, components, placements, parallelism)
r.NoError(err)
r.False(healthy)
r.Equal(2*n*m, countMap())
healthy, _, err = applyComponents(apply, healthCheck, components, placements, parallelism)
r.NoError(err)
r.True(healthy)
r.Equal(3*n*m, countMap())
}

View File

@@ -22,17 +22,15 @@ import (
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/appfile"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/multicluster"
pkgpolicy "github.com/oam-dev/kubevela/pkg/policy"
"github.com/oam-dev/kubevela/pkg/policy/envbinding"
"github.com/oam-dev/kubevela/pkg/resourcekeeper"
"github.com/oam-dev/kubevela/pkg/utils"
wfContext "github.com/oam-dev/kubevela/pkg/workflow/context"
"github.com/oam-dev/kubevela/pkg/workflow/providers"
oamProvider "github.com/oam-dev/kubevela/pkg/workflow/providers/oam"
wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types"
)
@@ -43,7 +41,10 @@ const (
type provider struct {
client.Client
app *v1beta1.Application
app *v1beta1.Application
af *appfile.Appfile
apply oamProvider.ComponentApply
healthCheck oamProvider.ComponentHealthCheck
}
func (p *provider) ReadPlacementDecisions(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
@@ -163,63 +164,37 @@ func (p *provider) ListClusters(ctx wfContext.Context, v *value.Value, act wfTyp
return v.FillObject(clusters, "outputs", "clusters")
}
func (p *provider) ExpandTopology(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
policiesRaw, err := v.LookupValue("inputs", "policies")
func (p *provider) Deploy(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
policyNames, err := v.GetStringSlice("policies")
if err != nil {
return err
}
policies := &[]v1beta1.AppPolicy{}
if err = policiesRaw.UnmarshalTo(policies); err != nil {
return errors.Wrapf(err, "failed to parse policies")
}
placements, err := pkgpolicy.GetPlacementsFromTopologyPolicies(context.Background(), p, p.app, *policies, resourcekeeper.AllowCrossNamespaceResource)
parallelism, err := v.GetInt64("parallelism")
if err != nil {
return err
}
return v.FillObject(placements, "outputs", "decisions")
}
func (p *provider) OverrideConfiguration(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
policiesRaw, err := v.LookupValue("inputs", "policies")
if parallelism <= 0 {
return errors.Errorf("parallelism cannot be smaller than 1")
}
executor := NewDeployWorkflowStepExecutor(p.Client, p.af, p.apply, p.healthCheck)
healthy, reason, err := executor.Deploy(context.Background(), policyNames, int(parallelism))
if err != nil {
return err
}
policies := &[]*v1beta1.AppPolicy{}
if err = policiesRaw.UnmarshalTo(policies); err != nil {
return errors.Wrapf(err, "failed to parse policies")
if !healthy {
act.Wait(reason)
}
componentsRaw, err := v.LookupValue("inputs", "components")
if err != nil {
return err
}
components := make([]common.ApplicationComponent, 0)
if err = componentsRaw.UnmarshalTo(&components); err != nil {
return errors.Wrapf(err, "failed to parse components")
}
for _, policy := range *policies {
if policy.Type == v1alpha1.OverridePolicyType {
overrideSpec := &v1alpha1.OverridePolicySpec{}
if err = utils.StrictUnmarshal(policy.Properties.Raw, overrideSpec); err != nil {
return errors.Wrapf(err, "failed to parse override policy %s", policy.Name)
}
components, err = envbinding.PatchComponents(components, overrideSpec.Components, overrideSpec.Selector)
if err != nil {
return errors.Wrapf(err, "failed to apply override policy %s", policy.Name)
}
}
}
return v.FillObject(components, "outputs", "components")
return nil
}
// Install register handlers to provider discover.
func Install(p providers.Providers, c client.Client, app *v1beta1.Application) {
prd := &provider{Client: c, app: app}
func Install(p providers.Providers, c client.Client, app *v1beta1.Application, af *appfile.Appfile, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck) {
prd := &provider{Client: c, app: app, af: af, apply: apply, healthCheck: healthCheck}
p.Register(ProviderName, map[string]providers.Handler{
"read-placement-decisions": prd.ReadPlacementDecisions,
"make-placement-decisions": prd.MakePlacementDecisions,
"patch-application": prd.PatchApplication,
"list-clusters": prd.ListClusters,
"expand-topology": prd.ExpandTopology,
"override-configuration": prd.OverrideConfiguration,
"deploy": prd.Deploy,
})
}

View File

@@ -26,8 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
apicommon "github.com/oam-dev/kubevela/apis/core.oam.dev/common"
@@ -35,9 +33,7 @@ import (
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/resourcekeeper"
"github.com/oam-dev/kubevela/pkg/utils/common"
"github.com/oam-dev/kubevela/pkg/workflow/providers/mock"
)
@@ -513,175 +509,3 @@ func TestListClusters(t *testing.T) {
r.NoError(outputs.UnmarshalTo(&obj))
r.Equal(clusterNames, obj.Clusters)
}
func TestExpandTopology(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeprecatedPolicySpec, true)()
multicluster.ClusterGatewaySecretNamespace = types.DefaultKubeVelaNS
cli := fake.NewClientBuilder().WithScheme(common.Scheme).WithObjects(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-a",
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(clusterv1alpha1.CredentialTypeX509Certificate),
"key": "value",
},
},
}, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(clusterv1alpha1.CredentialTypeX509Certificate),
"key": "value",
},
},
}, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-c",
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(clusterv1alpha1.CredentialTypeX509Certificate),
"key": "none",
},
},
}).Build()
app := &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{Name: "app", Namespace: "test"},
}
p := &provider{
Client: cli,
app: app,
}
testCases := map[string]struct {
Input string
Outputs []v1alpha1.PlacementDecision
Error string
DisableCrossNamespace bool
}{
"policies-404": {
Input: "{inputs:{}}",
Error: "var(path=inputs.policies) not exist",
},
"invalid-policies": {
Input: `{inputs:{policies:"bad value"}}`,
Error: "failed to parse policies",
},
"invalid-topology-policy": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{cluster:"x"}}]}}`,
Error: "failed to parse topology policy",
},
"cluster-not-found": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusters:["cluster-x"]}}]}}`,
Error: "failed to get cluster",
},
"topology-by-clusters": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusters:["cluster-a"]}}]}}`,
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: ""}},
},
"topology-by-cluster-selector-404": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusterSelector:{"key":"bad-value"}}}]}}`,
Error: "failed to find any cluster matches given labels",
},
"topology-by-cluster-selector": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusterSelector:{"key":"value"}}}]}}`,
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: ""}, {Cluster: "cluster-b", Namespace: ""}},
},
"topology-by-cluster-label-selector": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusterLabelSelector:{"key":"value"}}}]}}`,
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: ""}, {Cluster: "cluster-b", Namespace: ""}},
},
"topology-by-cluster-selector-and-namespace-invalid": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusterSelector:{"key":"value"},namespace:"override"}}]}}`,
Error: "cannot cross namespace",
DisableCrossNamespace: true,
},
"topology-by-cluster-selector-and-namespace": {
Input: `{inputs:{policies:[{name:"topology-policy",type:"topology",properties:{clusterSelector:{"key":"value"},namespace:"override"}}]}}`,
Outputs: []v1alpha1.PlacementDecision{{Cluster: "cluster-a", Namespace: "override"}, {Cluster: "cluster-b", Namespace: "override"}},
},
}
for name, tt := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
resourcekeeper.AllowCrossNamespaceResource = !tt.DisableCrossNamespace
v, err := value.NewValue("", nil, "")
r.NoError(err)
r.NoError(v.FillRaw(tt.Input))
err = p.ExpandTopology(nil, v, &mock.Action{})
if tt.Error != "" {
r.NotNil(err)
r.Contains(err.Error(), tt.Error)
} else {
r.NoError(err)
outputs, err := v.LookupValue("outputs", "decisions")
r.NoError(err)
pds := &[]v1alpha1.PlacementDecision{}
r.NoError(outputs.UnmarshalTo(pds))
r.Equal(tt.Outputs, *pds)
}
})
}
}
func TestOverrideConfiguration(t *testing.T) {
cli := fake.NewClientBuilder().WithScheme(common.Scheme).Build()
app := &v1beta1.Application{}
p := &provider{
Client: cli,
app: app,
}
testCases := map[string]struct {
Input string
Outputs []apicommon.ApplicationComponent
Error string
}{
"policies-404": {
Input: "{inputs:{}}",
Error: "var(path=inputs.policies) not exist",
},
"invalid-policies": {
Input: `{inputs:{policies:"bad value"}}`,
Error: "failed to parse policies",
},
"components-404": {
Input: `{inputs:{policies:[{name:"override-policy",type:"override",properties:{}}]}}`,
Error: "var(path=inputs.components) not exist",
},
"invalid-components": {
Input: `{inputs:{policies:[{name:"override-policy",type:"override",properties:{}}],components:[{name:{}}]}}`,
Error: "failed to parse components",
},
"invalid-override-policy": {
Input: `{inputs:{policies:[{name:"override-policy",type:"override",properties:{bad:"value"}}],components:[{}]}}`,
Error: "failed to parse override policy",
},
"normal": {
Input: `{inputs:{policies:[{name:"override-policy",type:"override",properties:{components:[{name:"comp",properties:{x:5}}]}}],components:[{name:"comp",properties:{x:1}}]}}`,
Outputs: []apicommon.ApplicationComponent{{
Name: "comp",
Traits: []apicommon.ApplicationTrait{},
Properties: &runtime.RawExtension{Raw: []byte(`{"x":5}`)},
}},
},
}
for name, tt := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
v, err := value.NewValue("", nil, "")
r.NoError(err)
r.NoError(v.FillRaw(tt.Input))
err = p.OverrideConfiguration(nil, v, &mock.Action{})
if tt.Error != "" {
r.NotNil(err)
r.Contains(err.Error(), tt.Error)
} else {
r.NoError(err)
outputs, err := v.LookupValue("outputs", "components")
r.NoError(err)
comps := &[]apicommon.ApplicationComponent{}
r.NoError(outputs.UnmarshalTo(comps))
r.Equal(tt.Outputs, *comps)
}
})
}
}

View File

@@ -24,19 +24,14 @@ import (
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/appfile"
"github.com/oam-dev/kubevela/pkg/component"
"github.com/oam-dev/kubevela/pkg/cue/model/sets"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/oam"
velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
"github.com/oam-dev/kubevela/pkg/utils/parallel"
wfContext "github.com/oam-dev/kubevela/pkg/workflow/context"
"github.com/oam-dev/kubevela/pkg/workflow/providers"
wfTypes "github.com/oam-dev/kubevela/pkg/workflow/types"
@@ -53,6 +48,9 @@ type ComponentApply func(comp common.ApplicationComponent, patcher *value.Value,
// ComponentRender render oam component.
type ComponentRender func(comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, error)
// ComponentHealthCheck health check oam component.
type ComponentHealthCheck func(comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error)
type provider struct {
render ComponentRender
apply ComponentApply
@@ -137,39 +135,6 @@ func (p *provider) ApplyComponent(ctx wfContext.Context, v *value.Value, act wfT
return p.applyComponent(ctx, v, act, nil)
}
// ApplyComponents apply components in parallel.
func (p *provider) ApplyComponents(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
components, err := v.LookupValue("components")
if err != nil {
return err
}
parallelism, err := v.GetInt64("parallelism")
if err != nil {
return err
}
if parallelism <= 0 {
return errors.Errorf("parallelism cannot be smaller than 1")
}
// prepare parallel execution args
mu := &sync.Mutex{}
var parInputs [][]interface{}
if err = components.StepByFields(func(name string, in *value.Value) (bool, error) {
parInputs = append(parInputs, []interface{}{name, ctx, in, act, mu})
return false, nil
}); err != nil {
return errors.Wrapf(err, "failed to looping over components")
}
// parallel execution
outputs := parallel.Run(func(name string, ctx wfContext.Context, v *value.Value, act wfTypes.Action, mu *sync.Mutex) error {
if err := p.applyComponent(ctx, v, act, mu); err != nil {
return errors.Wrapf(err, "failed to apply component %s", name)
}
return nil
}, parInputs, int(parallelism))
// aggregate errors
return velaerrors.AggregateErrors(outputs.([]error))
}
func lookUpValues(v *value.Value, mu *sync.Mutex) (*common.ApplicationComponent, *value.Value, string, string, string, error) {
if mu != nil {
mu.Lock()
@@ -203,35 +168,6 @@ func lookUpValues(v *value.Value, mu *sync.Mutex) (*common.ApplicationComponent,
return comp, patcher, clusterName, overrideNamespace, env, nil
}
func (p *provider) loadDynamicComponent(comp *common.ApplicationComponent) (*common.ApplicationComponent, error) {
if comp.Type != v1alpha1.RefObjectsComponentType {
return comp, nil
}
_comp := comp.DeepCopy()
spec := &v1alpha1.RefObjectsComponentSpec{}
if err := json.Unmarshal(comp.Properties.Raw, spec); err != nil {
return nil, errors.Wrapf(err, "invalid ref-object component properties")
}
var uns []*unstructured.Unstructured
for _, selector := range spec.Objects {
objs, err := component.SelectRefObjectsForDispatch(context.Background(), component.ReferredObjectsDelegatingClient(p.cli, p.af.ReferredObjects), p.af.Namespace, comp.Name, selector)
if err != nil {
return nil, errors.Wrapf(err, "failed to select objects from referred objects in revision storage")
}
uns = component.AppendUnstructuredObjects(uns, objs...)
}
refObjs, err := component.ConvertUnstructuredsToReferredObjects(uns)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal referred object")
}
bs, err := json.Marshal(&common.ReferredObjectList{Objects: refObjs})
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal loaded ref-objects")
}
_comp.Properties = &runtime.RawExtension{Raw: bs}
return _comp, nil
}
// LoadComponent load component describe info in application.
func (p *provider) LoadComponent(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
app := &v1beta1.Application{}
@@ -249,7 +185,7 @@ func (p *provider) LoadComponent(ctx wfContext.Context, v *value.Value, act wfTy
}
}
for _, _comp := range app.Spec.Components {
comp, err := p.loadDynamicComponent(_comp.DeepCopy())
comp, err := p.af.LoadDynamicComponent(context.Background(), p.cli, _comp.DeepCopy())
if err != nil {
return err
}
@@ -288,7 +224,7 @@ func (p *provider) LoadComponentInOrder(ctx wfContext.Context, v *value.Value, a
}
comps := make([]common.ApplicationComponent, len(app.Spec.Components))
for idx, _comp := range app.Spec.Components {
comp, err := p.loadDynamicComponent(_comp.DeepCopy())
comp, err := p.af.LoadDynamicComponent(context.Background(), p.cli, _comp.DeepCopy())
if err != nil {
return err
}
@@ -312,31 +248,6 @@ func (p *provider) LoadPolicies(ctx wfContext.Context, v *value.Value, act wfTyp
return nil
}
func (p *provider) LoadPoliciesInOrder(ctx wfContext.Context, v *value.Value, act wfTypes.Action) error {
policyMap := map[string]v1beta1.AppPolicy{}
var specifiedPolicyNames []string
specifiedPolicyNamesRaw, err := v.LookupValue("input")
if err != nil || specifiedPolicyNamesRaw == nil {
for _, policy := range p.app.Spec.Policies {
specifiedPolicyNames = append(specifiedPolicyNames, policy.Name)
}
} else if err = specifiedPolicyNamesRaw.UnmarshalTo(&specifiedPolicyNames); err != nil {
return errors.Wrapf(err, "failed to parse specified policy names")
}
for _, policy := range p.af.Policies {
policyMap[policy.Name] = policy
}
var specifiedPolicies []v1beta1.AppPolicy
for _, policyName := range specifiedPolicyNames {
if policy, found := policyMap[policyName]; found {
specifiedPolicies = append(specifiedPolicies, policy)
} else {
return errors.Errorf("policy %s not found", policyName)
}
}
return v.FillObject(specifiedPolicies, "output")
}
// Install register handlers to provider discover.
func Install(p providers.Providers, app *v1beta1.Application, af *appfile.Appfile, cli client.Client, apply ComponentApply, render ComponentRender) {
prd := &provider{
@@ -347,12 +258,10 @@ func Install(p providers.Providers, app *v1beta1.Application, af *appfile.Appfil
cli: cli,
}
p.Register(ProviderName, map[string]providers.Handler{
"component-render": prd.RenderComponent,
"component-apply": prd.ApplyComponent,
"components-apply": prd.ApplyComponents,
"load": prd.LoadComponent,
"load-policies": prd.LoadPolicies,
"load-policies-in-order": prd.LoadPoliciesInOrder,
"load-comps-in-order": prd.LoadComponentInOrder,
"component-render": prd.RenderComponent,
"component-apply": prd.ApplyComponent,
"load": prd.LoadComponent,
"load-policies": prd.LoadPolicies,
"load-comps-in-order": prd.LoadComponentInOrder,
})
}

View File

@@ -17,21 +17,17 @@ limitations under the License.
package oam
import (
"fmt"
"strings"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/appfile"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/workflow/providers/mock"
@@ -141,65 +137,6 @@ outputs: {
`)
}
func TestApplyComponents(t *testing.T) {
r := require.New(t)
testcases := map[string]struct {
Input string
Error string
}{
"normal": {
Input: `{components:{first:{value:{name:"first"}},second:{value:{name:"second"}}},parallelism:5}`,
},
"no-components": {
Input: `{}`,
Error: "var(path=components) not exist",
},
"no-parallelism": {
Input: `{components:{first:{value:{name:"first"}},second:{value:{name:"second"}}}}`,
Error: "var(path=parallelism) not exist",
},
"invalid-parallelism": {
Input: `{components:{first:{value:{name:"first"}},second:{value:{name:"second"}}},parallelism:-1}`,
Error: "parallelism cannot be smaller than 1",
},
"bad-component": {
Input: `{components:{first:{value:{name:"error-first"}},second:{value:{name:"error-second"}},third:{value:{name:"third"}}},parallelism:5}`,
Error: "failed to apply component",
},
}
p := &provider{apply: simpleComponentApplyForTest}
for name, tt := range testcases {
t.Run(name, func(t *testing.T) {
act := &mock.Action{}
v, err := value.NewValue("", nil, "")
r.NoError(err)
r.NoError(v.FillRaw(tt.Input))
err = p.ApplyComponents(nil, v, act)
if tt.Error != "" {
r.NotNil(err)
r.Contains(err.Error(), tt.Error)
} else {
r.NoError(err)
}
})
}
}
func TestApplyComponentsHard(t *testing.T) {
r := require.New(t)
input := `comp0:{value:{name:"comp0"}}`
for i := 1; i < 1000; i++ {
input += fmt.Sprintf(`,comp%d:{value:{name:"comp%d"}}`, i, i)
}
input = fmt.Sprintf(`{components:{%s},parallelism:50}`, input)
p := &provider{apply: delayedComponentApplyForTest}
act := &mock.Action{}
v, err := value.NewValue("", nil, "")
r.NoError(err)
r.NoError(v.FillRaw(input))
r.NoError(p.ApplyComponents(nil, v, act))
}
func TestLoadComponent(t *testing.T) {
r := require.New(t)
p := &provider{
@@ -299,57 +236,6 @@ func TestLoadComponentInOrder(t *testing.T) {
`)
}
func TestLoadPolicyInOrder(t *testing.T) {
r := require.New(t)
p := &provider{af: &appfile.Appfile{
Policies: []v1beta1.AppPolicy{{Name: "policy-1"}, {Name: "policy-2"}, {Name: "policy-3"}},
}, app: &v1beta1.Application{
Spec: v1beta1.ApplicationSpec{Policies: []v1beta1.AppPolicy{{Name: "policy-1"}, {Name: "policy-2"}}},
}}
testcases := map[string]struct {
Input string
Output []v1beta1.AppPolicy
Error string
}{
"normal": {
Input: `{input:["policy-3","policy-1"]}`,
Output: []v1beta1.AppPolicy{{Name: "policy-3"}, {Name: "policy-1"}},
},
"empty-input": {
Input: `{}`,
Output: []v1beta1.AppPolicy{{Name: "policy-1"}, {Name: "policy-2"}},
},
"invalid-input": {
Input: `{input:{"name":"policy"}}`,
Error: "failed to parse specified policy name",
},
"policy-not-found": {
Input: `{input:["policy-4","policy-1"]}`,
Error: "not found",
},
}
for name, tt := range testcases {
t.Run(name, func(t *testing.T) {
act := &mock.Action{}
v, err := value.NewValue("", nil, "")
r.NoError(err)
r.NoError(v.FillRaw(tt.Input))
err = p.LoadPoliciesInOrder(nil, v, act)
if tt.Error != "" {
r.NotNil(err)
r.Contains(err.Error(), tt.Error)
} else {
r.NoError(err)
v, err = v.LookupValue("output")
r.NoError(err)
var outputPolicies []v1beta1.AppPolicy
r.NoError(v.UnmarshalTo(&outputPolicies))
r.Equal(tt.Output, outputPolicies)
}
})
}
}
var testHealthy bool
func simpleComponentApplyForTest(comp common.ApplicationComponent, _ *value.Value, _ string, _ string, _ string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
@@ -387,8 +273,3 @@ func simpleComponentApplyForTest(comp common.ApplicationComponent, _ *value.Valu
traits := []*unstructured.Unstructured{trait}
return workload, traits, testHealthy, nil
}
func delayedComponentApplyForTest(comp common.ApplicationComponent, v *value.Value, x string, y string, z string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
time.Sleep(time.Duration(rand.Intn(200)+25) * time.Millisecond)
return simpleComponentApplyForTest(comp, v, x, y, z)
}

View File

@@ -88,6 +88,8 @@ const (
ContextPrefixFailedTimes = "failed_times"
// ContextPrefixBackoffTimes is the prefix that refer to the backoff times in workflow context config map.
ContextPrefixBackoffTimes = "backoff_times"
// ContextPrefixBackoffReason is the prefix that refer to the current backoff reason in workflow context config map
ContextPrefixBackoffReason = "backoff_reason"
// ContextKeyLastExecuteTime is the key that refer to the last execute time in workflow context config map.
ContextKeyLastExecuteTime = "last_execute_time"
// ContextKeyNextExecuteTime is the key that refer to the next execute time in workflow context config map.

View File

@@ -531,6 +531,7 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error {
e.waiting = e.waiting || operation.Waiting
if status.Phase == common.WorkflowStepPhaseSucceeded || (status.Phase == common.WorkflowStepPhaseRunning && status.Type == wfTypes.WorkflowStepTypeSuspend) {
wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffReason, status.ID)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")
}
@@ -542,6 +543,10 @@ func (e *engine) steps(taskRunners []wfTypes.TaskRunner) error {
continue
}
if val, exists := wfCtx.GetValueInMemory(wfTypes.ContextPrefixBackoffReason, status.ID); !exists || val != status.Message {
wfCtx.SetValueInMemory(status.Message, wfTypes.ContextPrefixBackoffReason, status.ID)
wfCtx.DeleteValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
}
wfCtx.IncreaseCountValueInMemory(wfTypes.ContextPrefixBackoffTimes, status.ID)
if err := wfCtx.Commit(); err != nil {
return errors.WithMessage(err, "commit workflow context")

View File

@@ -403,7 +403,7 @@ func printApplicationTree(c common.Args, cmd *cobra.Command, appName string, app
var placements []v1alpha1.PlacementDecision
af, err := pkgappfile.NewApplicationParser(cli, dm, pd).GenerateAppFile(context.Background(), app)
if err == nil {
placements, _ = policy.GetPlacementsFromTopologyPolicies(context.Background(), cli, app, af.Policies, true)
placements, _ = policy.GetPlacementsFromTopologyPolicies(context.Background(), cli, app.GetNamespace(), af.Policies, true)
}
format, _ := cmd.Flags().GetString("detail-format")
var maxWidth *int

View File

@@ -9,41 +9,9 @@ import (
description: "Deploy components with policies."
}
template: {
deploy: op.#Steps & {
load: op.#Load @step(1)
_components: [ for k, v in load.value {v}]
loadPoliciesInOrder: op.#LoadPoliciesInOrder & {
if parameter.policies != _|_ {
input: parameter.policies
}
} @step(2)
_policies: loadPoliciesInOrder.output
handleDeployPolicies: op.#HandleDeployPolicies & {
inputs: {
components: _components
policies: _policies
}
} @step(3)
_decisions: handleDeployPolicies.outputs.decisions
_patchedComponents: handleDeployPolicies.outputs.components
deploy: op.#ApplyComponents & {
parallelism: parameter.parallelism
components: {
for decision in _decisions {
for key, comp in _patchedComponents {
"\(decision.cluster)-\(decision.namespace)-\(key)": {
value: comp
if decision.cluster != _|_ {
cluster: decision.cluster
}
if decision.namespace != _|_ {
namespace: decision.namespace
}
}
}
}
}
} @step(4)
deploy: op.#Deploy & {
policies: parameter.policies
parallelism: parameter.parallelism
}
parameter: {
auto: *true | bool