Feat: enhance the apply-once capability (#4874)

* Feat: enhance the apply-once capability

Signed-off-by: 朱晓兵 <596908030@qq.com>

* Fix: add unit-test

Signed-off-by: 朱晓兵 <596908030@qq.com>

* Fix: adjustment variable name

Signed-off-by: 朱晓兵 <596908030@qq.com>

* Fix: add doc

Signed-off-by: 朱晓兵 <596908030@qq.com>

* Fix: adjustment variable name

Signed-off-by: 朱晓兵 <596908030@qq.com>

Signed-off-by: 朱晓兵 <596908030@qq.com>
This commit is contained in:
朱晓兵
2022-10-25 21:15:21 +08:00
committed by GitHub
parent 2324357907
commit c4a0c1480d
8 changed files with 402 additions and 23 deletions

View File

@@ -23,8 +23,17 @@ import (
const (
// ApplyOncePolicyType refers to the type of configuration drift policy
ApplyOncePolicyType = "apply-once"
// ApplyOnceStrategyOnAppUpdate policy takes effect on application updating
ApplyOnceStrategyOnAppUpdate ApplyOnceAffectStrategy = "onUpdate"
// ApplyOnceStrategyOnAppStateKeep policy takes effect on application state keep
ApplyOnceStrategyOnAppStateKeep ApplyOnceAffectStrategy = "onStateKeep"
// ApplyOnceStrategyAlways policy takes effect always
ApplyOnceStrategyAlways ApplyOnceAffectStrategy = "always"
)
// ApplyOnceAffectStrategy is a string that mark the policy effective stage
type ApplyOnceAffectStrategy string
// ApplyOncePolicySpec defines the spec of preventing configuration drift
type ApplyOncePolicySpec struct {
Enable bool `json:"enable"`
@@ -45,6 +54,9 @@ type ApplyOnceStrategy struct {
// Path the specified path that allow configuration drift
// like 'spec.template.spec.containers[0].resources' and '*' means the whole target allow configuration drift
Path []string `json:"path"`
// ApplyOnceAffectStrategy Decide when the strategy will take effect
// like affect:onUpdate/onStateKeep/always
ApplyOnceAffectStrategy ApplyOnceAffectStrategy `json:"affect"`
}
// FindStrategy find apply-once strategy for target resource

View File

@@ -12,6 +12,8 @@ spec:
cue:
template: |
#ApplyOnceStrategy: {
// +usage=When the strategy takes effect,e.g. onUpdate、onStateKeep
affect?: string
// +usage=Specify the path of the resource that allow configuration drift
path: [...string]
}

View File

@@ -12,6 +12,8 @@ spec:
cue:
template: |
#ApplyOnceStrategy: {
// +usage=When the strategy takes effect,e.g. onUpdate、onStateKeep
affect?: string
// +usage=Specify the path of the resource that allow configuration drift
path: [...string]
}

View File

@@ -120,4 +120,61 @@ EOF
In the `apply-once-app-3` case, any changes of `hello-cosmos` deployment will not be brought back and any changes
of `hello-cosmos` service will be brought back in the next reconcile loop. In the same time, any changes
of `hello-world` component will be brought back in the next reconcile loop.
of `hello-world` component will be brought back in the next reconcile loop.
```shell
$ cat <<EOF | kubectl apply -f -
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: apply-once-app-4
spec:
components:
- name: hello-world
type: webservice
properties:
image: crccheck/hello-world
port: 8080
traits:
- type: scaler
properties:
replicas: 1
- name: hello-cosmos
type: webservice
properties:
image: crccheck/hello-world
port: 8080
traits:
- type: scaler
properties:
replicas: 1
policies:
- name: apply-once
type: apply-once
properties:
enable: true
rules:
- selector:
componentNames: [ "hello-cosmos" ]
resourceTypes: [ "Deployment" ]
strategy:
affect: onStateKeep
path: [ "spec.replicas"]
EOF
```
By default, KubeVela executes the apply-once policy in two phases: application update and cycle state maintenance,
allowing configuration drift depending on the policy configuration.
If you have special requirements, you can set the affect to determine the phase of policy execution .
affect supported configurations: onUpdate/onStateKeep/always (default)
When affect=always, or not set, the policy is executed in two phase.
When affect=onStateKeep, the policy is executed only during the stateKeep phase. In the case of `apply-once-app-4`, any
changes to the deployed copy of `hello-cosmos` will not be brought back to the next state keeping loop, but will be
brought back to the next application update.
When affect=onUpdate, the policy is only executed when the application is updated. In the case of `
apply-once-app-4`, if affect=onUpdate is set, any changes to the deployed copy of `hello-cosmos` will not be brought
back in the next application update, but will be brought back in the next state keeping loop.

View File

@@ -0,0 +1,281 @@
/*
Copyright 2021. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package application
import (
"context"
"encoding/json"
"fmt"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/pkg/resourcekeeper"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/oam/testutil"
"github.com/oam-dev/kubevela/pkg/oam/util"
)
var _ = Describe("Test Application with apply-once policy", func() {
ctx := context.Background()
initReplicas := int32(2)
targetReplicas := int32(5)
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "apply-once-policy-test",
},
}
baseApp := &v1beta1.Application{
TypeMeta: metav1.TypeMeta{
Kind: "Application",
APIVersion: "core.oam.dev/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "baseApp",
Namespace: ns.Name,
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "baseComp",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
Traits: []common.ApplicationTrait{{
Type: "scale",
Properties: &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"replicas": %d}`, initReplicas))},
}},
},
},
Policies: []v1beta1.AppPolicy{{
Name: "basePolicy",
Type: "apply-once",
Properties: &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"enable": true,"rules": [{"selector": { "resourceTypes": ["Deployment"] }, "strategy": {"affect":"%s", "path": ["spec.replicas"] }}]}`, ""))},
}},
},
}
worker := &v1beta1.ComponentDefinition{}
workerCdDefJson, _ := yaml.YAMLToJSON([]byte(componentDefYaml))
scaleTrait := &v1beta1.TraitDefinition{}
scaleTdDefJson, _ := yaml.YAMLToJSON([]byte(scaleTraitDefYaml))
BeforeEach(func() {
Expect(k8sClient.Create(ctx, ns.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
Expect(json.Unmarshal(workerCdDefJson, worker)).Should(BeNil())
Expect(k8sClient.Create(ctx, worker.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
Expect(json.Unmarshal(scaleTdDefJson, scaleTrait)).Should(BeNil())
Expect(k8sClient.Create(ctx, scaleTrait.DeepCopy())).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{}))
})
Context("Test Application with apply-once policy in different affect stage", func() {
It(" Affect not set or affect is empty , test effective globally", func() {
app := baseApp.DeepCopy()
app.SetName("apply-once-app-1")
app.Spec.Components[0].Name = "apply-once-comp-1"
By("step 1. Create app , replicas: 2")
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
Eventually(waitAppRunning(ctx, app), 3*time.Second, 300*time.Second).Should(BeNil())
By("step 2. Update deployment to replicas: 5 ")
Eventually(updateDeployReplicas(ctx, app, targetReplicas), time.Second*3, time.Microsecond*300).Should(BeNil())
By("step 3. Check OnUpdate, e.g. update app's component with new properties, replicas should be 5 ")
for i := 0; i <= 3; i++ {
properties := &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"cmd":["sleep","%d"],"image":"busybox"}`, i*1000))}
Eventually(updateApp(ctx, app, properties), time.Second*3, time.Microsecond*300).Should(BeNil())
testutil.ReconcileRetry(reconciler, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(app)})
Eventually(waitAppRunning(ctx, app), 3*time.Second, 300*time.Second).Should(BeNil())
deploy := new(v1.Deployment)
deployObjKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployObjKey, deploy)).Should(BeNil())
Expect(*deploy.Spec.Replicas).Should(Equal(targetReplicas))
}
By("step 4. Check OnStateKeep, replicas also should be 5 ")
rk, err := resourcekeeper.NewResourceKeeper(context.Background(), k8sClient, app)
Expect(err).Should(BeNil())
for i := 0; i <= 3; i++ {
// state keep :5
Expect(rk.StateKeep(context.Background())).Should(BeNil())
deploy := new(v1.Deployment)
deployObjKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployObjKey, deploy)).Should(BeNil())
Expect(*deploy.Spec.Replicas).Should(Equal(targetReplicas))
}
})
It("Affect: onStateKeep, test only effective when state keep", func() {
By("step 1. Create app , replicas: 2")
app := baseApp.DeepCopy()
app.SetName("apply-once-app-2")
app.Spec.Components[0].Name = "apply-once-comp-2"
app.Spec.Policies[0].Properties = &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"enable": true,"rules": [{"selector": { "resourceTypes": ["Deployment"] }, "strategy": {"affect":"%s", "path": ["spec.replicas"] }}]}`, v1alpha1.ApplyOnceStrategyOnAppStateKeep))}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
Eventually(waitAppRunning(ctx, app), 3*time.Second, 300*time.Second).Should(BeNil())
By("step 2. Update deployment, replicas: 5 ")
Eventually(updateDeployReplicas(ctx, app, targetReplicas), time.Second*3, time.Microsecond*300).Should(BeNil())
By("step 3. Check OnStateKeep, replicas should be 5 ")
rk, err := resourcekeeper.NewResourceKeeper(context.Background(), k8sClient, app)
Expect(err).Should(BeNil())
for i := 0; i <= 3; i++ {
// state keep : use newest replicas
Expect(rk.StateKeep(context.Background())).Should(BeNil())
deploy := new(v1.Deployment)
deployObjKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployObjKey, deploy)).Should(BeNil())
Expect(*deploy.Spec.Replicas).Should(Equal(targetReplicas))
}
By("step 4. Check OnUpdate, e.g. update app's component with new properties, replicas should be 2 ")
for i := 0; i <= 3; i++ {
// onupdate: not use newest replicas
properties := &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"cmd":["sleep","%d"],"image":"busybox"}`, i*1000))}
Eventually(updateApp(ctx, app, properties), time.Second*3, time.Microsecond*300).Should(BeNil())
testutil.ReconcileRetry(reconciler, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(app)})
Eventually(waitAppRunning(ctx, app), 3*time.Second, 300*time.Second).Should(BeNil())
deploy := new(v1.Deployment)
deployObjKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployObjKey, deploy)).Should(BeNil())
Expect(*deploy.Spec.Replicas).Should(Equal(initReplicas))
}
})
It("Affect: onUpdate , test only effective when updating the app", func() {
By("step 1. Create app , replicas: 2")
app := baseApp.DeepCopy()
app.SetName("apply-once-app-3")
app.Spec.Components[0].Name = "apply-once-comp-3"
app.Spec.Policies[0].Properties = &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"enable": true,"rules": [{"selector": { "resourceTypes": ["Deployment"] }, "strategy": {"affect":"%s", "path": ["spec.replicas"] }}]}`, v1alpha1.ApplyOnceStrategyOnAppUpdate))}
Expect(k8sClient.Create(ctx, app)).Should(BeNil())
Eventually(waitAppRunning(ctx, app), 3*time.Second, 300*time.Second).Should(BeNil())
By("step 2. Update deployment, replicas: 5 ")
Eventually(updateDeployReplicas(ctx, app, targetReplicas), time.Second*3, time.Microsecond*300).Should(BeNil())
By("step 3. Check OnUpdate, e.g. update app's component with new properties, replicas should be 5 ")
for i := 0; i <= 3; i++ {
// onUpdate : use newest replicas
properties := &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"cmd":["sleep","%d"],"image":"busybox"}`, i*1000))}
Eventually(updateApp(ctx, app, properties), time.Second*3, time.Microsecond*300).Should(BeNil())
testutil.ReconcileRetry(reconciler, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(app)})
Eventually(waitAppRunning(ctx, app), 3*time.Second, 300*time.Second).Should(BeNil())
deploy := new(v1.Deployment)
deployObjKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployObjKey, deploy)).Should(BeNil())
Expect(*deploy.Spec.Replicas).Should(Equal(initReplicas))
}
By("step 4. Check OnStateKeep, replicas should be 2 ")
rk, err := resourcekeeper.NewResourceKeeper(context.Background(), k8sClient, app)
Expect(err).Should(BeNil())
for i := 0; i <= 3; i++ {
// state keep : not use newest replicas
Expect(rk.StateKeep(context.Background())).Should(BeNil())
deploy := new(v1.Deployment)
deployObjKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployObjKey, deploy)).Should(BeNil())
Expect(*deploy.Spec.Replicas).Should(Equal(initReplicas))
}
})
})
})
func updateDeployReplicas(ctx context.Context, app *v1beta1.Application, targetReplicas int32) func() error {
return func() error {
deploy := new(v1.Deployment)
deployKey := client.ObjectKey{Name: app.Spec.Components[0].Name, Namespace: app.Namespace}
Expect(k8sClient.Get(ctx, deployKey, deploy)).Should(BeNil())
deploy.Spec.Replicas = &targetReplicas
return k8sClient.Update(ctx, deploy)
}
}
func waitAppRunning(ctx context.Context, app *v1beta1.Application) func() error {
return func() error {
appV1 := new(v1beta1.Application)
_, err := testutil.ReconcileOnceAfterFinalizer(reconciler, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(app)})
if err != nil {
return err
}
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(app), appV1); err != nil {
return err
}
if appV1.Status.Phase != common.ApplicationRunning {
return errors.New("app is not in running status")
}
return nil
}
}
func updateApp(ctx context.Context, app *v1beta1.Application, properties *runtime.RawExtension) func() error {
return func() error {
oldApp := new(v1beta1.Application)
Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(app), oldApp)).Should(BeNil())
newApp := oldApp.DeepCopy()
newApp.Spec.Components[0].Properties = properties
return k8sClient.Update(ctx, newApp)
}
}
const (
scaleTraitDefYaml = `
apiVersion: core.oam.dev/v1beta1
kind: TraitDefinition
metadata:
annotations:
definition.oam.dev/description: Manually scale K8s pod for your workload which follows the pod spec in path 'spec.template'.
name: scale
namespace: vela-system
spec:
appliesToWorkloads:
- deployments.apps
- statefulsets.apps
podDisruptive: false
schematic:
cue:
template: |
parameter: {
// +usage=Specify the number of workload
replicas: *1 | int
}
// +patchStrategy=retainKeys
patch: spec: replicas: parameter.replicas
`
)

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/pkg/auth"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/multicluster"
@@ -141,6 +142,10 @@ func (h *resourceKeeper) dispatch(ctx context.Context, manifests []*unstructured
if h.isShared(manifest) {
ao = append([]apply.ApplyOption{apply.SharedByApp(h.app)}, ao...)
}
manifest, err := ApplyStrategies(applyCtx, h, manifest, v1alpha1.ApplyOnceStrategyOnAppUpdate)
if err != nil {
return errors.Wrapf(err, "failed to apply once policy for application %s,%s", h.app.Name, err.Error())
}
return h.applicator.Apply(applyCtx, manifest, ao...)
}, manifests, MaxDispatchConcurrent)
return velaerrors.AggregateErrors(errs.([]error))

View File

@@ -24,6 +24,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/auth"
"github.com/oam-dev/kubevela/pkg/multicluster"
@@ -60,7 +63,7 @@ func (h *resourceKeeper) StateKeep(ctx context.Context) error {
return errors.Wrapf(err, "failed to decode resource %s from resourcetracker", mr.ResourceKey())
}
applyCtx := multicluster.ContextWithClusterName(ctx, mr.Cluster)
manifest, err = ApplyStrategies(applyCtx, h, manifest)
manifest, err = ApplyStrategies(applyCtx, h, manifest, v1alpha1.ApplyOnceStrategyOnAppStateKeep)
if err != nil {
return errors.Wrapf(err, "failed to apply once resource %s from resourcetracker %s", mr.ResourceKey(), rt.Name)
}
@@ -79,34 +82,49 @@ func (h *resourceKeeper) StateKeep(ctx context.Context) error {
}
// ApplyStrategies will generate manifest with applyOnceStrategy
func ApplyStrategies(ctx context.Context, h *resourceKeeper, manifest *unstructured.Unstructured) (*unstructured.Unstructured, error) {
func ApplyStrategies(ctx context.Context, h *resourceKeeper, manifest *unstructured.Unstructured, matchedAffectStage v1alpha1.ApplyOnceAffectStrategy) (*unstructured.Unstructured, error) {
if h.applyOncePolicy == nil {
return manifest, nil
}
applyOncePath := h.applyOncePolicy.FindStrategy(manifest)
if applyOncePath != nil {
un := new(unstructured.Unstructured)
un.SetAPIVersion(manifest.GetAPIVersion())
un.SetKind(manifest.GetKind())
err := h.Get(ctx, types.NamespacedName{Name: manifest.GetName(), Namespace: manifest.GetNamespace()}, un)
if err != nil {
return nil, err
}
for _, path := range applyOncePath.Path {
if path == "*" {
manifest = un.DeepCopy()
break
}
value, err := fieldpath.Pave(un.UnstructuredContent()).GetValue(path)
if err != nil {
return nil, err
}
err = fieldpath.Pave(manifest.UnstructuredContent()).SetValue(path, value)
strategy := h.applyOncePolicy.FindStrategy(manifest)
if strategy != nil {
affectStage := strategy.ApplyOnceAffectStrategy
if shouldMerge(affectStage, matchedAffectStage) {
un := new(unstructured.Unstructured)
un.SetAPIVersion(manifest.GetAPIVersion())
un.SetKind(manifest.GetKind())
err := h.Get(ctx, types.NamespacedName{Name: manifest.GetName(), Namespace: manifest.GetNamespace()}, un)
if err != nil {
if kerrors.IsNotFound(err) {
return manifest, nil
}
return nil, err
}
return mergeValue(strategy.Path, manifest, un)
}
}
return manifest, nil
}
func shouldMerge(affectStage v1alpha1.ApplyOnceAffectStrategy, matchedAffectType v1alpha1.ApplyOnceAffectStrategy) bool {
return affectStage == "" || affectStage == v1alpha1.ApplyOnceStrategyAlways || affectStage == matchedAffectType
}
func mergeValue(paths []string, manifest *unstructured.Unstructured, un *unstructured.Unstructured) (*unstructured.Unstructured, error) {
for _, path := range paths {
if path == "*" {
manifest = un.DeepCopy()
break
}
value, err := fieldpath.Pave(un.UnstructuredContent()).GetValue(path)
if err != nil {
return nil, err
}
err = fieldpath.Pave(manifest.UnstructuredContent()).SetValue(path, value)
if err != nil {
return nil, err
}
return manifest, nil
}
return manifest, nil
}

View File

@@ -8,6 +8,8 @@
template: {
#ApplyOnceStrategy: {
// +usage=When the strategy takes effect,e.g. onUpdate、onStateKeep
affect?: string
// +usage=Specify the path of the resource that allow configuration drift
path: [...string]
}