Feat: provide stage field for TraitDefinition (#4570)

* Feat: provide stage field for TraitDefinition

Signed-off-by: ZhongsJie <zhongsjie@gmail.com>

* Feat: Refactor the collect health status fn

Signed-off-by: ZhongsJie <zhongsjie@gmail.com>

Signed-off-by: ZhongsJie <zhongsjie@gmail.com>
This commit is contained in:
ZhongsJie
2022-10-08 10:38:06 +08:00
committed by GitHub
parent 4c56fac228
commit 3299184d3b
18 changed files with 731 additions and 60 deletions

View File

@@ -249,6 +249,12 @@ type ApplicationComponentStatus struct {
Scopes []corev1.ObjectReference `json:"scopes,omitempty"`
}
// Equal check if two ApplicationComponentStatus are equal
func (in ApplicationComponentStatus) Equal(r ApplicationComponentStatus) bool {
return in.Name == r.Name && in.Namespace == r.Namespace &&
in.Cluster == r.Cluster && in.Env == r.Env
}
// ApplicationTraitStatus records the trait health status
type ApplicationTraitStatus struct {
Type string `json:"type"`

View File

@@ -158,8 +158,29 @@ type TraitDefinitionSpec struct {
// ControlPlaneOnly defines which cluster is dispatched to
// +optional
ControlPlaneOnly bool `json:"controlPlaneOnly,omitempty"`
// Stage defines the stage information to which this trait resource processing belongs.
// Currently, PreDispatch and PostDispatch are provided, which are used to control resource
// pre-process and post-process respectively.
// +optional
Stage StageType `json:"stage,omitempty"`
}
// StageType describes how the manifests should be dispatched.
// Only one of the following stage types may be specified.
// If none of the following types is specified, the default one
// is DefaultDispatch.
type StageType string
const (
// PreDispatch means that pre dispatch for manifests
PreDispatch StageType = "PreDispatch"
// DefaultDispatch means that default dispatch for manifests
DefaultDispatch StageType = "DefaultDispatch"
// PostDispatch means that post dispatch for manifests
PostDispatch StageType = "PostDispatch"
)
// TraitDefinitionStatus is the status of TraitDefinition
type TraitDefinitionStatus struct {
// ConditionedStatus reflects the observed status of a resource

View File

@@ -3887,6 +3887,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which
this trait resource processing belongs. Currently, PreDispatch
and PostDispatch are provided, which are used to control
resource pre-process and post-process respectively.
type: string
status:
description: Status defines the custom health policy and
status message for trait

View File

@@ -916,6 +916,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which
this trait resource processing belongs. Currently, PreDispatch
and PostDispatch are provided, which are used to control
resource pre-process and post-process respectively.
type: string
status:
description: Status defines the custom health policy and status
message for trait

View File

@@ -558,6 +558,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which this trait
resource processing belongs. Currently, PreDispatch and PostDispatch
are provided, which are used to control resource pre-process and
post-process respectively.
type: string
status:
description: Status defines the custom health policy and status message
for trait

View File

@@ -3887,6 +3887,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which
this trait resource processing belongs. Currently, PreDispatch
and PostDispatch are provided, which are used to control
resource pre-process and post-process respectively.
type: string
status:
description: Status defines the custom health policy and
status message for trait

View File

@@ -916,6 +916,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which
this trait resource processing belongs. Currently, PreDispatch
and PostDispatch are provided, which are used to control
resource pre-process and post-process respectively.
type: string
status:
description: Status defines the custom health policy and status
message for trait

View File

@@ -558,6 +558,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which this trait
resource processing belongs. Currently, PreDispatch and PostDispatch
are provided, which are used to control resource pre-process and
post-process respectively.
type: string
status:
description: Status defines the custom health policy and status message
for trait

View File

@@ -3887,6 +3887,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which
this trait resource processing belongs. Currently, PreDispatch
and PostDispatch are provided, which are used to control
resource pre-process and post-process respectively.
type: string
status:
description: Status defines the custom health policy and
status message for trait

View File

@@ -916,6 +916,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which
this trait resource processing belongs. Currently, PreDispatch
and PostDispatch are provided, which are used to control
resource pre-process and post-process respectively.
type: string
status:
description: Status defines the custom health policy and status
message for trait

View File

@@ -558,6 +558,12 @@ spec:
- configuration
type: object
type: object
stage:
description: Stage defines the stage information to which this trait
resource processing belongs. Currently, PreDispatch and PostDispatch
are provided, which are used to control resource pre-process and
post-process respectively.
type: string
status:
description: Status defines the custom health policy and status message
for trait

View File

@@ -26,6 +26,8 @@ import (
"testing"
"time"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/google/go-cmp/cmp"
testdef "github.com/kubevela/pkg/util/test/definition"
wffeatures "github.com/kubevela/workflow/pkg/features"
@@ -421,7 +423,7 @@ var _ = Describe("Test Application Controller", func() {
return strings.ReplaceAll(s, `{{ include "systemDefinitionNamespace" . }}`, "vela-system")
})).Should(SatisfyAny(Succeed(), &util.AlreadyExistMatcher{}))
}
for _, def := range []string{"panic", "hubcpuscaler"} {
for _, def := range []string{"panic", "hubcpuscaler", "storage-pre-dispatch", "storage-pre-dispatch-unhealthy"} {
Expect(testdef.InstallDefinitionFromYAML(ctx, k8sClient, filepath.Join(file, "../../application/testdata/definitions/", def+".yaml"), nil)).
Should(SatisfyAny(Succeed(), &util.AlreadyExistMatcher{}))
}
@@ -4007,6 +4009,106 @@ var _ = Describe("Test Application Controller", func() {
// Expect(curApp.Status.Phase).Should(Equal(common.ApplicationWorkflowFailed))
Expect(curApp.Status.Phase).Should(Equal(common.ApplicationRunning))
})
It("test application with healthy and PreDispatch trait", func() {
defer featuregatetesting.SetFeatureGateDuringTest(&testing.T{}, utilfeature.DefaultFeatureGate, features.MultiStageComponentApply, true)()
By("create the new namespace")
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-pre-dispatch-healthy",
},
}
Expect(k8sClient.Create(ctx, ns)).Should(BeNil())
appWithPreDispatch := appwithNoTrait.DeepCopy()
appWithPreDispatch.Spec.Components[0].Name = "comp-with-pre-dispatch-trait"
appWithPreDispatch.Spec.Components[0].Traits = []common.ApplicationTrait{
{
Type: "storage-pre-dispatch",
Properties: &runtime.RawExtension{Raw: []byte("{\"configMap\":[{\"name\":\"pre-dispatch-cm\",\"mountPath\":\"/test/mount/cm\",\"data\":{\"firstKey\":\"firstValue\"}}]}")},
},
}
appWithPreDispatch.Name = "app-with-pre-dispatch"
appWithPreDispatch.SetNamespace(ns.Name)
Expect(k8sClient.Create(ctx, appWithPreDispatch)).Should(BeNil())
appKey := client.ObjectKey{
Name: appWithPreDispatch.Name,
Namespace: appWithPreDispatch.Namespace,
}
testutil.ReconcileOnceAfterFinalizer(reconciler, reconcile.Request{NamespacedName: appKey})
By("Check App running successfully")
curApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, curApp)).Should(BeNil())
Expect(curApp.Status.Phase).Should(Equal(common.ApplicationRunning))
By("Check Manifests Created")
expConfigMap := &corev1.ConfigMap{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: "pre-dispatch-cm",
Namespace: appWithPreDispatch.Namespace,
}, expConfigMap)).Should(BeNil())
expDeployment := &v1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: appWithPreDispatch.Spec.Components[0].Name,
Namespace: appWithPreDispatch.Namespace,
}, expDeployment)).Should(BeNil())
Expect(k8sClient.Delete(ctx, appWithPreDispatch)).Should(BeNil())
Expect(k8sClient.Delete(ctx, ns)).Should(BeNil())
})
It("test application with unhealthy and PreDispatch trait", func() {
defer featuregatetesting.SetFeatureGateDuringTest(&testing.T{}, utilfeature.DefaultFeatureGate, features.MultiStageComponentApply, true)()
By("create the new namespace")
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "app-with-pre-dispatch-unhealthy",
},
}
Expect(k8sClient.Create(ctx, ns)).Should(BeNil())
appWithPreDispatch := appwithNoTrait.DeepCopy()
appWithPreDispatch.Spec.Components[0].Name = "comp-with-pre-dispatch-trait"
appWithPreDispatch.Spec.Components[0].Traits = []common.ApplicationTrait{
{
Type: "storage-pre-dispatch-unhealthy",
Properties: &runtime.RawExtension{Raw: []byte("{\"configMap\":[{\"name\":\"pre-dispatch-unhealthy-cm\",\"mountPath\":\"/test/mount/cm\",\"data\":{\"firstKey\":\"firstValue\"}}]}")},
},
}
appWithPreDispatch.Name = "app-with-pre-dispatch-unhealthy"
appWithPreDispatch.SetNamespace(ns.Name)
Expect(k8sClient.Create(ctx, appWithPreDispatch)).Should(BeNil())
appKey := client.ObjectKey{
Name: appWithPreDispatch.Name,
Namespace: appWithPreDispatch.Namespace,
}
testutil.ReconcileOnceAfterFinalizer(reconciler, reconcile.Request{NamespacedName: appKey})
By("Check the Application status")
curApp := &v1beta1.Application{}
Expect(k8sClient.Get(ctx, appKey, curApp)).Should(BeNil())
Expect(curApp.Status.Phase).Should(Equal(common.ApplicationRunningWorkflow))
By("Check Manifests Created")
expConfigMap := &corev1.ConfigMap{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: "pre-dispatch-unhealthy-cm",
Namespace: appWithPreDispatch.Namespace,
}, expConfigMap)).Should(BeNil())
expDeployment := &v1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: appWithPreDispatch.Spec.Components[0].Name,
Namespace: appWithPreDispatch.Namespace,
}, expDeployment)).Should(util.NotFoundMatcher{})
Expect(k8sClient.Delete(ctx, appWithPreDispatch)).Should(BeNil())
Expect(k8sClient.Delete(ctx, ns)).Should(BeNil())
})
})
const (

View File

@@ -188,6 +188,17 @@ func removeResources(elements []common.ClusterObjectReference, index int) []comm
return elements[:len(elements)-1]
}
// getServiceStatus get specified component status
func (h *AppHandler) getServiceStatus(svc common.ApplicationComponentStatus) common.ApplicationComponentStatus {
for i := range h.services {
current := h.services[i]
if current.Equal(svc) {
return current
}
}
return svc
}
// addServiceStatus recorde the whole component status.
// reconcile run at single threaded. So there is no need to consider to use locker.
func (h *AppHandler) addServiceStatus(cover bool, svcs ...common.ApplicationComponentStatus) {
@@ -197,7 +208,7 @@ func (h *AppHandler) addServiceStatus(cover bool, svcs ...common.ApplicationComp
found := false
for i := range h.services {
current := h.services[i]
if current.Name == svc.Name && current.Env == svc.Env && current.Namespace == svc.Namespace && current.Cluster == svc.Cluster {
if current.Equal(svc) {
if cover {
h.services[i] = svc
}
@@ -216,10 +227,77 @@ func (h *AppHandler) ProduceArtifacts(ctx context.Context, comps []*types.Compon
return h.createResourcesConfigMap(ctx, h.currentAppRev, comps, policies)
}
// nolint
func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (*common.ApplicationComponentStatus, bool, error) {
accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, overrideNamespace)
// collectTraitHealthStatus collect trait health status
func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.Trait, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (common.ApplicationTraitStatus, error) {
defer func(clusterName string) {
wl.Ctx.SetCtx(pkgmulticluster.WithCluster(wl.Ctx.GetCtx(), clusterName))
}(multicluster.ClusterNameInContext(wl.Ctx.GetCtx()))
var (
pCtx = wl.Ctx
appName = appRev.Spec.Application.Name
traitStatus = common.ApplicationTraitStatus{
Type: tr.Name,
Healthy: true,
}
traitOverrideNamespace = overrideNamespace
err error
)
if tr.FullTemplate.TraitDefinition.Spec.ControlPlaneOnly {
traitOverrideNamespace = appRev.GetNamespace()
pCtx.SetCtx(pkgmulticluster.WithCluster(pCtx.GetCtx(), pkgmulticluster.Local))
}
_accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, traitOverrideNamespace)
if ok, err := tr.EvalHealth(pCtx, h.r.Client, _accessor); !ok || err != nil {
traitStatus.Healthy = false
}
traitStatus.Message, err = tr.EvalStatus(pCtx, h.r.Client, _accessor)
if err != nil {
return common.ApplicationTraitStatus{}, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, evaluate status message error", appName, wl.Name, tr.Name)
}
return traitStatus, nil
}
// collectWorkloadHealthStatus collect workload health status
func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, status *common.ApplicationComponentStatus, accessor util.NamespaceAccessor) (bool, error) {
var (
appName = appRev.Spec.Application.Name
isHealth = true
err error
)
if wl.CapabilityCategory == types.TerraformCategory {
var configuration terraforv1beta2.Configuration
if err := h.r.Client.Get(ctx, client.ObjectKey{Name: wl.Name, Namespace: accessor.Namespace()}, &configuration); err != nil {
if kerrors.IsNotFound(err) {
var legacyConfiguration terraforv1beta1.Configuration
if err := h.r.Client.Get(ctx, client.ObjectKey{Name: wl.Name, Namespace: accessor.Namespace()}, &legacyConfiguration); err != nil {
return false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
}
isHealth = setStatus(status, legacyConfiguration.Status.ObservedGeneration, legacyConfiguration.Generation,
legacyConfiguration.GetLabels(), appRev.Name, legacyConfiguration.Status.Apply.State, legacyConfiguration.Status.Apply.Message)
} else {
return false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
}
} else {
isHealth = setStatus(status, configuration.Status.ObservedGeneration, configuration.Generation, configuration.GetLabels(),
appRev.Name, configuration.Status.Apply.State, configuration.Status.Apply.Message)
}
} else {
if ok, err := wl.EvalHealth(wl.Ctx, h.r.Client, accessor); !ok || err != nil {
isHealth = false
}
status.Healthy = isHealth
status.Message, err = wl.EvalStatus(wl.Ctx, h.r.Client, accessor)
if err != nil {
return false, errors.WithMessagef(err, "app=%s, comp=%s, evaluate workload status message error", appName, wl.Name)
}
}
return isHealth, nil
}
// nolint
func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string, skipWorkload bool, traitFilters ...TraitFilter) (*common.ApplicationComponentStatus, bool, error) {
accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, overrideNamespace)
var (
status = common.ApplicationComponentStatus{
Name: wl.Name,
@@ -228,68 +306,48 @@ func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Worklo
Namespace: accessor.Namespace(),
Cluster: multicluster.ClusterNameInContext(ctx),
}
appName = appRev.Spec.Application.Name
isHealth = true
err error
)
if wl.CapabilityCategory == types.TerraformCategory {
var configuration terraforv1beta2.Configuration
if err := h.r.Client.Get(ctx, client.ObjectKey{Name: wl.Name, Namespace: accessor.Namespace()}, &configuration); err != nil {
if kerrors.IsNotFound(err) {
var legacyConfiguration terraforv1beta1.Configuration
if err := h.r.Client.Get(ctx, client.ObjectKey{Name: wl.Name, Namespace: accessor.Namespace()}, &legacyConfiguration); err != nil {
return nil, false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
}
isHealth = setStatus(&status, legacyConfiguration.Status.ObservedGeneration, legacyConfiguration.Generation,
legacyConfiguration.GetLabels(), appRev.Name, legacyConfiguration.Status.Apply.State, legacyConfiguration.Status.Apply.Message)
} else {
return nil, false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
}
} else {
isHealth = setStatus(&status, configuration.Status.ObservedGeneration, configuration.Generation, configuration.GetLabels(),
appRev.Name, configuration.Status.Apply.State, configuration.Status.Apply.Message)
}
} else {
if ok, err := wl.EvalHealth(wl.Ctx, h.r.Client, accessor); !ok || err != nil {
isHealth = false
status.Healthy = false
}
status.Message, err = wl.EvalStatus(wl.Ctx, h.r.Client, accessor)
status = h.getServiceStatus(status)
if !skipWorkload {
isHealth, err = h.collectWorkloadHealthStatus(ctx, wl, appRev, &status, accessor)
if err != nil {
return nil, false, errors.WithMessagef(err, "app=%s, comp=%s, evaluate workload status message error", appName, wl.Name)
return nil, false, err
}
}
var traitStatusList []common.ApplicationTraitStatus
collectNext:
for _, tr := range wl.Traits {
traitOverrideNamespace := overrideNamespace
if tr.FullTemplate.TraitDefinition.Spec.ControlPlaneOnly {
traitOverrideNamespace = appRev.GetNamespace()
wl.Ctx.SetCtx(pkgmulticluster.WithCluster(wl.Ctx.GetCtx(), pkgmulticluster.Local))
for _, filter := range traitFilters {
// If filtered out by one of the filters
if filter(*tr) {
continue collectNext
}
}
_accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, traitOverrideNamespace)
var traitStatus = common.ApplicationTraitStatus{
Type: tr.Name,
Healthy: true,
}
if ok, err := tr.EvalHealth(wl.Ctx, h.r.Client, _accessor); !ok || err != nil {
isHealth = false
traitStatus.Healthy = false
}
traitStatus.Message, err = tr.EvalStatus(wl.Ctx, h.r.Client, _accessor)
traitStatus, err := h.collectTraitHealthStatus(wl, tr, appRev, overrideNamespace)
if err != nil {
return nil, false, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, evaluate status message error", appName, wl.Name, tr.Name)
return nil, false, err
}
isHealth = isHealth && traitStatus.Healthy
if status.Message == "" && traitStatus.Message != "" {
status.Message = traitStatus.Message
}
traitStatusList = append(traitStatusList, traitStatus)
wl.Ctx.SetCtx(pkgmulticluster.WithCluster(wl.Ctx.GetCtx(), status.Cluster))
}
status.Traits = traitStatusList
var oldStatus []common.ApplicationTraitStatus
for _, _trait := range status.Traits {
if _trait.Type != tr.Name {
oldStatus = append(oldStatus, _trait)
}
}
status.Traits = oldStatus
}
status.Traits = append(status.Traits, traitStatusList...)
status.Scopes = generateScopeReference(wl.Scopes)
h.addServiceStatus(true, status)
return &status, isHealth, nil

View File

@@ -0,0 +1,215 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package application
import (
"context"
"sort"
"strings"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/pkg/cue/definition"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/appfile"
)
// DispatchOptions is the options for dispatch
type DispatchOptions struct {
Workload *unstructured.Unstructured
Traits []*unstructured.Unstructured
OverrideNamespace string
Stage StageType
}
// SortDispatchOptions describe the sorting for options
type SortDispatchOptions []DispatchOptions
func (s SortDispatchOptions) Len() int {
return len(s)
}
func (s SortDispatchOptions) Less(i, j int) bool {
return s[i].Stage < s[j].Stage
}
func (s SortDispatchOptions) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
var _ sort.Interface = new(SortDispatchOptions)
// StageType is a valid value for TraitDefinitionSpec.Stage
type StageType int
const (
// PreDispatch means that pre dispatch for manifests
PreDispatch StageType = iota
// DefaultDispatch means that default dispatch for manifests
DefaultDispatch
// PostDispatch means that post dispatch for manifests
PostDispatch
)
var stages = map[StageType]string{
PreDispatch: "PreDispatch",
DefaultDispatch: "DefaultDispatch",
PostDispatch: "PostDispatch",
}
// ParseStageType parse the StageType from a string
func ParseStageType(s string) (StageType, error) {
for k, v := range stages {
if v == s {
return k, nil
}
}
return -1, errors.New("unknown stage type")
}
// TraitFilter is used to filter trait object.
type TraitFilter func(trait appfile.Trait) bool
// ByTraitType returns a filter that does not match the given type and belongs to readyTraits.
func ByTraitType(readyTraits, checkTraits []*unstructured.Unstructured) TraitFilter {
generateFn := func(manifests []*unstructured.Unstructured) map[string]bool {
out := map[string]bool{}
for _, obj := range manifests {
out[obj.GetLabels()[oam.TraitTypeLabel]] = true
}
return out
}
readyMap := generateFn(readyTraits)
checkMap := generateFn(checkTraits)
return func(trait appfile.Trait) bool {
return !checkMap[trait.Name] && readyMap[trait.Name]
}
}
// manifestDispatcher is a manifest dispatcher
type manifestDispatcher struct {
run func(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, clusterName string) (bool, error)
healthCheck func(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision) (bool, error)
}
func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, readyWorkload *unstructured.Unstructured, readyTraits []*unstructured.Unstructured, overrideNamespace string) ([]*manifestDispatcher, error) {
dispatcherGenerator := func(options DispatchOptions) *manifestDispatcher {
assembleManifestFn := func(skipApplyWorkload bool) (bool, []*unstructured.Unstructured) {
manifests := options.Traits
skipWorkload := skipApplyWorkload || options.Workload == nil
if !skipWorkload {
manifests = append([]*unstructured.Unstructured{options.Workload}, options.Traits...)
}
return skipWorkload, manifests
}
dispatcher := new(manifestDispatcher)
dispatcher.healthCheck = func(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision) (bool, error) {
skipWorkload, manifests := assembleManifestFn(wl.SkipApplyWorkload)
if !h.resourceKeeper.ContainsResources(manifests) {
return false, nil
}
_, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
ByTraitType(readyTraits, options.Traits))
if err != nil {
return false, err
}
return isHealth, nil
}
dispatcher.run = func(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, clusterName string) (bool, error) {
skipWorkload, dispatchManifests := assembleManifestFn(wl.SkipApplyWorkload)
if isHealth, err := dispatcher.healthCheck(ctx, wl, appRev); !isHealth || err != nil {
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchManifests...); err != nil {
return false, errors.WithMessage(err, "Dispatch")
}
status, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
ByTraitType(readyTraits, options.Traits))
if err != nil {
return false, errors.WithMessage(err, "CollectHealthStatus")
}
if options.Stage < DefaultDispatch {
status.Healthy = false
if status.Message == "" {
status.Message = "waiting for previous stage healthy"
}
h.addServiceStatus(true, *status)
}
if !isHealth {
return false, nil
}
}
return true, nil
}
return dispatcher
}
traitStageMap := make(map[StageType][]*unstructured.Unstructured)
for _, readyTrait := range readyTraits {
var (
traitType = readyTrait.GetLabels()[oam.TraitTypeLabel]
stageType = DefaultDispatch
err error
)
switch {
case traitType == definition.AuxiliaryWorkload:
case traitType != "":
if strings.Contains(traitType, "-") {
splitName := traitType[0:strings.LastIndex(traitType, "-")]
if _, ok := appRev.Spec.TraitDefinitions[splitName]; ok {
traitType = splitName
}
}
if trait, ok := appRev.Spec.TraitDefinitions[traitType]; ok {
_stageType := trait.Spec.Stage
if len(_stageType) == 0 {
_stageType = v1beta1.DefaultDispatch
}
stageType, err = ParseStageType(string(_stageType))
if err != nil {
return nil, err
}
}
}
traitStageMap[stageType] = append(traitStageMap[stageType], readyTrait)
}
var optionList SortDispatchOptions
if _, ok := traitStageMap[DefaultDispatch]; !ok {
traitStageMap[DefaultDispatch] = []*unstructured.Unstructured{}
}
for stage, traits := range traitStageMap {
option := DispatchOptions{
Stage: stage,
Traits: traits,
OverrideNamespace: overrideNamespace,
}
if stage == DefaultDispatch {
option.Workload = readyWorkload
}
optionList = append(optionList, option)
}
sort.Sort(optionList)
var manifestDispatchers []*manifestDispatcher
for _, option := range optionList {
manifestDispatchers = append(manifestDispatchers, dispatcherGenerator(option))
}
return manifestDispatchers, nil
}

