Feat: post dispatch output context (#7008)
Some checks failed
Webhook Upgrade Validation / webhook-upgrade-check (push) Failing after 1m46s

* exploring context data passing

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* adds output status fetch logic

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: standardize  import in dispatcher.

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* feat: Allow  traits to access workload output status in CUE context

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* feat: Implement PostDispatch traits that apply after component health is confirmed.

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* feat: Refactor  trait handling and status propagation in application dispatch.

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: run make reviewable

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* feat: Implement and document PostDispatch traits, applying them after component health is confirmed and guarded by a feature flag, along with new example applications.

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* feat: Add comments

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Fix: Restore the status field in ctx.

Signed-off-by: Vaibhav Agrawal <vaibhav.agrawal0096@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Fix: Error for evaluating the status of the trait

Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* refactor: removes minor unnecessary changes

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* refactor: minor linter changes

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* test: Add comprehensive tests for PostDispatch traits and their status handling

Signed-off-by: Reetika Malhotra <malhotra.reetika25@gmail.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Fix: Increase multi-cluster test time

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Chore: Add focus and print the application status

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Chore: print deployment status in the multicluster test

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Chore: add labels for the deployment

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* debugging test failure

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* debugging test failure by updating multi cluster ctx

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* undoes multi cluster ctx change

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Feat: enable MultiStageComponentApply feature by default

Signed-off-by: Chaitanya Reddy Onteddu <chaitanyareddy0702@gmail.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Feat: implement post-dispatch traits application in workflow states

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Chore: remove unnecessary blank lines in application_controller.go

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Feat: enhance output readiness handling in health checks

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Feat: add logic to determine need for post-dispatch outputs in workload processing

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* Feat: enhance output extraction and dependency checking for post-dispatch traits

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix code to exclude validation of post dispatch trait in webhook

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix code to exclude validation of post dispatch trait in webhook

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* commit for running the test again

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* commit for running the test again

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* commit for running the test again

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* triggering checks

Signed-off-by: Amit Singh <singhamitch@outlook.com>

* chore: adds explanation comments

Signed-off-by: Amit Singh <singhamitch@outlook.com>

* chore: adds errors to context

Signed-off-by: Amit Singh <singhamitch@outlook.com>

* chore: minor improvements

Signed-off-by: Amit Singh <singhamitch@outlook.com>

* fix: update output handling for pending PostDispatch traits

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: improve output handling for PostDispatch traits in deploy process

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: streamline output handling in PostDispatch process

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* chore: commit to re run the pipeline

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* chore: commit to re run the pipeline

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* chore: commit to re run the pipeline

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: enhance output status handling in PostDispatch context for multi-stage support

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* chore: commit to re run the pipeline

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: increase timeout for PostDispatch trait verification in tests

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* fix: enhance output status handling in PostDispatch context for multi-stage support

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

* chore: commit to re run the pipeline

Signed-off-by: Vishal Kumar <vishal210893@gmail.com>

---------

Signed-off-by: Amit Singh <singhamitch@outlook.com>
Signed-off-by: semmet95 <singhamitch@outlook.com>
Signed-off-by: Vishal Kumar <vishal210893@gmail.com>
Signed-off-by: Chaitanyareddy0702 <chaitanyareddy0702@gmail.com>
Signed-off-by: Vaibhav Agrawal <vaibhav.agrawal0096@gmail.com>
Signed-off-by: Reetika Malhotra <malhotra.reetika25@gmail.com>
Signed-off-by: Chaitanya Reddy Onteddu <chaitanyareddy0702@gmail.com>
Co-authored-by: Chitanya Reddy Onteddu <chaitanyareddy0702@gmail.com>
Co-authored-by: Vishal Kumar <vishal210893@gmail.com>
This commit is contained in:
Amit Singh
2026-01-14 15:58:13 +05:30
committed by GitHub
parent 432ffd3ddd
commit 0b85d55e68
12 changed files with 1513 additions and 17 deletions

View File

@@ -27,6 +27,8 @@ import (
"github.com/kubevela/workflow/pkg/cue/model/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/pkg/errors"
@@ -64,6 +66,12 @@ func (p *Parser) ValidateCUESchematicAppfile(a *Appfile) error {
if tr.CapabilityCategory != types.CUECategory {
continue
}
if tr.FullTemplate != nil &&
tr.FullTemplate.TraitDefinition.Spec.Stage == v1beta1.PostDispatch {
// PostDispatch type trait validation at this point might fail as they could have
// references to fields that are populated/injected during runtime only
continue
}
if err := tr.EvalContext(pCtx); err != nil {
return errors.WithMessagef(err, "cannot evaluate trait %q", tr.Name)
}

View File

@@ -222,8 +222,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
workflowInstance.Status.Phase = workflowState
app.Status.Workflow = workflow.ConvertWorkflowStatus(workflowInstance.Status, app.Status.Workflow.AppRevision)
logCtx.Info(fmt.Sprintf("Workflow return state=%s", workflowState))
postDispatchApplied := false
applyPostDispatchTraits := func() error {
if postDispatchApplied || !feature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
return nil
}
if err := handler.applyPostDispatchTraits(logCtx, appParser, appFile); err != nil {
logCtx.Error(err, "Failed to apply PostDispatch traits")
r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedApply, err))
return err
}
app.Status.AppliedResources = handler.appliedResources
postDispatchApplied = true
return nil
}
switch workflowState {
case workflowv1alpha1.WorkflowStateSuspending:
if err := applyPostDispatchTraits(); err != nil {
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationWorkflowSuspending)
}
if duration := workflowExecutor.GetSuspendBackoffWaitTime(); duration > 0 {
_, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, workflowUpdated)
return r.result(err).requeue(duration).ret()
@@ -243,6 +260,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowFailed, false, workflowUpdated)
case workflowv1alpha1.WorkflowStateExecuting:
if err := applyPostDispatchTraits(); err != nil {
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationRunningWorkflow)
}
_, err = r.gcResourceTrackers(logCtx, handler, common.ApplicationRunningWorkflow, false, workflowUpdated)
return r.result(err).requeue(workflowExecutor.GetBackoffWaitTime()).ret()
case workflowv1alpha1.WorkflowStateSucceeded:
@@ -250,6 +270,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
r.doWorkflowFinish(logCtx, app, handler, workflowState)
}
case workflowv1alpha1.WorkflowStateSkipped:
if err := applyPostDispatchTraits(); err != nil {
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationRunningWorkflow)
}
return r.result(nil).requeue(workflowExecutor.GetBackoffWaitTime()).ret()
default:
}
@@ -260,6 +283,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
phase = common.ApplicationUnhealthy
}
// Apply PostDispatch traits for healthy components if not already done in workflow requeue branch
if err := applyPostDispatchTraits(); err != nil {
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), phase)
}
r.stateKeep(logCtx, handler, app)
opts := []resourcekeeper.GCOption{

View File

@@ -36,6 +36,7 @@ import (
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/appfile"
velaprocess "github.com/oam-dev/kubevela/pkg/cue/process"
"github.com/oam-dev/kubevela/pkg/monitor/metrics"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/oam"
@@ -430,9 +431,130 @@ func extractOutputAndOutputs(templateContext map[string]interface{}) (*unstructu
func extractOutputs(templateContext map[string]interface{}) []*unstructured.Unstructured {
outputs := make([]*unstructured.Unstructured, 0)
if templateContext["outputs"] != nil {
for _, v := range templateContext["outputs"].(map[string]interface{}) {
outputs = append(outputs, &unstructured.Unstructured{Object: v.(map[string]interface{})})
for k, v := range templateContext["outputs"].(map[string]interface{}) {
obj := &unstructured.Unstructured{Object: v.(map[string]interface{})}
labels := obj.GetLabels()
if labels == nil {
labels = map[string]string{}
}
if labels[oam.TraitResource] == "" && k != "" {
labels[oam.TraitResource] = k
}
obj.SetLabels(labels)
outputs = append(outputs, obj)
}
}
return outputs
}
// applyPostDispatchTraits applies PostDispatch stage traits for healthy components.
// This is called after the workflow succeeds and component health is confirmed.
func (h *AppHandler) applyPostDispatchTraits(ctx monitorContext.Context, appParser *appfile.Parser, af *appfile.Appfile) error {
for _, svc := range h.services {
if !svc.Healthy {
continue
}
// Find the component spec
var comp common.ApplicationComponent
found := false
for _, c := range h.app.Spec.Components {
if c.Name == svc.Name {
comp = c
found = true
break
}
}
if !found {
continue
}
// Parse the component to get all traits
wl, err := appParser.ParseComponentFromRevisionAndClient(ctx.GetContext(), comp, h.currentAppRev)
if err != nil {
return errors.WithMessagef(err, "failed to parse component %s for PostDispatch traits", comp.Name)
}
// Filter to keep ONLY PostDispatch traits
var postDispatchTraits []*appfile.Trait
for _, trait := range wl.Traits {
if trait.FullTemplate.TraitDefinition.Spec.Stage == v1beta1.PostDispatch {
postDispatchTraits = append(postDispatchTraits, trait)
}
}
if len(postDispatchTraits) == 0 {
continue
}
wl.Traits = postDispatchTraits
// Generate manifest with context that includes live workload status
manifest, err := af.GenerateComponentManifest(wl, func(ctxData *velaprocess.ContextData) {
if svc.Namespace != "" {
ctxData.Namespace = svc.Namespace
}
if svc.Cluster != "" {
ctxData.Cluster = svc.Cluster
} else {
ctxData.Cluster = pkgmulticluster.Local
}
ctxData.ClusterVersion = multicluster.GetVersionInfoFromObject(
pkgmulticluster.WithCluster(ctx.GetContext(), types.ClusterLocalName),
h.Client,
ctxData.Cluster,
)
// Fetch live workload status for PostDispatch traits to use if it's created on the cluster
tempCtx := appfile.NewBasicContext(*ctxData, wl.Params)
if err := wl.EvalContext(tempCtx); err != nil {
ctx.Error(err, "failed to evaluate context for workload %s", wl.Name)
return
}
base, _ := tempCtx.Output()
componentWorkload, err := base.Unstructured()
if err != nil {
ctx.Error(err, "failed to unstructure base component generated using workload %s", wl.Name)
return
}
if componentWorkload.GetName() == "" {
componentWorkload.SetName(ctxData.CompName)
}
_ctx := util.WithCluster(tempCtx.GetCtx(), componentWorkload)
object, err := util.GetResourceFromObj(_ctx, tempCtx, componentWorkload, h.Client, ctxData.Namespace, map[string]string{
oam.LabelOAMResourceType: oam.ResourceTypeWorkload,
oam.LabelAppComponent: ctxData.CompName,
oam.LabelAppName: ctxData.AppName,
}, "")
if err != nil {
ctx.Error(err, "failed to fetch workload output resource %s from the cluster", componentWorkload.GetName())
return
}
ctxData.Output = object
})
if err != nil {
return errors.WithMessagef(err, "failed to generate manifest for PostDispatch traits of component %s", comp.Name)
}
// Render traits
_, readyTraits, err := renderComponentsAndTraits(manifest, h.currentAppRev, svc.Cluster, svc.Namespace)
if err != nil {
return errors.WithMessagef(err, "failed to render PostDispatch traits for component %s", comp.Name)
}
// Add app ownership labels
for _, trait := range readyTraits {
util.AddLabels(trait, map[string]string{
oam.LabelAppName: h.app.GetName(),
oam.LabelAppNamespace: h.app.GetNamespace(),
})
}
// Dispatch the traits
dispatchCtx := multicluster.ContextWithClusterName(ctx.GetContext(), svc.Cluster)
if err := h.Dispatch(dispatchCtx, h.Client, svc.Cluster, common.WorkflowResourceCreator, readyTraits...); err != nil {
return errors.WithMessagef(err, "failed to dispatch PostDispatch traits for component %s", comp.Name)
}
}
return nil
}

View File

@@ -430,6 +430,30 @@ func (h *AppHandler) prepareWorkloadAndManifests(ctx context.Context,
return nil, nil, errors.WithMessage(err, "ParseWorkload")
}
wl.Patch = patcher
// Add all traits to the workload if MultiStageComponentApply is disabled
if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
serviceHealthy := false
needPostDispatchOutputs := componentOutputsConsumed(comp, af.Components)
for _, svc := range h.services {
if svc.Name == comp.Name {
serviceHealthy = svc.Healthy
break
}
}
// not including PostDispatch type traits in the workload if the component service is not healthy
// because PostDispatch type traits might have references to fields that are only populated when the service is healthy
if !serviceHealthy && !needPostDispatchOutputs {
nonPostDispatchTraits := []*appfile.Trait{}
for _, trait := range wl.Traits {
if trait.FullTemplate.TraitDefinition.Spec.Stage != v1beta1.PostDispatch {
nonPostDispatchTraits = append(nonPostDispatchTraits, trait)
}
}
wl.Traits = nonPostDispatchTraits
}
}
manifest, err := af.GenerateComponentManifest(wl, func(ctxData *velaprocess.ContextData) {
if ns := componentNamespaceFromContext(ctx); ns != "" {
ctxData.Namespace = ns
@@ -444,6 +468,32 @@ func (h *AppHandler) prepareWorkloadAndManifests(ctx context.Context,
// cluster info are secrets stored in the control plane cluster
ctxData.ClusterVersion = multicluster.GetVersionInfoFromObject(pkgmulticluster.WithCluster(ctx, types.ClusterLocalName), h.Client, ctxData.Cluster)
ctxData.CompRevision, _ = ctrlutil.ComputeSpecHash(comp)
if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
// inject the main workload output as "output" in the context
tempCtx := appfile.NewBasicContext(*ctxData, wl.Params)
if err := wl.EvalContext(tempCtx); err != nil {
return
}
base, _ := tempCtx.Output()
componentWorkload, err := base.Unstructured()
if err != nil {
return
}
if componentWorkload.GetName() == "" {
componentWorkload.SetName(ctxData.CompName)
}
_ctx := util.WithCluster(tempCtx.GetCtx(), componentWorkload)
object, err := util.GetResourceFromObj(_ctx, tempCtx, componentWorkload, h.Client, ctxData.Namespace, map[string]string{
oam.LabelOAMResourceType: oam.ResourceTypeWorkload,
oam.LabelAppComponent: ctxData.CompName,
oam.LabelAppName: ctxData.AppName,
}, "")
if err != nil {
return
}
ctxData.Output = object
}
})
if err != nil {
return nil, nil, errors.WithMessage(err, "GenerateComponentManifest")
@@ -475,6 +525,31 @@ func renderComponentsAndTraits(manifest *types.ComponentManifest, appRev *v1beta
return readyWorkload, readyTraits, nil
}
// componentOutputsConsumed returns true if any other component depends on outputs produced
// from PostDispatch traits (valueFrom starting with "outputs.").
func componentOutputsConsumed(comp common.ApplicationComponent, components []common.ApplicationComponent) bool {
outputNames := map[string]struct{}{}
for _, o := range comp.Outputs {
if strings.HasPrefix(o.ValueFrom, "outputs.") {
outputNames[o.Name] = struct{}{}
}
}
if len(outputNames) == 0 {
return false
}
for _, c := range components {
if c.Name == comp.Name {
continue
}
for _, in := range c.Inputs {
if _, ok := outputNames[in.From]; ok {
return true
}
}
}
return false
}
func checkSkipApplyWorkload(comp *appfile.Component) {
for _, trait := range comp.Traits {
if trait.FullTemplate.TraitDefinition.Spec.ManageWorkload {

View File

@@ -23,7 +23,10 @@ import (
"sort"
"strings"
"k8s.io/apiserver/pkg/util/feature"
"github.com/oam-dev/kubevela/pkg/cue/definition/health"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/kubevela/pkg/cue/cuex"
@@ -261,10 +264,24 @@ func (td *traitDef) Complete(ctx process.Context, abstractTemplate string, param
buff += fmt.Sprintf("%s: %s\n", velaprocess.ParameterFieldName, string(bt))
}
}
multiStageEnabled := feature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply)
var statusBytes []byte
if multiStageEnabled {
statusBytes = outputStatusBytes(ctx)
}
c, err := ctx.BaseContextFile()
if err != nil {
return err
}
// When multi-stage is enabled, merge the existing output.status from ctx into the
// base context so downstream CUE can reference it deterministically.
if multiStageEnabled {
c = injectOutputStatusIntoBaseContext(ctx, c, statusBytes)
}
buff += c
val, err := cuex.DefaultCompiler.Get().CompileString(ctx.GetCtx(), buff)
@@ -341,6 +358,56 @@ func (td *traitDef) Complete(ctx process.Context, abstractTemplate string, param
return nil
}
func outputStatusBytes(ctx process.Context) []byte {
var statusBytes []byte
var outputMap map[string]interface{}
if output := ctx.GetData(OutputFieldName); output != nil {
if m, ok := output.(map[string]interface{}); ok {
outputMap = m
} else if ptr, ok := output.(*interface{}); ok && ptr != nil {
if m, ok := (*ptr).(map[string]interface{}); ok {
outputMap = m
}
}
if outputMap != nil {
if status, ok := outputMap["status"]; ok {
if b, err := json.Marshal(status); err == nil {
statusBytes = b
}
}
}
}
return statusBytes
}
func injectOutputStatusIntoBaseContext(ctx process.Context, c string, statusBytes []byte) string {
if len(statusBytes) > 0 {
// If output is an empty object, replace it with only the status field without trailing comma.
emptyOutputMarker := "\"output\":{}"
if strings.Contains(c, emptyOutputMarker) {
replacement := fmt.Sprintf("\"output\":{\"status\":%s}", string(statusBytes))
c = strings.Replace(c, emptyOutputMarker, replacement, 1)
} else {
// Otherwise, insert status as the first field and keep the comma to separate from existing fields.
replacement := fmt.Sprintf("\"output\":{\"status\":%s,", string(statusBytes))
c = strings.Replace(c, "\"output\":{", replacement, 1)
}
// Restore the status field to the current output in ctx.data
var status interface{}
if err := json.Unmarshal(statusBytes, &status); err == nil {
if currentOutput := ctx.GetData(OutputFieldName); currentOutput != nil {
if currentMap, ok := currentOutput.(map[string]interface{}); ok {
currentMap["status"] = status
ctx.PushData(OutputFieldName, currentMap)
}
}
}
}
return c
}
func parseErrors(errs cue.Value) error {
if it, e := errs.List(); e == nil {
for it.Next() {
@@ -399,8 +466,8 @@ func (td *traitDef) getTemplateContext(ctx process.Context, cli client.Reader, a
baseLabels := GetBaseContextLabels(ctx)
var root = initRoot(baseLabels)
var commonLabels = GetCommonLabels(baseLabels)
_, assists := ctx.Output()
outputs := make(map[string]interface{})
for _, assist := range assists {
if assist.Type != td.name {

View File

@@ -50,6 +50,7 @@ type ContextData struct {
AppAnnotations map[string]string
ClusterVersion types.ClusterVersion
Output interface{}
}
// NewContext creates a new process context
@@ -75,6 +76,9 @@ func NewContext(data ContextData) process.Context {
ctx.PushData(ContextAppRevisionNum, revNum)
ctx.PushData(ContextCluster, data.Cluster)
ctx.PushData(ContextClusterVersion, parseClusterVersion(data.ClusterVersion))
if data.Output != nil {
ctx.PushData(OutputFieldName, data.Output)
}
return ctx
}

View File

@@ -138,7 +138,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
GzipResourceTracker: {Default: false, PreRelease: featuregate.Alpha},
ZstdResourceTracker: {Default: false, PreRelease: featuregate.Alpha},
ApplyOnce: {Default: false, PreRelease: featuregate.Alpha},
MultiStageComponentApply: {Default: false, PreRelease: featuregate.Alpha},
MultiStageComponentApply: {Default: true, PreRelease: featuregate.Alpha},
GzipApplicationRevision: {Default: false, PreRelease: featuregate.Alpha},
ZstdApplicationRevision: {Default: false, PreRelease: featuregate.Alpha},
PreDispatchDryRun: {Default: true, PreRelease: featuregate.Alpha},

View File

@@ -27,6 +27,9 @@ import (
"strings"
"github.com/davecgh/go-spew/spew"
"github.com/kubevela/pkg/multicluster"
"github.com/kubevela/workflow/pkg/cue/model"
"github.com/kubevela/workflow/pkg/cue/process"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -835,3 +838,42 @@ func (accessor *applicationResourceNamespaceAccessor) Namespace() string {
func NewApplicationResourceNamespaceAccessor(appNs, overrideNs string) NamespaceAccessor {
return &applicationResourceNamespaceAccessor{applicationNamespace: appNs, overrideNamespace: overrideNs}
}
func WithCluster(ctx context.Context, o client.Object) context.Context {
if cluster := oam.GetCluster(o); cluster != "" {
return multicluster.WithCluster(ctx, cluster)
}
return ctx
}
func GetResourceFromObj(ctx context.Context, pctx process.Context, obj *unstructured.Unstructured, client client.Reader, namespace string, labels map[string]string, outputsResource string) (map[string]interface{}, error) {
if outputsResource != "" {
labels[oam.TraitResource] = outputsResource
}
if obj.GetName() != "" {
u, err := GetObjectGivenGVKAndName(ctx, client, obj.GroupVersionKind(), namespace, obj.GetName())
if err != nil {
return nil, err
}
return u.Object, nil
}
if ctxName, ok := pctx.GetData(model.ContextName).(string); ok && ctxName != "" {
u, err := GetObjectGivenGVKAndName(ctx, client, obj.GroupVersionKind(), namespace, ctxName)
if err == nil {
return u.Object, nil
}
}
list, err := GetObjectsGivenGVKAndLabels(ctx, client, obj.GroupVersionKind(), namespace, labels)
if err != nil {
return nil, err
}
if len(list.Items) == 1 {
return list.Items[0].Object, nil
}
for _, v := range list.Items {
if v.GetLabels()[oam.TraitResource] == outputsResource {
return v.Object, nil
}
}
return nil, errors.Errorf("no resources found gvk(%v) labels(%v)", obj.GroupVersionKind(), labels)
}

View File

@@ -296,9 +296,12 @@ type applyTaskResult struct {
healthy bool
err error
task *applyTask
// outputReady indicates whether all declared outputs are ready
outputReady bool
}
// applyComponents will apply components to placements.
// nolint:gocyclo
func applyComponents(ctx context.Context, apply oamprovidertypes.ComponentApply, healthCheck oamprovidertypes.ComponentHealthCheck, components []common.ApplicationComponent, placements []v1alpha1.PlacementDecision, parallelism int) (bool, string, error) {
var tasks []*applyTask
var cache = pkgmaps.NewSyncMap[string, cue.Value]()
@@ -321,6 +324,8 @@ func applyComponents(ctx context.Context, apply oamprovidertypes.ComponentApply,
}
unhealthyResults := make([]*applyTaskResult, 0)
maxHealthCheckTimes := len(tasks)
outputNotReadyReasons := make([]string, 0)
outputsReady := true
HealthCheck:
for i := 0; i < maxHealthCheckTimes; i++ {
checkTasks := make([]*applyTask, 0)
@@ -343,13 +348,25 @@ HealthCheck:
healthy, _, output, outputs, err := healthCheck(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace)
task.healthy = ptr.To(healthy)
if healthy {
err = task.generateOutput(output, outputs, cache, makeValue)
if errOutput := task.generateOutput(output, outputs, cache, makeValue); errOutput != nil {
var notFound workflowerrors.LookUpNotFoundErr
if errors.As(errOutput, &notFound) && strings.HasPrefix(string(notFound), "outputs.") && len(outputs) == 0 {
// PostDispatch traits are not rendered/applied yet, so trait outputs are unavailable.
// Skip blocking the deploy step; the outputs will be populated after PostDispatch runs.
errOutput = nil
}
err = errOutput
}
}
return &applyTaskResult{healthy: healthy, err: err, task: task}
return &applyTaskResult{healthy: healthy, err: err, task: task, outputReady: true}
}, slices.Parallelism(parallelism))
for _, res := range checkResults {
taskHealthyMap[res.task.key()] = res.healthy
if !res.outputReady {
outputsReady = false
outputNotReadyReasons = append(outputNotReadyReasons, fmt.Sprintf("%s outputs not ready", res.task.key()))
}
if !res.healthy || res.err != nil {
unhealthyResults = append(unhealthyResults, res)
}
@@ -374,13 +391,13 @@ HealthCheck:
results = slices.ParMap[*applyTask, *applyTaskResult](todoTasks, func(task *applyTask) *applyTaskResult {
err := task.fillInputs(cache, makeValue)
if err != nil {
return &applyTaskResult{healthy: false, err: err, task: task}
return &applyTaskResult{healthy: false, err: err, task: task, outputReady: true}
}
_, _, healthy, err := apply(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace)
if err != nil {
return &applyTaskResult{healthy: healthy, err: err, task: task}
return &applyTaskResult{healthy: healthy, err: err, task: task, outputReady: true}
}
return &applyTaskResult{healthy: healthy, err: err, task: task}
return &applyTaskResult{healthy: healthy, err: err, task: task, outputReady: true}
}, slices.Parallelism(parallelism))
}
var errs []error
@@ -401,11 +418,13 @@ HealthCheck:
}
}
reasons = append(reasons, outputNotReadyReasons...)
for _, t := range pendingTasks {
reasons = append(reasons, fmt.Sprintf("%s is waiting dependents", t.key()))
}
return allHealthy && len(pendingTasks) == 0, strings.Join(reasons, ","), velaerrors.AggregateErrors(errs)
return allHealthy && outputsReady && len(pendingTasks) == 0, strings.Join(reasons, ","), velaerrors.AggregateErrors(errs)
}
func fieldPathToComponent(input string) string {

View File

@@ -296,9 +296,12 @@ type applyTaskResult struct {
healthy bool
err error
task *applyTask
// outputReady indicates whether all declared outputs are ready
outputReady bool
}
// applyComponents will apply components to placements.
// nolint:gocyclo
func applyComponents(ctx context.Context, apply oamprovidertypes.ComponentApply, healthCheck oamprovidertypes.ComponentHealthCheck, components []common.ApplicationComponent, placements []v1alpha1.PlacementDecision, parallelism int) (bool, string, error) {
var tasks []*applyTask
var cache = pkgmaps.NewSyncMap[string, cue.Value]()
@@ -321,6 +324,8 @@ func applyComponents(ctx context.Context, apply oamprovidertypes.ComponentApply,
}
unhealthyResults := make([]*applyTaskResult, 0)
maxHealthCheckTimes := len(tasks)
outputNotReadyReasons := make([]string, 0)
outputsReady := true
HealthCheck:
for i := 0; i < maxHealthCheckTimes; i++ {
checkTasks := make([]*applyTask, 0)
@@ -343,13 +348,25 @@ HealthCheck:
healthy, _, output, outputs, err := healthCheck(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace)
task.healthy = ptr.To(healthy)
if healthy {
err = task.generateOutput(output, outputs, cache, makeValue)
if errOutput := task.generateOutput(output, outputs, cache, makeValue); errOutput != nil {
var notFound workflowerrors.LookUpNotFoundErr
if errors.As(errOutput, &notFound) && strings.HasPrefix(string(notFound), "outputs.") && len(outputs) == 0 {
// PostDispatch traits are not rendered/applied yet, so trait outputs are unavailable.
// Skip blocking the deploy step; the outputs will be populated after PostDispatch runs.
errOutput = nil
}
err = errOutput
}
}
return &applyTaskResult{healthy: healthy, err: err, task: task}
return &applyTaskResult{healthy: healthy, err: err, task: task, outputReady: true}
}, slices.Parallelism(parallelism))
for _, res := range checkResults {
taskHealthyMap[res.task.key()] = res.healthy
if !res.outputReady {
outputsReady = false
outputNotReadyReasons = append(outputNotReadyReasons, fmt.Sprintf("%s outputs not ready", res.task.key()))
}
if !res.healthy || res.err != nil {
unhealthyResults = append(unhealthyResults, res)
}
@@ -374,13 +391,13 @@ HealthCheck:
results = slices.ParMap[*applyTask, *applyTaskResult](todoTasks, func(task *applyTask) *applyTaskResult {
err := task.fillInputs(cache, makeValue)
if err != nil {
return &applyTaskResult{healthy: false, err: err, task: task}
return &applyTaskResult{healthy: false, err: err, task: task, outputReady: true}
}
_, _, healthy, err := apply(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace)
if err != nil {
return &applyTaskResult{healthy: healthy, err: err, task: task}
return &applyTaskResult{healthy: healthy, err: err, task: task, outputReady: true}
}
return &applyTaskResult{healthy: healthy, err: err, task: task}
return &applyTaskResult{healthy: healthy, err: err, task: task, outputReady: true}
}, slices.Parallelism(parallelism))
}
var errs []error
@@ -401,11 +418,13 @@ HealthCheck:
}
}
reasons = append(reasons, outputNotReadyReasons...)
for _, t := range pendingTasks {
reasons = append(reasons, fmt.Sprintf("%s is waiting dependents", t.key()))
}
return allHealthy && len(pendingTasks) == 0, strings.Join(reasons, ","), velaerrors.AggregateErrors(errs)
return allHealthy && outputsReady && len(pendingTasks) == 0, strings.Join(reasons, ","), velaerrors.AggregateErrors(errs)
}
func fieldPathToComponent(input string) string {

View File

@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"time"
@@ -478,7 +479,12 @@ var _ = Describe("Test multicluster scenario", func() {
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(hubCtx, client.ObjectKeyFromObject(app), app)).Should(Succeed())
g.Expect(app.Status.Phase).Should(Equal(common.ApplicationRunning))
}, 20*time.Second).Should(Succeed())
}, 10*time.Minute).Should(Succeed())
By("print application status")
out, err := exec.Command("kubectl", "describe", "application", app.Name, "-n", testNamespace).CombinedOutput()
Expect(err).Should(Succeed())
fmt.Println(string(out))
By("test dispatched resource")
svc := &corev1.Service{}
@@ -489,6 +495,7 @@ var _ = Describe("Test multicluster scenario", func() {
Expect(cm.Data["host"]).Should(Equal(host))
Expect(k8sClient.Get(workerCtx, client.ObjectKey{Namespace: testNamespace, Name: app.Name}, cm)).Should(Succeed())
Expect(cm.Data["host"]).Should(Equal(host))
})
It("Test application with workflow change will rerun", func() {

File diff suppressed because it is too large Load Diff