diff --git a/pkg/controller/core.oam.dev/v1beta1/application/global_policy_cache.go b/pkg/controller/core.oam.dev/v1beta1/application/global_policy_cache.go index a2fa00366..55d62b755 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/global_policy_cache.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/global_policy_cache.go @@ -41,6 +41,7 @@ type RenderedPolicyResult struct { Transforms interface{} // The transforms (*PolicyTransforms) from CUE template AdditionalContext map[string]interface{} // The additionalContext field from CUE template SkipReason string // Reason for skipping (if enabled=false or error) + Config *PolicyConfig // Policy configuration including refresh settings } // GlobalPolicyCacheEntry represents cached rendered results for an Application diff --git a/pkg/controller/core.oam.dev/v1beta1/application/policy_transforms.go b/pkg/controller/core.oam.dev/v1beta1/application/policy_transforms.go index 8b1561ec7..039cf30ca 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/policy_transforms.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/policy_transforms.go @@ -77,6 +77,15 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, // Clear previous global policy status app.Status.AppliedApplicationPolicies = nil + // CRITICAL: Compute Application hash BEFORE any policies run + // This hash is based on the original spec (not modified by policies yet) + // It's used for cache invalidation - only spec changes should invalidate cache + appHash, err := computeApplicationHash(app) + if err != nil { + ctx.Info("Failed to compute Application hash", "error", err) + appHash = "" // Continue without hash + } + // Step 1: Validate explicit policies are not global for _, policy := range app.Spec.Policies { if err := validateNotGlobalPolicy(ctx, h.Client, policy.Type, app.Namespace); err != nil { @@ -171,6 +180,9 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, } } + // Track upstream policy outputs for cascade invalidation + var upstreamOutputs []*PolicyOutput + // Apply the rendered global policy results (either from cache or freshly rendered) for _, result := range globalRenderedResults { // Check feature gate for Application-scoped policies @@ -198,14 +210,30 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, allPolicyChanges[result.PolicyName] = policyChanges } if result.Enabled { + // Compute cascade ID for this policy (hash of all upstream outputs) + cascadeID, err := computeCascadeID(upstreamOutputs) + if err != nil { + ctx.Info("Failed to compute cascade ID", "policy", result.PolicyName, "error", err) + cascadeID = "" + } + policyMetadata[result.PolicyName] = &policyConfigMapMetadata{ Name: result.PolicyName, Namespace: result.PolicyNamespace, Source: "global", Sequence: sequence, Priority: priority, + Config: result.Config, + CascadeID: cascadeID, } sequence++ // Increment sequence only if policy was applied (enabled=true) + + // Track this policy's output for cascade invalidation of downstream policies + if result.Transforms != nil { + if policyOutput, ok := result.Transforms.(*PolicyOutput); ok { + upstreamOutputs = append(upstreamOutputs, policyOutput) + } + } } } } else if shouldSkipGlobalPolicies(app) { @@ -215,7 +243,7 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, } // Step 3: Apply explicit policies from Application spec - // These are NOT cached as they may have parameters specific to this Application + // Use same render+apply flow as global policies for caching and cascade tracking for _, policy := range app.Spec.Policies { // Load PolicyDefinition template templ, err := appfile.LoadTemplate(ctx, h.Client, policy.Type, types.TypePolicy, app.Annotations) @@ -240,70 +268,108 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, ctx.Info("Applying explicit Application-scoped policy", "policy", policy.Type, "name", policy.Name) - // Render and apply (not cached - explicit policies can have unique parameters) - var changes *PolicyChanges - ctx, changes, err = h.applyPolicyTransform(ctx, app, policy, templ.PolicyDefinition) + // Render with upstream tracking (same as global policies) + result, err := h.renderPolicyWithUpstream(ctx, app, policy, templ.PolicyDefinition, upstreamOutputs) if err != nil { - // Record failure in status - recordApplicationPolicyStatus(app, policy.Name, templ.PolicyDefinition.Namespace, "explicit", sequence, 0, false, fmt.Sprintf("error: %s", err.Error()), nil) - return ctx, errors.Wrapf(err, "failed to apply transform from policy %s", policy.Type) + ctx.Info("Failed to render explicit policy, skipping", "policy", policy.Name, "error", err) + result.PolicyName = policy.Name + result.PolicyNamespace = templ.PolicyDefinition.Namespace + result.Enabled = false + result.SkipReason = fmt.Sprintf("render error: %s", err.Error()) } - // Record successful application in status - if changes != nil { - // SpecModified is already set correctly in applyPolicyTransform - allPolicyChanges[policy.Name] = changes // Store for ConfigMap serialization + // Apply the rendered result + priority := int32(0) // Explicit policies don't have priority + var policyChanges *PolicyChanges + ctx, policyChanges, err = h.applyRenderedPolicyResult(ctx, app, result, sequence, priority) + if err != nil { + return ctx, errors.Wrapf(err, "failed to apply explicit policy %s", policy.Name) } - recordApplicationPolicyStatus(app, policy.Name, templ.PolicyDefinition.Namespace, "explicit", sequence, 0, true, "", changes) - // Store metadata for ConfigMap - policyMetadata[policy.Name] = &policyConfigMapMetadata{ - Name: policy.Name, - Namespace: templ.PolicyDefinition.Namespace, - Source: "explicit", - Sequence: sequence, - Priority: 0, // Explicit policies don't have priority + // Store changes and metadata for ConfigMap storage + if policyChanges != nil { + allPolicyChanges[policy.Name] = policyChanges + } + if result.Enabled { + // Compute cascade ID for this policy (hash of all upstream outputs) + cascadeID, err := computeCascadeID(upstreamOutputs) + if err != nil { + ctx.Info("Failed to compute cascade ID", "policy", policy.Name, "error", err) + cascadeID = "" + } + + policyMetadata[policy.Name] = &policyConfigMapMetadata{ + Name: policy.Name, + Namespace: templ.PolicyDefinition.Namespace, + Source: "explicit", + Sequence: sequence, + Priority: priority, + Config: result.Config, + CascadeID: cascadeID, + } + sequence++ // Increment sequence only if policy was applied (enabled=true) + + // Track this policy's output for cascade invalidation of downstream policies + if result.Transforms != nil { + if policyOutput, ok := result.Transforms.(*PolicyOutput); ok { + upstreamOutputs = append(upstreamOutputs, policyOutput) + } + } } - sequence++ } // Step 4: Store all rendered policy outputs in ConfigMap for reuse and observability // This creates a persistent cache with TTL that can be used to avoid re-rendering orderedData := make(map[string]string) - // Compute hash of Application state (spec + metadata) for cache invalidation - appHash, err := computeApplicationHash(app) - if err != nil { - ctx.Info("Failed to compute Application hash", "error", err) - appHash = "" // Continue without hash - } + // Note: appHash was computed at the very beginning (before policies ran) + // This ensures we're hashing the original Application spec, not the modified version // Build ConfigMap data from metadata and changes tracked during reconciliation for policyName, metadata := range policyMetadata { - // TTL is no longer stored on PolicyDefinition - it's now part of config.refresh in the template - ttlSeconds := int32(-1) // Default for backwards compatibility with old format - - // Build the rendered policy record with everything needed to reapply it - policyRecord := map[string]interface{}{ - "policy": metadata.Name, - "namespace": metadata.Namespace, - "source": metadata.Source, - "sequence": metadata.Sequence, - "priority": metadata.Priority, - "rendered_at": time.Now().Format(time.RFC3339), - "ttl_seconds": ttlSeconds, - "enabled": true, - "application_hash": appHash, // Hash of Application for cache invalidation + // Get the full policy changes if available (includes output, labels, annotations, context) + policyChanges, hasPolicyChanges := allPolicyChanges[policyName] + if !hasPolicyChanges || policyChanges == nil || policyChanges.Output == nil { + // No output to cache - skip + continue } - // Get the full policy changes if available (includes output, labels, annotations, context) - if policyChanges, ok := allPolicyChanges[policyName]; ok && policyChanges != nil { + // Use new per-output-type cache record format if config is available + var policyJSON []byte + var err error + + if metadata.Config != nil { + // New format: per-output-type caching with refresh control + cacheRecord := createPolicyCacheRecord( + metadata.Name, + metadata.Namespace, + int(metadata.Priority), + metadata.Sequence, + appHash, + metadata.CascadeID, + policyChanges.Output, + metadata.Config, + ) + + policyJSON, err = json.MarshalIndent(cacheRecord, "", " ") + } else { + // Old format: monolithic cache (for backwards compatibility) + policyRecord := map[string]interface{}{ + "policy": metadata.Name, + "namespace": metadata.Namespace, + "source": metadata.Source, + "sequence": metadata.Sequence, + "priority": metadata.Priority, + "rendered_at": time.Now().Format(time.RFC3339), + "ttl_seconds": int32(-1), + "enabled": true, + "application_hash": appHash, + } + // Store the output object in reusable format - if policyChanges.Output != nil { - outputData := serializeOutputForStorage(policyChanges.Output) - if len(outputData) > 0 { - policyRecord["output"] = outputData - } + outputData := serializeOutputForStorage(policyChanges.Output) + if len(outputData) > 0 { + policyRecord["output"] = outputData } // Add additional context if available @@ -318,10 +384,10 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, "spec_modified": policyChanges.SpecModified, "has_context": len(policyChanges.AdditionalContext) > 0, } + + policyJSON, err = json.MarshalIndent(policyRecord, "", " ") } - // Marshal to pretty JSON for human readability and tool consumption - policyJSON, err := json.MarshalIndent(policyRecord, "", " ") if err != nil { ctx.Info("Failed to marshal policy record", "policy", metadata.Name, "error", err) continue @@ -509,6 +575,11 @@ func (h *AppHandler) applyPolicyTransform(ctx monitorContext.Context, app *v1bet // renderPolicy renders a policy's CUE template and extracts the results for caching // Returns a RenderedPolicyResult that can be cached and reused func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Application, policyRef v1beta1.AppPolicy, policyDef *v1beta1.PolicyDefinition) (RenderedPolicyResult, error) { + return h.renderPolicyWithUpstream(ctx, app, policyRef, policyDef, nil) +} + +// renderPolicyWithUpstream renders a policy with upstream output tracking for cascade invalidation +func (h *AppHandler) renderPolicyWithUpstream(ctx monitorContext.Context, app *v1beta1.Application, policyRef v1beta1.AppPolicy, policyDef *v1beta1.PolicyDefinition, upstreamOutputs []*PolicyOutput) (RenderedPolicyResult, error) { result := RenderedPolicyResult{ PolicyName: policyDef.Name, PolicyNamespace: policyDef.Namespace, @@ -516,44 +587,7 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli Enabled: false, } - // Check if we have a valid cached result (old format - kept for backwards compatibility) - // New code should use loadCachedPolicyRecord with config.refresh instead - cachedRecord, err := loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1) - if err != nil { - ctx.Info("Failed to load cached policy from ConfigMap", "policy", policyDef.Name, "error", err) - // Continue with rendering - } else if cachedRecord != nil { - ctx.Info("Using cached policy result (old format)", "policy", policyDef.Name) - - // Deserialize the cached output - if outputData, ok := cachedRecord["output"].(map[string]interface{}); ok { - result.Transforms = deserializeOutputFromStorage(outputData) - } else { - // No output in cache - policy needs re-rendering - klog.Warningf("Policy %s cached without output data - will re-render", policyDef.Name) - return RenderedPolicyResult{}, nil - } - - if additionalContext, ok := cachedRecord["additional_context"].(map[string]interface{}); ok { - result.AdditionalContext = additionalContext - } - - if enabled, ok := cachedRecord["enabled"].(bool); ok { - result.Enabled = enabled - } else { - result.Enabled = true // Default - } - - return result, nil - } - - // Validate policy has CUE schematic - if policyDef.Spec.Schematic == nil || policyDef.Spec.Schematic.CUE == nil { - result.SkipReason = "no CUE schematic" - return result, errors.Errorf("Application-scoped policy %s must have a CUE schematic", policyDef.Name) - } - - // Parse policy parameters + // Parse policy parameters first (needed for rendering) var policyParams map[string]interface{} if policyRef.Properties != nil && len(policyRef.Properties.Raw) > 0 { if err := json.Unmarshal(policyRef.Properties.Raw, &policyParams); err != nil { @@ -563,20 +597,30 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli } // Load prior cached result (if any) to pass as context.prior - // Even if we're re-rendering (cache expired), pass the prior result to the template var priorResult map[string]interface{} if app.Status.ApplicationPoliciesConfigMap != "" { - priorResult, _ = loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1) // Always load regardless of TTL + priorResult, _ = loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1) } - // Render the CUE template with context.application and context.prior + // Render the CUE template to extract config (we need config.refresh before checking cache) rendered, err := h.renderPolicyCUETemplate(ctx, app, policyParams, policyDef, priorResult) if err != nil { result.SkipReason = fmt.Sprintf("CUE render error: %s", err.Error()) return result, errors.Wrap(err, "failed to render CUE template") } - // Extract enabled field (default: true) + // Extract config (including refresh settings) + policyConfig, err := h.extractRefreshConfig(rendered) + if err != nil { + result.SkipReason = fmt.Sprintf("config extraction error: %s", err.Error()) + return result, errors.Wrap(err, "failed to extract config") + } + if policyConfig == nil { + // No config - use defaults + policyConfig = applyRefreshDefaults(&PolicyConfig{Enabled: true}) + } + + // Extract enabled field enabled, err := h.extractEnabled(rendered) if err != nil { result.SkipReason = fmt.Sprintf("enabled extraction error: %s", err.Error()) @@ -589,6 +633,29 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli return result, nil } + // Try to load from new cache format with cascade and refresh checks + cachedRecord, err := loadCachedPolicyRecord(ctx, h.Client, app, policyDef.Name, policyConfig, upstreamOutputs) + if err != nil { + ctx.Info("Cache load error, will render", "policy", policyDef.Name, "error", err) + } else if cachedRecord != nil { + ctx.Info("Using cached policy result (new format)", + "policy", policyDef.Name, + "hasSpec", cachedRecord.Spec != nil, + "hasLabels", cachedRecord.Labels != nil, + "hasAnnotations", cachedRecord.Annotations != nil, + "hasCtx", cachedRecord.Ctx != nil) + + // Reconstruct PolicyOutput from cache + result.Transforms = reconstructPolicyOutputFromCache(cachedRecord) + if policyOutput, ok := result.Transforms.(*PolicyOutput); ok { + result.AdditionalContext = policyOutput.Ctx + } + result.Enabled = true + + return result, nil + } + + // Cache miss or invalid - continue with fresh render (we already have the rendered CUE value) // Extract output (new API only) output, err := h.extractOutput(rendered) if err != nil { @@ -602,10 +669,10 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli return result, errors.New("policy must specify 'output' field - see documentation for API reference") } - // Store output + // Store output and config result.Transforms = output - // Extract ctx as additionalContext result.AdditionalContext = output.Ctx + result.Config = policyConfig return result, nil } @@ -1250,11 +1317,13 @@ type PolicyChanges struct { // policyConfigMapMetadata tracks metadata needed for ConfigMap storage type policyConfigMapMetadata struct { - Name string - Namespace string - Source string // "global" or "explicit" - Sequence int - Priority int32 + Name string + Namespace string + Source string // "global" or "explicit" + Sequence int + Priority int32 + Config *PolicyConfig // Refresh configuration from policy template + CascadeID string // Hash of upstream policy outputs for cascade invalidation } // serializeTransformsForStorage converts PolicyTransforms to a format suitable for storage and reuse @@ -1766,24 +1835,15 @@ func loadCachedPolicyFromConfigMap(ctx context.Context, cli client.Client, app * } // computeApplicationHash computes a hash of the Application state that affects policy rendering -// This includes spec, labels, and annotations. Used for cache invalidation. +// This ONLY includes spec, following KubeVela's standard revision hash approach. +// Labels and annotations are NOT included because: +// - Policies can modify labels/annotations, which would invalidate downstream caches +// - Only spec changes should trigger cache invalidation +// - Per-output-type refresh control handles metadata changes independently func computeApplicationHash(app *v1beta1.Application) (string, error) { - // Build a structure with only the fields that affect policy rendering - hashInput := map[string]interface{}{ - "spec": app.Spec, - "labels": app.Labels, - "annotations": app.Annotations, - } - - // Marshal to JSON for consistent hashing - jsonBytes, err := json.Marshal(hashInput) - if err != nil { - return "", errors.Wrap(err, "failed to marshal Application for hashing") - } - - // Compute SHA256 hash - hash := sha256.Sum256(jsonBytes) - return hex.EncodeToString(hash[:]), nil + // Use apply.ComputeSpecHash which is KubeVela's standard approach + // This only hashes the spec, not labels or annotations + return apply.ComputeSpecHash(&app.Spec) } // computeCascadeID computes a hash of all upstream policy outputs @@ -1795,10 +1855,28 @@ func computeCascadeID(upstreamOutputs []*PolicyOutput) (string, error) { return "", nil } - // Marshal all upstream outputs to JSON for consistent hashing - jsonBytes, err := json.Marshal(upstreamOutputs) + // Extract ONLY spec fields from upstream outputs + // Labels, annotations, and ctx changes should NOT trigger cascade invalidation + // Only spec changes (components, workflow, policies) affect downstream policy behavior + type specOnly struct { + Components []common.ApplicationComponent `json:"components,omitempty"` + Workflow *v1beta1.Workflow `json:"workflow,omitempty"` + Policies []v1beta1.AppPolicy `json:"policies,omitempty"` + } + + upstreamSpecs := make([]specOnly, 0, len(upstreamOutputs)) + for _, output := range upstreamOutputs { + upstreamSpecs = append(upstreamSpecs, specOnly{ + Components: output.Components, + Workflow: output.Workflow, + Policies: output.Policies, + }) + } + + // Hash only the spec fields + jsonBytes, err := json.Marshal(upstreamSpecs) if err != nil { - return "", errors.Wrap(err, "failed to marshal upstream outputs for cascade ID") + return "", errors.Wrap(err, "failed to marshal upstream specs for cascade ID") } // Compute SHA256 hash