View File

@@ -22,6 +22,10 @@ import (
"strings"
"time"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -322,7 +326,7 @@ func (h *AppHandler) checkComponentHealth(appParser *appfile.Parser, appRev *v1b
return false, err
}
_, isHealth, err := h.collectHealthStatus(auth.ContextWithUserInfo(ctx, h.app), wl, appRev, overrideNamespace)
_, isHealth, err := h.collectHealthStatus(auth.ContextWithUserInfo(ctx, h.app), wl, appRev, overrideNamespace, false)
return isHealth, err
}
}
@@ -354,18 +358,31 @@ func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1bet
}
checkSkipApplyWorkload(wl)
dispatchResources := readyTraits
if !wl.SkipApplyWorkload {
dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
}
isHealth := true
if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
manifestDispatchers, err := h.generateDispatcher(appRev, readyWorkload, readyTraits, overrideNamespace)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "generateDispatcher")
}
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
return nil, nil, false, errors.WithMessage(err, "Dispatch")
}
for _, dispatcher := range manifestDispatchers {
if isHealth, err := dispatcher.run(ctx, wl, appRev, clusterName); !isHealth || err != nil {
return nil, nil, false, err
}
}
} else {
dispatchResources := readyTraits
if !wl.SkipApplyWorkload {
dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
}
_, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, overrideNamespace)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
return nil, nil, false, errors.WithMessage(err, "Dispatch")
}
_, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
}
}
if DisableResourceApplyDoubleCheck {

View File

@@ -0,0 +1,99 @@
apiVersion: core.oam.dev/v1beta1
kind: TraitDefinition
metadata:
annotations: {}
name: storage-pre-dispatch-unhealthy
namespace: vela-system
spec:
appliesToWorkloads:
- deployments.apps
podDisruptive: true
stage: PreDispatch
schematic:
cue:
template: |
configMapVolumesList: *[
if parameter.configMap != _|_ for v in parameter.configMap if v.mountPath != _|_ {
{
name: "configmap-" + v.name
configMap: {
defaultMode: v.defaultMode
name: v.name
if v.items != _|_ {
items: v.items
}
}
}
},
] | []
configMapVolumeMountsList: *[
if parameter.configMap != _|_ for v in parameter.configMap if v.mountPath != _|_ {
{
name: "configmap-" + v.name
mountPath: v.mountPath
if v.subPath != _|_ {
subPath: v.subPath
}
}
},
] | []
volumesList: configMapVolumesList
deDupVolumesArray: [
for val in [
for i, vi in volumesList {
for j, vj in volumesList if j < i && vi.name == vj.name {
_ignore: true
}
vi
},
] if val._ignore == _|_ {
val
},
]
patch: spec: template: spec: {
// +patchKey=name
volumes: deDupVolumesArray
containers: [{
// +patchKey=name
volumeMounts: configMapVolumeMountsList
}, ...]
}
outputs: {
if parameter.configMap != _|_ for v in parameter.configMap {
if v.mountOnly == false {
"configmap-\(v.name)": {
apiVersion: "v1"
kind: "ConfigMap"
metadata: name: v.name
if v.data != _|_ {
data: v.data
}
}
}
}
}
parameter: {
// +usage=Declare config map type storage
configMap?: [...{
name: string
mountOnly: *false | bool
mountPath?: string
subPath?: string
defaultMode: *420 | int
readOnly: *false | bool
data?: {...}
}]
}
status:
customStatus: |-
message: ""
healthPolicy: |-
isHealth: false

View File

@@ -0,0 +1,94 @@
apiVersion: core.oam.dev/v1beta1
kind: TraitDefinition
metadata:
annotations: {}
name: storage-pre-dispatch
namespace: vela-system
spec:
appliesToWorkloads:
- deployments.apps
podDisruptive: true
stage: PreDispatch
schematic:
cue:
template: |
configMapVolumesList: *[
if parameter.configMap != _|_ for v in parameter.configMap if v.mountPath != _|_ {
{
name: "configmap-" + v.name
configMap: {
defaultMode: v.defaultMode
name: v.name
if v.items != _|_ {
items: v.items
}
}
}
},
] | []
configMapVolumeMountsList: *[
if parameter.configMap != _|_ for v in parameter.configMap if v.mountPath != _|_ {
{
name: "configmap-" + v.name
mountPath: v.mountPath
if v.subPath != _|_ {
subPath: v.subPath
}
}
},
] | []
volumesList: configMapVolumesList
deDupVolumesArray: [
for val in [
for i, vi in volumesList {
for j, vj in volumesList if j < i && vi.name == vj.name {
_ignore: true
}
vi
},
] if val._ignore == _|_ {
val
},
]
patch: spec: template: spec: {
// +patchKey=name
volumes: deDupVolumesArray
containers: [{
// +patchKey=name
volumeMounts: configMapVolumeMountsList
}, ...]
}
outputs: {
if parameter.configMap != _|_ for v in parameter.configMap {
if v.mountOnly == false {
"configmap-\(v.name)": {
apiVersion: "v1"
kind: "ConfigMap"
metadata: name: v.name
if v.data != _|_ {
data: v.data
}
}
}
}
}
parameter: {
// +usage=Declare config map type storage
configMap?: [...{
name: string
mountOnly: *false | bool
mountPath?: string
subPath?: string
defaultMode: *420 | int
readOnly: *false | bool
data?: {...}
}]
}

View File

@@ -73,6 +73,10 @@ const (
// If enabled, no StateKeep will be run, ResourceTracker will also disable the storage of all resource data, only
// metadata will be kept
ApplyOnce featuregate.Feature = "ApplyOnce"
// MultiStageComponentApply enable multi-stage feature for component
// If enabled, the dispatch of manifests is performed in batches according to the stage
MultiStageComponentApply featuregate.Feature = "MultiStageComponentApply"
)
var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
@@ -88,6 +92,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
GzipResourceTracker: {Default: false, PreRelease: featuregate.Alpha},
ZstdResourceTracker: {Default: false, PreRelease: featuregate.Alpha},
ApplyOnce: {Default: false, PreRelease: featuregate.Alpha},
MultiStageComponentApply: {Default: false, PreRelease: featuregate.Alpha},
}
func init() {