mirror of
https://github.com/kubevela/kubevela.git
synced 2026-05-06 01:17:09 +00:00
Feat: support data-passing in deploy step (#5161)
* Feat: support component data-passing in deploy step Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * health check with input Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * add union test for io and replication Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * format Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * format Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * use pkg/slices.parMap Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * reduce the GET request Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * reviewable Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> * fix break test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
This commit is contained in:
@@ -228,7 +228,7 @@ func (h *AppHandler) ProduceArtifacts(ctx context.Context, comps []*types.Compon
|
||||
}
|
||||
|
||||
// collectTraitHealthStatus collect trait health status
|
||||
func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.Trait, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (common.ApplicationTraitStatus, error) {
|
||||
func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.Trait, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (common.ApplicationTraitStatus, []*unstructured.Unstructured, error) {
|
||||
defer func(clusterName string) {
|
||||
wl.Ctx.SetCtx(pkgmulticluster.WithCluster(wl.Ctx.GetCtx(), clusterName))
|
||||
}(multicluster.ClusterNameInContext(wl.Ctx.GetCtx()))
|
||||
@@ -248,22 +248,27 @@ func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.
|
||||
pCtx.SetCtx(pkgmulticluster.WithCluster(pCtx.GetCtx(), pkgmulticluster.Local))
|
||||
}
|
||||
_accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, traitOverrideNamespace)
|
||||
if ok, err := tr.EvalHealth(pCtx, h.r.Client, _accessor); !ok || err != nil {
|
||||
templateContext, err := tr.GetTemplateContext(pCtx, h.r.Client, _accessor)
|
||||
if err != nil {
|
||||
return common.ApplicationTraitStatus{}, nil, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, get template context error", appName, wl.Name, tr.Name)
|
||||
}
|
||||
if ok, err := tr.EvalHealth(templateContext); !ok || err != nil {
|
||||
traitStatus.Healthy = false
|
||||
}
|
||||
traitStatus.Message, err = tr.EvalStatus(pCtx, h.r.Client, _accessor)
|
||||
traitStatus.Message, err = tr.EvalStatus(templateContext)
|
||||
if err != nil {
|
||||
return common.ApplicationTraitStatus{}, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, evaluate status message error", appName, wl.Name, tr.Name)
|
||||
return common.ApplicationTraitStatus{}, nil, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, evaluate status message error", appName, wl.Name, tr.Name)
|
||||
}
|
||||
return traitStatus, nil
|
||||
return traitStatus, extractOutputs(templateContext), nil
|
||||
}
|
||||
|
||||
// collectWorkloadHealthStatus collect workload health status
|
||||
func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, status *common.ApplicationComponentStatus, accessor util.NamespaceAccessor) (bool, error) {
|
||||
func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, status *common.ApplicationComponentStatus, accessor util.NamespaceAccessor) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
|
||||
var output *unstructured.Unstructured
|
||||
var outputs []*unstructured.Unstructured
|
||||
var (
|
||||
appName = appRev.Spec.Application.Name
|
||||
isHealth = true
|
||||
err error
|
||||
)
|
||||
if wl.CapabilityCategory == types.TerraformCategory {
|
||||
var configuration terraforv1beta2.Configuration
|
||||
@@ -271,32 +276,40 @@ func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfil
|
||||
if kerrors.IsNotFound(err) {
|
||||
var legacyConfiguration terraforv1beta1.Configuration
|
||||
if err := h.r.Client.Get(ctx, client.ObjectKey{Name: wl.Name, Namespace: accessor.Namespace()}, &legacyConfiguration); err != nil {
|
||||
return false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
|
||||
return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
|
||||
}
|
||||
isHealth = setStatus(status, legacyConfiguration.Status.ObservedGeneration, legacyConfiguration.Generation,
|
||||
legacyConfiguration.GetLabels(), appRev.Name, legacyConfiguration.Status.Apply.State, legacyConfiguration.Status.Apply.Message)
|
||||
} else {
|
||||
return false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
|
||||
return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
|
||||
}
|
||||
} else {
|
||||
isHealth = setStatus(status, configuration.Status.ObservedGeneration, configuration.Generation, configuration.GetLabels(),
|
||||
appRev.Name, configuration.Status.Apply.State, configuration.Status.Apply.Message)
|
||||
}
|
||||
} else {
|
||||
if ok, err := wl.EvalHealth(wl.Ctx, h.r.Client, accessor); !ok || err != nil {
|
||||
templateContext, err := wl.GetTemplateContext(wl.Ctx, h.r.Client, accessor)
|
||||
if err != nil {
|
||||
return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, get template context error", appName, wl.Name)
|
||||
}
|
||||
if ok, err := wl.EvalHealth(templateContext); !ok || err != nil {
|
||||
isHealth = false
|
||||
}
|
||||
status.Healthy = isHealth
|
||||
status.Message, err = wl.EvalStatus(wl.Ctx, h.r.Client, accessor)
|
||||
status.Message, err = wl.EvalStatus(templateContext)
|
||||
if err != nil {
|
||||
return false, errors.WithMessagef(err, "app=%s, comp=%s, evaluate workload status message error", appName, wl.Name)
|
||||
return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, evaluate workload status message error", appName, wl.Name)
|
||||
}
|
||||
output, outputs = extractOutputAndOutputs(templateContext)
|
||||
}
|
||||
return isHealth, nil
|
||||
return isHealth, output, outputs, nil
|
||||
}
|
||||
|
||||
// nolint
|
||||
func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string, skipWorkload bool, traitFilters ...TraitFilter) (*common.ApplicationComponentStatus, bool, error) {
|
||||
// collectHealthStatus will collect health status of component, including component itself and traits.
|
||||
func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string, skipWorkload bool, traitFilters ...TraitFilter) (*common.ApplicationComponentStatus, *unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
|
||||
output := new(unstructured.Unstructured)
|
||||
outputs := make([]*unstructured.Unstructured, 0)
|
||||
accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, overrideNamespace)
|
||||
var (
|
||||
status = common.ApplicationComponentStatus{
|
||||
@@ -312,9 +325,9 @@ func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Worklo
|
||||
|
||||
status = h.getServiceStatus(status)
|
||||
if !skipWorkload {
|
||||
isHealth, err = h.collectWorkloadHealthStatus(ctx, wl, appRev, &status, accessor)
|
||||
isHealth, output, outputs, err = h.collectWorkloadHealthStatus(ctx, wl, appRev, &status, accessor)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
return nil, nil, nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,10 +341,11 @@ collectNext:
|
||||
}
|
||||
}
|
||||
|
||||
traitStatus, err := h.collectTraitHealthStatus(wl, tr, appRev, overrideNamespace)
|
||||
traitStatus, _outputs, err := h.collectTraitHealthStatus(wl, tr, appRev, overrideNamespace)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
return nil, nil, nil, false, err
|
||||
}
|
||||
outputs = append(outputs, _outputs...)
|
||||
|
||||
isHealth = isHealth && traitStatus.Healthy
|
||||
if status.Message == "" && traitStatus.Message != "" {
|
||||
@@ -350,7 +364,7 @@ collectNext:
|
||||
status.Traits = append(status.Traits, traitStatusList...)
|
||||
status.Scopes = generateScopeReference(wl.Scopes)
|
||||
h.addServiceStatus(true, status)
|
||||
return &status, isHealth, nil
|
||||
return &status, output, outputs, isHealth, nil
|
||||
}
|
||||
|
||||
func setStatus(status *common.ApplicationComponentStatus, observedGeneration, generation int64, labels map[string]string,
|
||||
@@ -434,3 +448,22 @@ func (h *AppHandler) ApplyPolicies(ctx context.Context, af *appfile.Appfile) err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractOutputAndOutputs(templateContext map[string]interface{}) (*unstructured.Unstructured, []*unstructured.Unstructured) {
|
||||
output := new(unstructured.Unstructured)
|
||||
if templateContext["output"] != nil {
|
||||
output = &unstructured.Unstructured{Object: templateContext["output"].(map[string]interface{})}
|
||||
}
|
||||
outputs := extractOutputs(templateContext)
|
||||
return output, outputs
|
||||
}
|
||||
|
||||
func extractOutputs(templateContext map[string]interface{}) []*unstructured.Unstructured {
|
||||
outputs := make([]*unstructured.Unstructured, 0)
|
||||
if templateContext["outputs"] != nil {
|
||||
for _, v := range templateContext["outputs"].(map[string]interface{}) {
|
||||
outputs = append(outputs, &unstructured.Unstructured{Object: v.(map[string]interface{})})
|
||||
}
|
||||
}
|
||||
return outputs
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, rea
|
||||
if !h.resourceKeeper.ContainsResources(manifests) {
|
||||
return false, nil
|
||||
}
|
||||
_, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
|
||||
_, _, _, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
|
||||
ByTraitType(readyTraits, options.Traits))
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -140,7 +140,7 @@ func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, rea
|
||||
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchManifests...); err != nil {
|
||||
return false, errors.WithMessage(err, "Dispatch")
|
||||
}
|
||||
status, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
|
||||
status, _, _, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
|
||||
ByTraitType(readyTraits, options.Traits))
|
||||
if err != nil {
|
||||
return false, errors.WithMessage(err, "CollectHealthStatus")
|
||||
|
||||
@@ -307,20 +307,20 @@ func (h *AppHandler) renderComponentFunc(appParser *appfile.Parser, appRev *v1be
|
||||
}
|
||||
|
||||
func (h *AppHandler) checkComponentHealth(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentHealthCheck {
|
||||
return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error) {
|
||||
return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
|
||||
ctx := multicluster.ContextWithClusterName(baseCtx, clusterName)
|
||||
ctx = contextWithComponentNamespace(ctx, overrideNamespace)
|
||||
ctx = contextWithReplicaKey(ctx, comp.ReplicaKey)
|
||||
|
||||
wl, manifest, err := h.prepareWorkloadAndManifests(ctx, appParser, comp, appRev, patcher, af)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, nil, nil, err
|
||||
}
|
||||
wl.Ctx.SetCtx(auth.ContextWithUserInfo(ctx, h.app))
|
||||
|
||||
readyWorkload, readyTraits, err := renderComponentsAndTraits(h.r.Client, manifest, appRev, clusterName, overrideNamespace, env)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, nil, nil, err
|
||||
}
|
||||
checkSkipApplyWorkload(wl)
|
||||
|
||||
@@ -329,11 +329,15 @@ func (h *AppHandler) checkComponentHealth(appParser *appfile.Parser, appRev *v1b
|
||||
dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
|
||||
}
|
||||
if !h.resourceKeeper.ContainsResources(dispatchResources) {
|
||||
return false, err
|
||||
return false, nil, nil, err
|
||||
}
|
||||
|
||||
_, isHealth, err := h.collectHealthStatus(auth.ContextWithUserInfo(ctx, h.app), wl, appRev, overrideNamespace, false)
|
||||
return isHealth, err
|
||||
_, output, outputs, isHealth, err := h.collectHealthStatus(auth.ContextWithUserInfo(ctx, h.app), wl, appRev, overrideNamespace, false)
|
||||
if err != nil {
|
||||
return false, nil, nil, err
|
||||
}
|
||||
|
||||
return isHealth, output, outputs, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -385,7 +389,7 @@ func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1bet
|
||||
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
|
||||
return nil, nil, false, errors.WithMessage(err, "Dispatch")
|
||||
}
|
||||
_, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
|
||||
_, _, _, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
|
||||
if err != nil {
|
||||
return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
|
||||
}
|
||||
|
||||
@@ -481,7 +481,13 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
|
||||
return
|
||||
}
|
||||
accessor := util.NewApplicationResourceNamespaceAccessor(ns, "")
|
||||
isHealthy, err := wl.EvalHealth(pCtx, c, accessor)
|
||||
templateContext, err := wl.GetTemplateContext(pCtx, c, accessor)
|
||||
if err != nil {
|
||||
wlHealth.HealthStatus = StatusUnhealthy
|
||||
wlHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
|
||||
return
|
||||
}
|
||||
isHealthy, err := wl.EvalHealth(templateContext)
|
||||
if err != nil {
|
||||
wlHealth.HealthStatus = StatusUnhealthy
|
||||
wlHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
|
||||
@@ -493,7 +499,7 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
|
||||
// TODO(wonderflow): we should add a custom way to let the template say why it's unhealthy, only a bool flag is not enough
|
||||
wlHealth.HealthStatus = StatusUnhealthy
|
||||
}
|
||||
wlHealth.CustomStatusMsg, err = wl.EvalStatus(pCtx, c, accessor)
|
||||
wlHealth.CustomStatusMsg, err = wl.EvalStatus(templateContext)
|
||||
if err != nil {
|
||||
wlHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
|
||||
}
|
||||
@@ -526,7 +532,14 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
|
||||
continue
|
||||
}
|
||||
accessor := util.NewApplicationResourceNamespaceAccessor("", ns)
|
||||
isHealthy, err := tr.EvalHealth(pCtx, c, accessor)
|
||||
templateContext, err := tr.GetTemplateContext(pCtx, c, accessor)
|
||||
if err != nil {
|
||||
tHealth.HealthStatus = StatusUnhealthy
|
||||
tHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
|
||||
traits[i] = tHealth
|
||||
continue
|
||||
}
|
||||
isHealthy, err := tr.EvalHealth(templateContext)
|
||||
if err != nil {
|
||||
tHealth.HealthStatus = StatusUnhealthy
|
||||
tHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
|
||||
@@ -539,7 +552,7 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
|
||||
// TODO(wonderflow): we should add a custom way to let the template say why it's unhealthy, only a bool flag is not enough
|
||||
tHealth.HealthStatus = StatusUnhealthy
|
||||
}
|
||||
tHealth.CustomStatusMsg, err = tr.EvalStatus(pCtx, c, accessor)
|
||||
tHealth.CustomStatusMsg, err = tr.EvalStatus(templateContext)
|
||||
if err != nil {
|
||||
tHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user