diff --git a/apis/types/componentmanifest.go b/apis/types/componentmanifest.go
index d30a2847d..af953cd5d 100644
--- a/apis/types/componentmanifest.go
+++ b/apis/types/componentmanifest.go
@@ -28,9 +28,11 @@ type ComponentManifest struct {
 	RevisionName     string
 	RevisionHash     string
 	ExternalRevision string
+	// StandardWorkload contains the K8s resource generated from the "output" block of the ComponentDefinition
 	StandardWorkload *unstructured.Unstructured
-	Traits           []*unstructured.Unstructured
-	Scopes           []*corev1.ObjectReference
+	// Traits contains both the resources generated from the "outputs" block of the ComponentDefinition and the resources generated from TraitDefinitions
+	Traits []*unstructured.Unstructured
+	Scopes []*corev1.ObjectReference
 
 	// PackagedWorkloadResources contain all the workload related resources. It could be a Helm
 	// Release, Git Repo or anything that can package and run a workload.
diff --git a/pkg/appfile/appfile.go b/pkg/appfile/appfile.go
index df74d6827..46594cda7 100644
--- a/pkg/appfile/appfile.go
+++ b/pkg/appfile/appfile.go
@@ -99,22 +99,31 @@ func (wl *Workload) EvalContext(ctx process.Context) error {
 	return wl.engine.Complete(ctx, wl.FullTemplate.TemplateStr, wl.Params)
 }
 
+// GetTemplateContext gets the workload template context, which will be used to evaluate status and health
+func (wl *Workload) GetTemplateContext(ctx process.Context, client client.Client, accessor util.NamespaceAccessor) (map[string]interface{}, error) {
+	// if the standard workload is managed by a trait, just return an empty context
+	if wl.SkipApplyWorkload {
+		return nil, nil
+	}
+	return wl.engine.GetTemplateContext(ctx, client, accessor)
+}
+
 // EvalStatus eval workload status
-func (wl *Workload) EvalStatus(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor) (string, error) {
-	// if the standard workload is managed by trait always return empty message
+func (wl *Workload) EvalStatus(templateContext map[string]interface{}) (string, error) {
+	// if the standard workload is managed by a trait, always return an empty message
 	if wl.SkipApplyWorkload {
 		return "", nil
 	}
-	return wl.engine.Status(ctx, cli, accessor, wl.FullTemplate.CustomStatus, wl.Params)
+	return wl.engine.Status(templateContext, wl.FullTemplate.CustomStatus, wl.Params)
 }
 
 // EvalHealth eval workload health check
-func (wl *Workload) EvalHealth(ctx process.Context, client client.Client, accessor util.NamespaceAccessor) (bool, error) {
+func (wl *Workload) EvalHealth(templateContext map[string]interface{}) (bool, error) {
 	// if health of template is not set or standard workload is managed by trait always return true
 	if wl.SkipApplyWorkload {
 		return true, nil
 	}
-	return wl.engine.HealthCheck(ctx, client, accessor, wl.FullTemplate.Health, wl.Params)
+	return wl.engine.HealthCheck(templateContext, wl.FullTemplate.Health, wl.Params)
 }
 
 // Scope defines the scope of workload
@@ -147,14 +156,19 @@ func (trait *Trait) EvalContext(ctx process.Context) error {
 	return trait.engine.Complete(ctx, trait.Template, trait.Params)
 }
 
+// GetTemplateContext gets the trait template context, which will be used to evaluate status and health
+func (trait *Trait) GetTemplateContext(ctx process.Context, client client.Client, accessor util.NamespaceAccessor) (map[string]interface{}, error) {
+	return trait.engine.GetTemplateContext(ctx, client, accessor)
+}
+
 // EvalStatus eval trait status
-func (trait *Trait) EvalStatus(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor) (string, error) {
-	return trait.engine.Status(ctx, cli, accessor, trait.CustomStatusFormat, trait.Params)
+func (trait *Trait) EvalStatus(templateContext map[string]interface{}) (string, error) {
+	return trait.engine.Status(templateContext, trait.CustomStatusFormat, trait.Params)
 }
 
 // EvalHealth eval trait health check
-func (trait *Trait) EvalHealth(ctx process.Context, client client.Client, accessor util.NamespaceAccessor) (bool, error) {
-	return trait.engine.HealthCheck(ctx, client, accessor, trait.HealthCheckPolicy, trait.Params)
+func (trait *Trait) EvalHealth(templateContext map[string]interface{}) (bool, error) {
+	return trait.engine.HealthCheck(templateContext, trait.HealthCheckPolicy, trait.Params)
 }
 
 // Appfile describes application
diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/apply.go b/pkg/controller/core.oam.dev/v1alpha2/application/apply.go
index 4dc364669..b1a4a8e8f 100644
--- a/pkg/controller/core.oam.dev/v1alpha2/application/apply.go
+++ b/pkg/controller/core.oam.dev/v1alpha2/application/apply.go
@@ -228,7 +228,7 @@ func (h *AppHandler) ProduceArtifacts(ctx context.Context, comps []*types.Compon
 }
 
 // collectTraitHealthStatus collect trait health status
-func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.Trait, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (common.ApplicationTraitStatus, error) {
+func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.Trait, appRev *v1beta1.ApplicationRevision, overrideNamespace string) (common.ApplicationTraitStatus, []*unstructured.Unstructured, error) {
 	defer func(clusterName string) {
 		wl.Ctx.SetCtx(pkgmulticluster.WithCluster(wl.Ctx.GetCtx(), clusterName))
 	}(multicluster.ClusterNameInContext(wl.Ctx.GetCtx()))
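The appfile.go change above splits resource fetching out of evaluation: GetTemplateContext performs the cluster reads once, and EvalHealth/EvalStatus become pure evaluations over the returned map. A minimal sketch of the calling pattern this enables (the helper name evaluateWorkload is hypothetical and not part of this diff; error wrapping omitted):

	func evaluateWorkload(wl *appfile.Workload, pCtx process.Context, cli client.Client, accessor util.NamespaceAccessor) (bool, string, error) {
		// one fetch of the rendered resources from the cluster
		templateContext, err := wl.GetTemplateContext(pCtx, cli, accessor)
		if err != nil {
			return false, "", err
		}
		// both checks reuse the same context, so no second round-trip
		healthy, err := wl.EvalHealth(templateContext)
		if err != nil {
			return false, "", err
		}
		msg, err := wl.EvalStatus(templateContext)
		return healthy, msg, err
	}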
@@ -248,22 +248,27 @@ func (h *AppHandler) collectTraitHealthStatus(wl *appfile.Workload, tr *appfile.
 		pCtx.SetCtx(pkgmulticluster.WithCluster(pCtx.GetCtx(), pkgmulticluster.Local))
 	}
 	_accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, traitOverrideNamespace)
-	if ok, err := tr.EvalHealth(pCtx, h.r.Client, _accessor); !ok || err != nil {
+	templateContext, err := tr.GetTemplateContext(pCtx, h.r.Client, _accessor)
+	if err != nil {
+		return common.ApplicationTraitStatus{}, nil, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, get template context error", appName, wl.Name, tr.Name)
+	}
+	if ok, err := tr.EvalHealth(templateContext); !ok || err != nil {
 		traitStatus.Healthy = false
 	}
-	traitStatus.Message, err = tr.EvalStatus(pCtx, h.r.Client, _accessor)
+	traitStatus.Message, err = tr.EvalStatus(templateContext)
 	if err != nil {
-		return common.ApplicationTraitStatus{}, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, evaluate status message error", appName, wl.Name, tr.Name)
+		return common.ApplicationTraitStatus{}, nil, errors.WithMessagef(err, "app=%s, comp=%s, trait=%s, evaluate status message error", appName, wl.Name, tr.Name)
 	}
-	return traitStatus, nil
+	return traitStatus, extractOutputs(templateContext), nil
 }
 
 // collectWorkloadHealthStatus collect workload health status
-func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, status *common.ApplicationComponentStatus, accessor util.NamespaceAccessor) (bool, error) {
+func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, status *common.ApplicationComponentStatus, accessor util.NamespaceAccessor) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
+	var output *unstructured.Unstructured
+	var outputs []*unstructured.Unstructured
 	var (
 		appName  = appRev.Spec.Application.Name
 		isHealth = true
-		err      error
 	)
 	if wl.CapabilityCategory == types.TerraformCategory {
 		var configuration terraforv1beta2.Configuration
@@ -271,32 +276,40 @@ func (h *AppHandler) collectWorkloadHealthStatus(ctx context.Context, wl *appfil
 		if kerrors.IsNotFound(err) {
 			var legacyConfiguration terraforv1beta1.Configuration
 			if err := h.r.Client.Get(ctx, client.ObjectKey{Name: wl.Name, Namespace: accessor.Namespace()}, &legacyConfiguration); err != nil {
-				return false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
+				return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
 			}
 			isHealth = setStatus(status, legacyConfiguration.Status.ObservedGeneration, legacyConfiguration.Generation, legacyConfiguration.GetLabels(), appRev.Name, legacyConfiguration.Status.Apply.State, legacyConfiguration.Status.Apply.Message)
 		} else {
-			return false, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
+			return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, check health error", appName, wl.Name)
 		}
 		} else {
 			isHealth = setStatus(status, configuration.Status.ObservedGeneration, configuration.Generation, configuration.GetLabels(), appRev.Name, configuration.Status.Apply.State, configuration.Status.Apply.Message)
 		}
 	} else {
-		if ok, err := wl.EvalHealth(wl.Ctx, h.r.Client, accessor); !ok || err != nil {
+		templateContext, err := wl.GetTemplateContext(wl.Ctx, h.r.Client, accessor)
+		if err != nil {
+			return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, get template context error", appName, wl.Name)
+		}
+		if ok, err := wl.EvalHealth(templateContext); !ok || err != nil {
 			isHealth = false
 		}
 		status.Healthy = isHealth
-		status.Message, err = wl.EvalStatus(wl.Ctx, h.r.Client, accessor)
+		status.Message, err = wl.EvalStatus(templateContext)
 		if err != nil {
-			return false, errors.WithMessagef(err, "app=%s, comp=%s, evaluate workload status message error", appName, wl.Name)
+			return false, nil, nil, errors.WithMessagef(err, "app=%s, comp=%s, evaluate workload status message error", appName, wl.Name)
 		}
+		output, outputs = extractOutputAndOutputs(templateContext)
 	}
-	return isHealth, nil
+	return isHealth, output, outputs, nil
 }
 
 // nolint
-func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string, skipWorkload bool, traitFilters ...TraitFilter) (*common.ApplicationComponentStatus, bool, error) {
+// collectHealthStatus will collect the health status of the component, including the component itself and its traits.
+func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Workload, appRev *v1beta1.ApplicationRevision, overrideNamespace string, skipWorkload bool, traitFilters ...TraitFilter) (*common.ApplicationComponentStatus, *unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
+	output := new(unstructured.Unstructured)
+	outputs := make([]*unstructured.Unstructured, 0)
 	accessor := util.NewApplicationResourceNamespaceAccessor(h.app.Namespace, overrideNamespace)
 	var (
 		status = common.ApplicationComponentStatus{
@@ -312,9 +325,9 @@ func (h *AppHandler) collectHealthStatus(ctx context.Context, wl *appfile.Worklo
 	status = h.getServiceStatus(status)
 	if !skipWorkload {
-		isHealth, err = h.collectWorkloadHealthStatus(ctx, wl, appRev, &status, accessor)
+		isHealth, output, outputs, err = h.collectWorkloadHealthStatus(ctx, wl, appRev, &status, accessor)
 		if err != nil {
-			return nil, false, err
+			return nil, nil, nil, false, err
 		}
 	}
 
@@ -328,10 +341,11 @@ collectNext:
 		}
 	}
 
-	traitStatus, err := h.collectTraitHealthStatus(wl, tr, appRev, overrideNamespace)
+	traitStatus, _outputs, err := h.collectTraitHealthStatus(wl, tr, appRev, overrideNamespace)
 	if err != nil {
-		return nil, false, err
+		return nil, nil, nil, false, err
 	}
+	outputs = append(outputs, _outputs...)
 	isHealth = isHealth && traitStatus.Healthy
 
 	if status.Message == "" && traitStatus.Message != "" {
@@ -350,7 +364,7 @@ collectNext:
 	status.Traits = append(status.Traits, traitStatusList...)
 	status.Scopes = generateScopeReference(wl.Scopes)
 	h.addServiceStatus(true, status)
-	return &status, isHealth, nil
+	return &status, output, outputs, isHealth, nil
 }
 
 func setStatus(status *common.ApplicationComponentStatus, observedGeneration, generation int64, labels map[string]string,
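collectHealthStatus now surfaces the component's "output" and "outputs" resources alongside health, which is what later lets workflow-level inputs read from live resource fields. A runnable toy example of the map shape being unpacked here (the literal values are made up; only the apimachinery dependency is assumed):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	)

	func main() {
		// shaped like the template context the CUE engine returns: "output" is
		// the workload object, "outputs" maps auxiliary resource names to objects
		templateContext := map[string]interface{}{
			"output": map[string]interface{}{"kind": "Deployment"},
			"outputs": map[string]interface{}{
				"service": map[string]interface{}{"kind": "Service"},
			},
		}
		output := &unstructured.Unstructured{Object: templateContext["output"].(map[string]interface{})}
		fmt.Println(output.GetKind()) // Deployment
		for name, v := range templateContext["outputs"].(map[string]interface{}) {
			fmt.Println(name, (&unstructured.Unstructured{Object: v.(map[string]interface{})}).GetKind()) // service Service
		}
	}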
@@ -434,3 +448,22 @@ func (h *AppHandler) ApplyPolicies(ctx context.Context, af *appfile.Appfile) err
 	}
 	return nil
 }
+
+func extractOutputAndOutputs(templateContext map[string]interface{}) (*unstructured.Unstructured, []*unstructured.Unstructured) {
+	output := new(unstructured.Unstructured)
+	if templateContext["output"] != nil {
+		output = &unstructured.Unstructured{Object: templateContext["output"].(map[string]interface{})}
+	}
+	outputs := extractOutputs(templateContext)
+	return output, outputs
+}
+
+func extractOutputs(templateContext map[string]interface{}) []*unstructured.Unstructured {
+	outputs := make([]*unstructured.Unstructured, 0)
+	if templateContext["outputs"] != nil {
+		for _, v := range templateContext["outputs"].(map[string]interface{}) {
+			outputs = append(outputs, &unstructured.Unstructured{Object: v.(map[string]interface{})})
+		}
+	}
+	return outputs
+}
diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/dispatcher.go b/pkg/controller/core.oam.dev/v1alpha2/application/dispatcher.go
index b0837b094..3b81559ea 100644
--- a/pkg/controller/core.oam.dev/v1alpha2/application/dispatcher.go
+++ b/pkg/controller/core.oam.dev/v1alpha2/application/dispatcher.go
@@ -127,7 +127,7 @@ func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, rea
 			if !h.resourceKeeper.ContainsResources(manifests) {
 				return false, nil
 			}
-			_, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
+			_, _, _, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
 				ByTraitType(readyTraits, options.Traits))
 			if err != nil {
 				return false, err
@@ -140,7 +140,7 @@ func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, rea
 			if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchManifests...); err != nil {
 				return false, errors.WithMessage(err, "Dispatch")
 			}
-			status, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
+			status, _, _, isHealth, err := h.collectHealthStatus(ctx, wl, appRev, options.OverrideNamespace, skipWorkload,
 				ByTraitType(readyTraits, options.Traits))
 			if err != nil {
 				return false, errors.WithMessage(err, "CollectHealthStatus")
diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/generator.go b/pkg/controller/core.oam.dev/v1alpha2/application/generator.go
index 587f20356..8e945ef9d 100644
--- a/pkg/controller/core.oam.dev/v1alpha2/application/generator.go
+++ b/pkg/controller/core.oam.dev/v1alpha2/application/generator.go
@@ -307,20 +307,20 @@ func (h *AppHandler) renderComponentFunc(appParser *appfile.Parser, appRev *v1be
 }
 
 func (h *AppHandler) checkComponentHealth(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentHealthCheck {
-	return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error) {
+	return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
 		ctx := multicluster.ContextWithClusterName(baseCtx, clusterName)
 		ctx = contextWithComponentNamespace(ctx, overrideNamespace)
 		ctx = contextWithReplicaKey(ctx, comp.ReplicaKey)
 
 		wl, manifest, err := h.prepareWorkloadAndManifests(ctx, appParser, comp, appRev, patcher, af)
 		if err != nil {
-			return false, err
+			return false, nil, nil, err
 		}
 		wl.Ctx.SetCtx(auth.ContextWithUserInfo(ctx, h.app))
 
 		readyWorkload, readyTraits, err := renderComponentsAndTraits(h.r.Client, manifest, appRev, clusterName, overrideNamespace, env)
 		if err != nil {
-			return false, err
+			return false, nil, nil, err
 		}
 		checkSkipApplyWorkload(wl)
@@ -329,11 +329,15 @@ func (h *AppHandler) checkComponentHealth(appParser *appfile.Parser, appRev *v1b
 			dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
 		}
 		if !h.resourceKeeper.ContainsResources(dispatchResources) {
-			return false, err
+			return false, nil, nil, err
 		}
-		_, isHealth, err := h.collectHealthStatus(auth.ContextWithUserInfo(ctx, h.app), wl, appRev, overrideNamespace, false)
-		return isHealth, err
+		_, output, outputs, isHealth, err := h.collectHealthStatus(auth.ContextWithUserInfo(ctx, h.app), wl, appRev, overrideNamespace, false)
+		if err != nil {
+			return false, nil, nil, err
+		}
+
+		return isHealth, output, outputs, err
 	}
 }
 
@@ -385,7 +389,7 @@ func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1bet
 		if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
 			return nil, nil, false, errors.WithMessage(err, "Dispatch")
 		}
-		_, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
+		_, _, _, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
 		if err != nil {
 			return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
 		}
diff --git a/pkg/controller/core.oam.dev/v1alpha2/core/scopes/healthscope/healthscope.go b/pkg/controller/core.oam.dev/v1alpha2/core/scopes/healthscope/healthscope.go
index bde1247d9..1945d9d59 100644
--- a/pkg/controller/core.oam.dev/v1alpha2/core/scopes/healthscope/healthscope.go
+++ b/pkg/controller/core.oam.dev/v1alpha2/core/scopes/healthscope/healthscope.go
@@ -481,7 +481,13 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
 		return
 	}
 	accessor := util.NewApplicationResourceNamespaceAccessor(ns, "")
-	isHealthy, err := wl.EvalHealth(pCtx, c, accessor)
+	templateContext, err := wl.GetTemplateContext(pCtx, c, accessor)
+	if err != nil {
+		wlHealth.HealthStatus = StatusUnhealthy
+		wlHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
+		return
+	}
+	isHealthy, err := wl.EvalHealth(templateContext)
 	if err != nil {
 		wlHealth.HealthStatus = StatusUnhealthy
 		wlHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
@@ -493,7 +499,7 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
 		// TODO(wonderflow): we should add a custom way to let the template say why it's unhealthy, only a bool flag is not enough
 		wlHealth.HealthStatus = StatusUnhealthy
 	}
-	wlHealth.CustomStatusMsg, err = wl.EvalStatus(pCtx, c, accessor)
+	wlHealth.CustomStatusMsg, err = wl.EvalStatus(templateContext)
 	if err != nil {
 		wlHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
 	}
@@ -526,7 +532,14 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
 			continue
 		}
 		accessor := util.NewApplicationResourceNamespaceAccessor("", ns)
-		isHealthy, err := tr.EvalHealth(pCtx, c, accessor)
+		templateContext, err := tr.GetTemplateContext(pCtx, c, accessor)
+		if err != nil {
+			tHealth.HealthStatus = StatusUnhealthy
+			tHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
+			traits[i] = tHealth
+			continue
+		}
+		isHealthy, err := tr.EvalHealth(templateContext)
 		if err != nil {
 			tHealth.HealthStatus = StatusUnhealthy
 			tHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
@@ -539,7 +552,7 @@ func CUEBasedHealthCheck(ctx context.Context, c client.Client, wlRef WorkloadRef
 			// TODO(wonderflow): we should add a custom way to let the template say why it's unhealthy, only a bool flag is not enough
 			tHealth.HealthStatus = StatusUnhealthy
 		}
-		tHealth.CustomStatusMsg, err = tr.EvalStatus(pCtx, c, accessor)
+		tHealth.CustomStatusMsg, err = tr.EvalStatus(templateContext)
 		if err != nil {
 			tHealth.Diagnosis = errors.Wrap(err, errHealthCheck).Error()
 		}
diff --git a/pkg/cue/definition/template.go b/pkg/cue/definition/template.go
index 80b00b8ad..5cd173d32 100644
--- a/pkg/cue/definition/template.go
+++ b/pkg/cue/definition/template.go
@@ -66,8 +66,9 @@ const (
 // AbstractEngine defines Definition's Render interface
 type AbstractEngine interface {
 	Complete(ctx process.Context, abstractTemplate string, params interface{}) error
-	HealthCheck(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor, healthPolicyTemplate string, parameter interface{}) (bool, error)
-	Status(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor, customStatusTemplate string, parameter interface{}) (string, error)
+	HealthCheck(templateContext map[string]interface{}, healthPolicyTemplate string, parameter interface{}) (bool, error)
+	Status(templateContext map[string]interface{}, customStatusTemplate string, parameter interface{}) (string, error)
+	GetTemplateContext(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor) (map[string]interface{}, error)
 }
 
 type def struct {
@@ -224,11 +225,7 @@ func formatRuntimeContext(templateContext map[string]interface{}, parameter inte
 }
 
 // HealthCheck address health check for workload
-func (wd *workloadDef) HealthCheck(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor, healthPolicyTemplate string, parameter interface{}) (bool, error) {
-	templateContext, err := wd.getTemplateContext(ctx, cli, accessor)
-	if err != nil {
-		return false, errors.WithMessage(err, "get template context")
-	}
+func (wd *workloadDef) HealthCheck(templateContext map[string]interface{}, healthPolicyTemplate string, parameter interface{}) (bool, error) {
 	return checkHealth(templateContext, healthPolicyTemplate, parameter)
 }
 
@@ -251,11 +248,7 @@ func checkHealth(templateContext map[string]interface{}, healthPolicyTemplate st
 }
 
 // Status get workload status by customStatusTemplate
-func (wd *workloadDef) Status(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor, customStatusTemplate string, parameter interface{}) (string, error) {
-	templateContext, err := wd.getTemplateContext(ctx, cli, accessor)
-	if err != nil {
-		return "", errors.WithMessage(err, "get template context")
-	}
+func (wd *workloadDef) Status(templateContext map[string]interface{}, customStatusTemplate string, parameter interface{}) (string, error) {
 	return getStatusMessage(wd.pd, templateContext, customStatusTemplate, parameter)
 }
 
@@ -280,6 +273,10 @@ func getStatusMessage(pd *packages.PackageDiscover, templateContext map[string]i
 	return message, nil
 }
 
+func (wd *workloadDef) GetTemplateContext(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor) (map[string]interface{}, error) {
+	return wd.getTemplateContext(ctx, cli, accessor)
+}
+
 type traitDef struct {
 	def
 }
@@ -466,23 +463,19 @@ func (td *traitDef) getTemplateContext(ctx process.Context, cli client.Reader, a
 }
 
 // Status get trait status by customStatusTemplate
-func (td *traitDef) Status(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor, customStatusTemplate string, parameter interface{}) (string, error) {
-	templateContext, err := td.getTemplateContext(ctx, cli, accessor)
-	if err != nil {
-		return "", errors.WithMessage(err, "get template context")
-	}
+func (td *traitDef) Status(templateContext map[string]interface{}, customStatusTemplate string, parameter interface{}) (string, error) {
 	return getStatusMessage(td.pd, templateContext, customStatusTemplate, parameter)
 }
 
 // HealthCheck address health check for trait
-func (td *traitDef) HealthCheck(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor, healthPolicyTemplate string, parameter interface{}) (bool, error) {
-	templateContext, err := td.getTemplateContext(ctx, cli, accessor)
-	if err != nil {
-		return false, errors.WithMessage(err, "get template context")
-	}
+func (td *traitDef) HealthCheck(templateContext map[string]interface{}, healthPolicyTemplate string, parameter interface{}) (bool, error) {
 	return checkHealth(templateContext, healthPolicyTemplate, parameter)
 }
 
+func (td *traitDef) GetTemplateContext(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor) (map[string]interface{}, error) {
+	return td.getTemplateContext(ctx, cli, accessor)
+}
+
 func getResourceFromObj(ctx process.Context, obj *unstructured.Unstructured, client client.Reader, namespace string, labels map[string]string, outputsResource string) (map[string]interface{}, error) {
 	if outputsResource != "" {
 		labels[oam.TraitResource] = outputsResource
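With the AbstractEngine change above, Status and HealthCheck no longer take a client or accessor, so they can be exercised against a hand-built context without a live cluster. A hedged sketch of what that unlocks (engine, healthPolicyTemplate and params stand for any AbstractEngine implementation and its inputs; the map literal is invented):

	// a pre-fetched context can now be constructed by hand, e.g. in tests:
	templateContext := map[string]interface{}{
		"output": map[string]interface{}{
			"status": map[string]interface{}{"readyReplicas": int64(2), "replicas": int64(2)},
		},
	}
	// and fed straight into the engine, together with a CUE health policy such as
	//   isHealth: context.output.status.readyReplicas == context.output.status.replicas
	healthy, err := engine.HealthCheck(templateContext, healthPolicyTemplate, params)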
diff --git a/pkg/workflow/providers/multicluster/deploy.go b/pkg/workflow/providers/multicluster/deploy.go
index 7ff9c7067..12202f61a 100644
--- a/pkg/workflow/providers/multicluster/deploy.go
+++ b/pkg/workflow/providers/multicluster/deploy.go
@@ -20,8 +20,14 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
+	"github.com/kubevela/pkg/util/slices"
+	pkgsync "github.com/kubevela/pkg/util/sync"
+	"github.com/kubevela/workflow/pkg/cue/model/value"
 	"github.com/pkg/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/utils/pointer"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
@@ -29,48 +35,58 @@ import (
 	"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
 	"github.com/oam-dev/kubevela/apis/types"
 	"github.com/oam-dev/kubevela/pkg/appfile"
+	"github.com/oam-dev/kubevela/pkg/oam"
 	pkgpolicy "github.com/oam-dev/kubevela/pkg/policy"
 	"github.com/oam-dev/kubevela/pkg/policy/envbinding"
 	"github.com/oam-dev/kubevela/pkg/resourcekeeper"
 	"github.com/oam-dev/kubevela/pkg/utils"
 	velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
-	"github.com/oam-dev/kubevela/pkg/utils/parallel"
 	oamProvider "github.com/oam-dev/kubevela/pkg/workflow/providers/oam"
 )
 
+// DeployParameter is the parameter of the deploy workflow step
+type DeployParameter struct {
+	// Declare the policies that are used for this deployment. If not specified, the components will be deployed to the hub cluster.
+	Policies []string
+	// Maximum number of concurrently delivered components.
+	Parallelism int64
+	// If set to false, this step will also apply the components with a Terraform workload.
+	IgnoreTerraformComponent bool
+}
+
 // DeployWorkflowStepExecutor executor to run deploy workflow step
 type DeployWorkflowStepExecutor interface {
-	Deploy(ctx context.Context, policyNames []string, parallelism int) (healthy bool, reason string, err error)
+	Deploy(ctx context.Context) (healthy bool, reason string, err error)
 }
 
 // NewDeployWorkflowStepExecutor .
-func NewDeployWorkflowStepExecutor(cli client.Client, af *appfile.Appfile, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, renderer oamProvider.WorkloadRenderer, ignoreTerraformComponent bool) DeployWorkflowStepExecutor {
+func NewDeployWorkflowStepExecutor(cli client.Client, af *appfile.Appfile, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, renderer oamProvider.WorkloadRenderer, parameter DeployParameter) DeployWorkflowStepExecutor {
 	return &deployWorkflowStepExecutor{
-		cli:                      cli,
-		af:                       af,
-		apply:                    apply,
-		healthCheck:              healthCheck,
-		renderer:                 renderer,
-		ignoreTerraformComponent: ignoreTerraformComponent,
+		cli:         cli,
+		af:          af,
+		apply:       apply,
+		healthCheck: healthCheck,
+		renderer:    renderer,
+		parameter:   parameter,
 	}
 }
 
 type deployWorkflowStepExecutor struct {
-	cli                      client.Client
-	af                       *appfile.Appfile
-	apply                    oamProvider.ComponentApply
-	healthCheck              oamProvider.ComponentHealthCheck
-	renderer                 oamProvider.WorkloadRenderer
-	ignoreTerraformComponent bool
+	cli         client.Client
+	af          *appfile.Appfile
+	apply       oamProvider.ComponentApply
+	healthCheck oamProvider.ComponentHealthCheck
+	renderer    oamProvider.WorkloadRenderer
+	parameter   DeployParameter
 }
 
 // Deploy execute deploy workflow step
-func (executor *deployWorkflowStepExecutor) Deploy(ctx context.Context, policyNames []string, parallelism int) (bool, string, error) {
-	policies, err := selectPolicies(executor.af.Policies, policyNames)
+func (executor *deployWorkflowStepExecutor) Deploy(ctx context.Context) (bool, string, error) {
+	policies, err := selectPolicies(executor.af.Policies, executor.parameter.Policies)
 	if err != nil {
 		return false, "", err
 	}
-	components, err := loadComponents(ctx, executor.renderer, executor.cli, executor.af, executor.af.Components, executor.ignoreTerraformComponent)
+	components, err := loadComponents(ctx, executor.renderer, executor.cli, executor.af, executor.af.Components, executor.parameter.IgnoreTerraformComponent)
 	if err != nil {
 		return false, "", err
 	}
@@ -88,7 +104,7 @@ func (executor *deployWorkflowStepExecutor) Deploy(ctx context.Context, policyNa
 	if err != nil {
 		return false, "", err
 	}
-	return applyComponents(ctx, executor.apply, executor.healthCheck, components, placements, parallelism)
+	return applyComponents(ctx, executor.apply, executor.healthCheck, components, placements, int(executor.parameter.Parallelism))
 }
 
 func selectPolicies(policies []v1beta1.AppPolicy, policyNames []string) ([]v1beta1.AppPolicy, error) {
@@ -148,80 +164,233 @@ func overrideConfiguration(policies []v1beta1.AppPolicy, components []common.App
 	return components, nil
 }
 
+type valueBuilder func(s string) (*value.Value, error)
+
 type applyTask struct {
 	component common.ApplicationComponent
 	placement v1alpha1.PlacementDecision
+	healthy   *bool
 }
 
 func (t *applyTask) key() string {
-	return fmt.Sprintf("%s/%s/%s", t.placement.Cluster, t.placement.Namespace, t.component.Name)
+	return fmt.Sprintf("%s/%s/%s/%s", t.placement.Cluster, t.placement.Namespace, t.component.ReplicaKey, t.component.Name)
}
 
-func (t *applyTask) dependents() []string {
-	var dependents []string
-	for _, dependent := range t.component.DependsOn {
-		dependents = append(dependents, fmt.Sprintf("%s/%s/%s", t.placement.Cluster, t.placement.Namespace, dependent))
+func (t *applyTask) varKey(v string) string {
+	return fmt.Sprintf("%s/%s/%s/%s", t.placement.Cluster, t.placement.Namespace, t.component.ReplicaKey, v)
+}
+
+func (t *applyTask) varKeyWithoutReplica(v string) string {
+	return fmt.Sprintf("%s/%s/%s/%s", t.placement.Cluster, t.placement.Namespace, "", v)
+}
+
+func (t *applyTask) getVar(from string, cache *pkgsync.Map[string, *value.Value]) *value.Value {
+	key := t.varKey(from)
+	keyWithNoReplica := t.varKeyWithoutReplica(from)
+	var val *value.Value
+	var ok bool
+	if val, ok = cache.Get(key); !ok {
+		if val, ok = cache.Get(keyWithNoReplica); !ok {
+			return nil
+		}
 	}
-	return dependents
+	return val
+}
+
+func (t *applyTask) fillInputs(inputs *pkgsync.Map[string, *value.Value], build valueBuilder) error {
+	if len(t.component.Inputs) == 0 {
+		return nil
+	}
+
+	x, err := component2Value(t.component, build)
+	if err != nil {
+		return err
+	}
+
+	for _, input := range t.component.Inputs {
+		var inputVal *value.Value
+		if inputVal = t.getVar(input.From, inputs); inputVal == nil {
+			return fmt.Errorf("input %s is not ready", input.From)
+		}
+
+		err = x.FillValueByScript(inputVal, fieldPathToComponent(input.ParameterKey))
+		if err != nil {
+			return errors.Wrap(err, "fill value to component")
+		}
+	}
+	newComp, err := value2Component(x)
+	if err != nil {
+		return err
+	}
+	t.component = *newComp
+	return nil
+}
+
+func (t *applyTask) generateOutput(output *unstructured.Unstructured, outputs []*unstructured.Unstructured, cache *pkgsync.Map[string, *value.Value], build valueBuilder) error {
+	if len(t.component.Outputs) == 0 {
+		return nil
+	}
+
+	var cueString string
+	if output != nil {
+		outputJSON, err := output.MarshalJSON()
+		if err != nil {
+			return errors.Wrap(err, "marshal output")
+		}
+		cueString += fmt.Sprintf("output:%s\n", string(outputJSON))
+	}
+	componentVal, err := build(cueString)
+	if err != nil {
+		return errors.Wrap(err, "create cue value from component")
+	}
+
+	for _, os := range outputs {
+		name := os.GetLabels()[oam.TraitResource]
+		if name != "" {
+			if err := componentVal.FillObject(os.Object, "outputs", name); err != nil {
+				return errors.WithMessage(err, "FillOutputs")
+			}
+		}
+	}
+
+	for _, o := range t.component.Outputs {
+		pathToSetVar := t.varKey(o.Name)
+		actualOutput, err := componentVal.LookupValue(o.ValueFrom)
+		if err != nil {
+			return errors.Wrap(err, "lookup output")
+		}
+		cache.Set(pathToSetVar, actualOutput)
+	}
+	return nil
+}
+
+func (t *applyTask) allDependsReady(healthyMap map[string]bool) bool {
+	for _, d := range t.component.DependsOn {
+		dKey := fmt.Sprintf("%s/%s/%s/%s", t.placement.Cluster, t.placement.Namespace, t.component.ReplicaKey, d)
+		dKeyWithoutReplica := fmt.Sprintf("%s/%s/%s/%s", t.placement.Cluster, t.placement.Namespace, "", d)
+		if !healthyMap[dKey] && !healthyMap[dKeyWithoutReplica] {
+			return false
+		}
+	}
+	return true
+}
+
+func (t *applyTask) allInputReady(cache *pkgsync.Map[string, *value.Value]) bool {
+	for _, in := range t.component.Inputs {
+		if val := t.getVar(in.From, cache); val == nil {
+			return false
+		}
+	}
+
+	return true
 }
 
 type applyTaskResult struct {
 	healthy bool
 	err     error
+	task    *applyTask
 }
 
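The helpers above key cached output variables by cluster/namespace/replicaKey/name, and getVar falls back to the empty replicaKey so a non-replicated producer can feed replicated consumers. A standalone, runnable illustration of that two-step lookup (not PR code; the cache is simplified to strings):

	package main

	import "fmt"

	func main() {
		cache := map[string]string{
			// stored by a producer that has no replica key: cluster-0///var1
			"cluster-0///var1": "value from non-replicated comp-0",
		}
		getVar := func(cluster, namespace, replicaKey, name string) (string, bool) {
			if v, ok := cache[fmt.Sprintf("%s/%s/%s/%s", cluster, namespace, replicaKey, name)]; ok {
				return v, true
			}
			v, ok := cache[fmt.Sprintf("%s/%s/%s/%s", cluster, namespace, "", name)]
			return v, ok
		}
		v, ok := getVar("cluster-0", "", "beijing", "var1") // replica key misses, fallback hits
		fmt.Println(v, ok)                                  // value from non-replicated comp-0 true
	}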
+// applyComponents will apply components to the given placements.
 func applyComponents(ctx context.Context, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, components []common.ApplicationComponent, placements []v1alpha1.PlacementDecision, parallelism int) (bool, string, error) {
 	var tasks []*applyTask
+	var cache = pkgsync.NewMap[string, *value.Value]()
+	rootValue, err := value.NewValue("{}", nil, "")
+	if err != nil {
+		return false, "", err
+	}
+	var cueMutex sync.Mutex
+	var makeValue = func(s string) (*value.Value, error) {
+		cueMutex.Lock()
+		defer cueMutex.Unlock()
+		return rootValue.MakeValue(s)
+	}
+
+	taskHealthyMap := map[string]bool{}
 	for _, comp := range components {
 		for _, pl := range placements {
 			tasks = append(tasks, &applyTask{component: comp, placement: pl})
 		}
 	}
-	healthCheckResults := parallel.Run(func(task *applyTask) *applyTaskResult {
-		healthy, err := healthCheck(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace, "")
-		return &applyTaskResult{healthy: healthy, err: err}
-	}, tasks, parallelism).([]*applyTaskResult)
-	taskHealthyMap := map[string]bool{}
-	for i, res := range healthCheckResults {
-		taskHealthyMap[tasks[i].key()] = res.healthy
+	unhealthyResults := make([]*applyTaskResult, 0)
+	maxHealthCheckTimes := len(tasks)
+HealthCheck:
+	for i := 0; i < maxHealthCheckTimes; i++ {
+		checkTasks := make([]*applyTask, 0)
+		for _, task := range tasks {
+			if task.healthy == nil && task.allDependsReady(taskHealthyMap) && task.allInputReady(cache) {
+				task.healthy = new(bool)
+				err := task.fillInputs(cache, makeValue)
+				if err != nil {
+					taskHealthyMap[task.key()] = false
+					unhealthyResults = append(unhealthyResults, &applyTaskResult{healthy: false, err: err, task: task})
+					continue
+				}
+				checkTasks = append(checkTasks, task)
+			}
+		}
+		if len(checkTasks) == 0 {
+			break HealthCheck
+		}
+		checkResults := slices.ParMap[*applyTask, *applyTaskResult](checkTasks, func(task *applyTask) *applyTaskResult {
+			healthy, output, outputs, err := healthCheck(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace, "")
+			task.healthy = pointer.Bool(healthy)
+			if healthy {
+				err = task.generateOutput(output, outputs, cache, makeValue)
+			}
+			return &applyTaskResult{healthy: healthy, err: err, task: task}
+		}, slices.Parallelism(parallelism))
+
+		for _, res := range checkResults {
+			taskHealthyMap[res.task.key()] = res.healthy
+			if !res.healthy || res.err != nil {
+				unhealthyResults = append(unhealthyResults, res)
+			}
+		}
 	}
 
 	var pendingTasks []*applyTask
 	var todoTasks []*applyTask
+
 	for _, task := range tasks {
 		if healthy, ok := taskHealthyMap[task.key()]; healthy && ok {
 			continue
 		}
-		pending := false
-		for _, dep := range task.dependents() {
-			if healthy, ok := taskHealthyMap[dep]; ok && !healthy {
-				pending = true
-				break
-			}
-		}
-		if pending {
-			pendingTasks = append(pendingTasks, task)
-		} else {
+		if task.allDependsReady(taskHealthyMap) && task.allInputReady(cache) {
 			todoTasks = append(todoTasks, task)
+		} else {
+			pendingTasks = append(pendingTasks, task)
 		}
 	}
 	var results []*applyTaskResult
 	if len(todoTasks) > 0 {
-		results = parallel.Run(func(task *applyTask) *applyTaskResult {
+		results = slices.ParMap[*applyTask, *applyTaskResult](todoTasks, func(task *applyTask) *applyTaskResult {
+			err := task.fillInputs(cache, makeValue)
+			if err != nil {
+				return &applyTaskResult{healthy: false, err: err, task: task}
+			}
 			_, _, healthy, err := apply(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace, "")
-			return &applyTaskResult{healthy: healthy, err: err}
-		}, todoTasks, parallelism).([]*applyTaskResult)
+			if err != nil {
+				return &applyTaskResult{healthy: healthy, err: err, task: task}
+			}
+			return &applyTaskResult{healthy: healthy, err: err, task: task}
+		}, slices.Parallelism(parallelism))
 	}
 	var errs []error
 	var allHealthy = true
 	var reasons []string
-	for i, res := range results {
+	for _, res := range unhealthyResults {
 		if res.err != nil {
-			errs = append(errs, fmt.Errorf("error encountered in cluster %s: %w", todoTasks[i].placement.Cluster, res.err))
+			errs = append(errs, fmt.Errorf("error during health check of %s: %w", res.task.key(), res.err))
+		}
+	}
+	for _, res := range results {
+		if res.err != nil {
+			errs = append(errs, fmt.Errorf("error encountered in cluster %s: %w", res.task.placement.Cluster, res.err))
 		}
 		if !res.healthy {
 			allHealthy = false
-			reasons = append(reasons, fmt.Sprintf("%s is not healthy", todoTasks[i].key()))
+			reasons = append(reasons, fmt.Sprintf("%s is not healthy", res.task.key()))
 		}
 	}
 
@@ -231,3 +400,36 @@ func applyComponents(ctx context.Context, apply oamProvider.ComponentApply, heal
 	return allHealthy && len(pendingTasks) == 0, strings.Join(reasons, ","), velaerrors.AggregateErrors(errs)
 }
+
+func fieldPathToComponent(input string) string {
+	return fmt.Sprintf("properties.%s", strings.TrimSpace(input))
+}
+
+func component2Value(comp common.ApplicationComponent, build valueBuilder) (*value.Value, error) {
+	x, err := build("")
+	if err != nil {
+		return nil, err
+	}
+	err = x.FillObject(comp, "")
+	if err != nil {
+		return nil, err
+	}
+	// Component.ReplicaKey has no json tag, so we need to set it manually
+	err = x.FillObject(comp.ReplicaKey, "replicaKey")
+	if err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+func value2Component(v *value.Value) (*common.ApplicationComponent, error) {
+	var comp common.ApplicationComponent
+	err := v.UnmarshalTo(&comp)
+	if err != nil {
+		return nil, err
+	}
+	if rk, err := v.GetString("replicaKey"); err == nil {
+		comp.ReplicaKey = rk
+	}
+	return &comp, nil
+}
diff --git a/pkg/workflow/providers/multicluster/deploy_test.go b/pkg/workflow/providers/multicluster/deploy_test.go
index 4b1c3bceb..657e06c55 100644
--- a/pkg/workflow/providers/multicluster/deploy_test.go
+++ b/pkg/workflow/providers/multicluster/deploy_test.go
@@ -28,11 +28,14 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 
+	workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
 	"github.com/kubevela/workflow/pkg/cue/model/value"
 
 	apicommon "github.com/oam-dev/kubevela/apis/core.oam.dev/common"
+	"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
 	"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
+	"github.com/oam-dev/kubevela/pkg/oam"
 )
 
 func TestOverrideConfiguration(t *testing.T) {
@@ -91,7 +94,7 @@ func TestOverrideConfiguration(t *testing.T) {
 	}
 }
 
-func TestApplyComponents(t *testing.T) {
+func TestApplyComponentsDepends(t *testing.T) {
 	r := require.New(t)
 	const n, m = 50, 5
 	var components []apicommon.ApplicationComponent
@@ -116,9 +119,9 @@
 		applyMap.Store(fmt.Sprintf("%s/%s", clusterName, comp.Name), true)
 		return nil, nil, true, nil
 	}
-	healthCheck := func(_ context.Context, comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error) {
+	healthCheck := func(_ context.Context, comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
 		_, found := applyMap.Load(fmt.Sprintf("%s/%s", clusterName, comp.Name))
-		return found, nil
+		return found, nil, nil, nil
 	}
 
 	parallelism := 10
@@ -146,3 +149,262 @@
 	r.True(healthy)
 	r.Equal(3*n*m, countMap())
 }
+
+func TestApplyComponentsIO(t *testing.T) {
+	r := require.New(t)
+
+	var (
+		parallelism = 10
+		applyMap    = new(sync.Map)
+		ctx         = context.Background()
+	)
+	apply := func(_ context.Context, comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
+		time.Sleep(time.Duration(rand.Intn(200)+25) * time.Millisecond)
+		applyMap.Store(fmt.Sprintf("%s/%s", clusterName, comp.Name), true)
+		return nil, nil, true, nil
+	}
+	healthCheck := func(_ context.Context, comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
+		_, found := applyMap.Load(fmt.Sprintf("%s/%s", clusterName, comp.Name))
+		return found, &unstructured.Unstructured{Object: map[string]interface{}{
+			"spec": map[string]interface{}{
+				"path": fmt.Sprintf("%s/%s", clusterName, comp.Name),
+			},
+		}}, []*unstructured.Unstructured{
+			{
+				Object: map[string]interface{}{
+					"metadata": map[string]interface{}{
+						"labels": map[string]interface{}{
+							oam.TraitResource: "obj",
+						},
+					},
+					"spec": map[string]interface{}{
+						"path": fmt.Sprintf("%s/%s", clusterName, comp.Name),
+					},
+				},
+			},
+		}, nil
+	}
+
+	resetStore := func() {
+		applyMap = &sync.Map{}
+	}
+	countMap := func() int {
+		cnt := 0
+		applyMap.Range(func(key, value interface{}) bool {
+			cnt++
+			return true
+		})
+		return cnt
+	}
+
+	t.Run("apply components with io successfully", func(t *testing.T) {
+		resetStore()
+		const n, m = 10, 5
+		var components []apicommon.ApplicationComponent
+		var placements []v1alpha1.PlacementDecision
+		for i := 0; i < n; i++ {
+			comp := apicommon.ApplicationComponent{
+				Name:       fmt.Sprintf("comp-%d", i),
+				Properties: &runtime.RawExtension{Raw: []byte(fmt.Sprintf(`{"placeholder":%d}`, i))},
+			}
+			if i != 0 {
+				comp.Inputs = workflowv1alpha1.StepInputs{
+					{
+						ParameterKey: "input_slot_1",
+						From:         fmt.Sprintf("var-output-%d", i-1),
+					},
+					{
+						ParameterKey: "input_slot_2",
+						From:         fmt.Sprintf("var-outputs-%d", i-1),
+					},
+				}
+			}
+			if i != n-1 {
+				comp.Outputs = workflowv1alpha1.StepOutputs{
+					{
+						ValueFrom: "output.spec.path",
+						Name:      fmt.Sprintf("var-output-%d", i),
+					},
+					{
+						ValueFrom: "outputs.obj.spec.path",
+						Name:      fmt.Sprintf("var-outputs-%d", i),
+					},
+				}
+			}
+			components = append(components, comp)
+		}
+		for i := 0; i < m; i++ {
+			placements = append(placements, v1alpha1.PlacementDecision{Cluster: fmt.Sprintf("cluster-%d", i)})
+		}
+
+		for i := 0; i < n; i++ {
+			healthy, _, err := applyComponents(ctx, apply, healthCheck, components, placements, parallelism)
+			r.NoError(err)
+			r.Equal((i+1)*m, countMap())
+			if i == n-1 {
+				r.True(healthy)
+			} else {
+				r.False(healthy)
+			}
+		}
+	})
+
+	t.Run("apply components with io failed", func(t *testing.T) {
+		resetStore()
+		components := []apicommon.ApplicationComponent{
+			{
+				Name: "comp-0",
+				Outputs: workflowv1alpha1.StepOutputs{
+					{
+						ValueFrom: "output.spec.error_path",
+						Name:      "var1",
+					},
+				},
+			},
+			{
+				Name: "comp-1",
+				Inputs: workflowv1alpha1.StepInputs{
+					{
+						ParameterKey: "input_slot_1",
+						From:         "var1",
+					},
+				},
+			},
+		}
+		placements := []v1alpha1.PlacementDecision{
+			{Cluster: "cluster-0"},
+		}
+		healthy, _, err := applyComponents(ctx, apply, healthCheck, components, placements, parallelism)
+		r.NoError(err)
+		r.False(healthy)
+		healthy, _, err = applyComponents(ctx, apply, healthCheck, components, placements, parallelism)
+		r.ErrorContains(err, "failed to lookup value")
+		r.False(healthy)
+	})
+
+	t.Run("apply components with io and replication", func(t *testing.T) {
+		// comp-0 ---> comp1-beijing  --> comp2-beijing
+		//         |-> comp1-shanghai --> comp2-shanghai
+		resetStore()
+		storeKey := func(clusterName string, comp apicommon.ApplicationComponent) string {
+			return fmt.Sprintf("%s/%s/%s", clusterName, comp.Name, comp.ReplicaKey)
+		}
+		type applyResult struct {
+			output  *unstructured.Unstructured
+			outputs []*unstructured.Unstructured
+		}
+		apply := func(_ context.Context, comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
+			time.Sleep(time.Duration(rand.Intn(200)+25) * time.Millisecond)
+			key := storeKey(clusterName, comp)
+			result := applyResult{
+				output: &unstructured.Unstructured{Object: map[string]interface{}{
+					"spec": map[string]interface{}{
+						"path":        key,
+						"anotherPath": key,
+					},
+				}}, outputs: []*unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"metadata": map[string]interface{}{
+								"labels": map[string]interface{}{
+									oam.TraitResource: "obj",
+								},
+							},
+							"spec": map[string]interface{}{
+								"path": key,
+							},
+						},
+					},
+				},
+			}
+			applyMap.Store(storeKey(clusterName, comp), result)
+			return nil, nil, true, nil
+		}
+		healthCheck := func(_ context.Context, comp apicommon.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error) {
+			key := storeKey(clusterName, comp)
+			r, found := applyMap.Load(key)
+			result, _ := r.(applyResult)
+			return found, result.output, result.outputs, nil
+		}
+
+		inputSlot := "input_slot"
+		components := []apicommon.ApplicationComponent{
+			{
+				Name: "comp-0",
+				Outputs: workflowv1alpha1.StepOutputs{
+					{
+						ValueFrom: "output.spec.path",
+						Name:      "var1",
+					},
+				},
+			},
+			{
+				Name: "comp-1",
+				Inputs: workflowv1alpha1.StepInputs{
+					{
+						ParameterKey: inputSlot,
+						From:         "var1",
+					},
+				},
+				Outputs: workflowv1alpha1.StepOutputs{
+					{
+						ValueFrom: "output.spec.anotherPath",
+						Name:      "var2",
+					},
+				},
+				ReplicaKey: "beijing",
+			},
+			{
+				Name: "comp-1",
+				Inputs: workflowv1alpha1.StepInputs{
+					{
+						ParameterKey: inputSlot,
+						From:         "var1",
+					},
+				},
+				Outputs: workflowv1alpha1.StepOutputs{
+					{
+						ValueFrom: "output.spec.anotherPath",
+						Name:      "var2",
+					},
+				},
+				ReplicaKey: "shanghai",
+			},
+			{
+				Name: "comp-2",
+				Inputs: workflowv1alpha1.StepInputs{
+					{
+						ParameterKey: inputSlot,
+						From:         "var2",
+					},
+				},
+				ReplicaKey: "beijing",
+			},
+			{
+				Name: "comp-2",
+				Inputs: workflowv1alpha1.StepInputs{
+					{
+						ParameterKey: inputSlot,
+						From:         "var2",
+					},
+				},
+				ReplicaKey: "shanghai",
+			},
+		}
+		placements := []v1alpha1.PlacementDecision{
+			{Cluster: "cluster-0"},
+		}
+		healthy, _, err := applyComponents(ctx, apply, healthCheck, components, placements, parallelism)
+		r.NoError(err)
+		r.False(healthy)
+
+		healthy, _, err = applyComponents(ctx, apply, healthCheck, components, placements, parallelism)
+		r.NoError(err)
+		r.False(healthy)
+
+		healthy, _, err = applyComponents(ctx, apply, healthCheck, components, placements, parallelism)
+		r.NoError(err)
+		r.True(healthy)
+	})
+}
diff --git a/pkg/workflow/providers/multicluster/multicluster.go b/pkg/workflow/providers/multicluster/multicluster.go
index 8ff7c2a94..a7e1a5ed0 100644
--- a/pkg/workflow/providers/multicluster/multicluster.go
+++ b/pkg/workflow/providers/multicluster/multicluster.go
@@ -171,7 +171,7 @@ func (p *provider) ListClusters(ctx monitorContext.Context, wfCtx wfContext.Cont
 	return v.FillObject(clusters, "outputs", "clusters")
 }
 
-func (p *provider) Deploy(ctx monitorContext.Context, wfCtx wfContext.Context, v *value.Value, act wfTypes.Action) error {
+func (p *provider) Deploy(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, act wfTypes.Action) error {
 	policyNames, err := v.GetStringSlice("policies")
 	if err != nil {
 		return err
 	}
@@ -187,8 +187,13 @@ func (p *provider) Deploy(ctx monitorContext.Context, wfCtx wfContext.Context, v
 	if err != nil {
 		return err
 	}
-	executor := NewDeployWorkflowStepExecutor(p.Client, p.af, p.apply, p.healthCheck, p.renderer, ignoreTerraformComponent)
-	healthy, reason, err := executor.Deploy(ctx, policyNames, int(parallelism))
+	param := DeployParameter{
+		Policies:                 policyNames,
+		Parallelism:              parallelism,
+		IgnoreTerraformComponent: ignoreTerraformComponent,
+	}
+	executor := NewDeployWorkflowStepExecutor(p.Client, p.af, p.apply, p.healthCheck, p.renderer, param)
+	healthy, reason, err := executor.Deploy(ctx)
 	if err != nil {
 		return err
 	}
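A call-site sketch of the new wiring (cli, af, applyFn, healthCheckFn and renderer are placeholders): all of the deploy step's knobs now travel together in DeployParameter, and Deploy takes only a context, so adding a future option means touching the struct rather than every constructor and call site.

	executor := NewDeployWorkflowStepExecutor(cli, af, applyFn, healthCheckFn, renderer, DeployParameter{
		Policies:                 []string{"topo", "override"},
		Parallelism:              5,
		IgnoreTerraformComponent: true,
	})
	healthy, reason, err := executor.Deploy(ctx)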
diff --git a/pkg/workflow/providers/oam/apply.go b/pkg/workflow/providers/oam/apply.go
index 7fc4a4c3c..7298c2803 100644
--- a/pkg/workflow/providers/oam/apply.go
+++ b/pkg/workflow/providers/oam/apply.go
@@ -50,7 +50,7 @@ type ComponentApply func(ctx context.Context, comp common.ApplicationComponent,
 type ComponentRender func(ctx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, error)
 
 // ComponentHealthCheck health check oam component.
-type ComponentHealthCheck func(ctx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, error)
+type ComponentHealthCheck func(ctx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (bool, *unstructured.Unstructured, []*unstructured.Unstructured, error)
 
 // WorkloadRenderer renderer to render application component into workload
 type WorkloadRenderer func(ctx context.Context, comp common.ApplicationComponent) (*appfile.Workload, error)
diff --git a/test/e2e-multicluster-test/multicluster_test.go b/test/e2e-multicluster-test/multicluster_test.go
index 8285e9e16..1a586e866 100644
--- a/test/e2e-multicluster-test/multicluster_test.go
+++ b/test/e2e-multicluster-test/multicluster_test.go
@@ -784,5 +784,47 @@ var _ = Describe("Test multicluster scenario", func() {
 				g.Expect(kerrors.IsNotFound(k8sClient.Get(hubCtx, deployKey, deploy))).Should(BeTrue())
 			}, 20*time.Second).Should(Succeed())
 		})
+
+		It("Test application with input/output in deploy step", func() {
+			By("create application")
+			bs, err := os.ReadFile("./testdata/app/app-deploy-io.yaml")
+			Expect(err).Should(Succeed())
+			app := &v1beta1.Application{}
+			Expect(yaml.Unmarshal(bs, app)).Should(Succeed())
+			app.SetNamespace(namespace)
+			Expect(k8sClient.Create(hubCtx, app)).Should(Succeed())
+			Eventually(func(g Gomega) {
+				g.Expect(k8sClient.Get(hubCtx, client.ObjectKeyFromObject(app), app)).Should(Succeed())
+				g.Expect(app.Status.Phase).Should(Equal(common.ApplicationRunning))
+			}, 30*time.Second).Should(Succeed())
+
+			By("Check input/output work properly")
+			cm := &corev1.ConfigMap{}
+			cmKey := client.ObjectKey{Namespace: namespace, Name: "deployment-msg"}
+			var (
+				ipLocal  string
+				ipWorker string
+			)
+			Eventually(func(g Gomega) {
+				g.Expect(k8sClient.Get(hubCtx, cmKey, cm)).Should(Succeed())
+				g.Expect(cm.Data["msg"]).Should(Equal("Deployment has minimum availability."))
+				ipLocal = cm.Data["ip"]
+				g.Expect(ipLocal).ShouldNot(BeEmpty())
+			}, 20*time.Second).Should(Succeed())
+			Eventually(func(g Gomega) {
+				g.Expect(k8sClient.Get(workerCtx, cmKey, cm)).Should(Succeed())
+				g.Expect(cm.Data["msg"]).Should(Equal("Deployment has minimum availability."))
+				ipWorker = cm.Data["ip"]
+				g.Expect(ipWorker).ShouldNot(BeEmpty())
+			}, 20*time.Second).Should(Succeed())
+			Expect(ipLocal).ShouldNot(Equal(ipWorker))
+
+			By("delete application")
+			appKey := client.ObjectKeyFromObject(app)
+			Expect(k8sClient.Delete(hubCtx, app)).Should(Succeed())
+			Eventually(func(g Gomega) {
+				g.Expect(kerrors.IsNotFound(k8sClient.Get(hubCtx, appKey, app))).Should(BeTrue())
+			}, 20*time.Second).Should(Succeed())
+		})
 	})
 })
diff --git a/test/e2e-multicluster-test/testdata/app/app-deploy-io.yaml b/test/e2e-multicluster-test/testdata/app/app-deploy-io.yaml
new file mode 100644
index 000000000..45015de58
--- /dev/null
+++ b/test/e2e-multicluster-test/testdata/app/app-deploy-io.yaml
@@ -0,0 +1,49 @@
+apiVersion: core.oam.dev/v1beta1
+kind: Application
+metadata:
+  name: app-deploy-io
+spec:
+  components:
+    - name: podinfo
+      outputs:
+        - name: message
+          valueFrom: output.status.conditions[0].message
+        - name: ip
+          valueFrom: outputs.service.spec.clusterIP
+      properties:
+        image: stefanprodan/podinfo:4.0.3
+      type: webservice
+      traits:
+        - type: expose
+          properties:
+            port: [ 80 ]
+    - name: configmap
+      properties:
+        apiVersion: v1
+        kind: ConfigMap
+        metadata:
+          name: deployment-msg
+      type: raw
+      inputs:
+        - from: message
+          parameterKey: data.msg
+        - from: ip
+          parameterKey: data.ip
+  policies:
+    - name: topo
+      properties:
+        clusters: [ "local","cluster-worker" ]
+      type: topology
+    - name: override
+      properties:
+        selector:
+          - configmap
+          - podinfo
+      type: override
+  workflow:
+    steps:
+      - name: deploy
+        properties:
+          policies: [ "topo", "override" ]
+        type: deploy
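In the application above, the deploy step resolves the I/O end to end: after podinfo passes its health check, output.status.conditions[0].message is read from its Deployment and outputs.service.spec.clusterIP from the Service rendered by the expose trait; both values are cached per cluster and filled into the configmap component before it is applied. An input's parameterKey is resolved beneath the component's properties, mirroring fieldPathToComponent above; a toy illustration:

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// parameterKey "data.msg" lands at properties.data.msg on the raw
		// ConfigMap component, i.e. the ConfigMap's data.msg field
		fieldPathToComponent := func(input string) string {
			return fmt.Sprintf("properties.%s", strings.TrimSpace(input))
		}
		fmt.Println(fieldPathToComponent("data.msg")) // properties.data.msg
	}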