WIP: Cascade invalidation approach (will be replaced)

This commit captures the cascade invalidation exploration before
we pivot to a simpler design. Key changes:

1. computeCascadeID() only hashes spec fields (not metadata)
2. computeApplicationHash() only hashes spec (not labels/annotations)
3. Application hash computed BEFORE policies run
4. Attempted to integrate explicit policies into the new flow

Design Decision: This approach is too complex. We're pivoting to:
- Simple 1-minute in-memory cache
- Always render policies
- Store rendered vs applied spec in ConfigMap
- Auto-update annotation for spec changes

This commit is preserved for historical reference.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Brian Kane
2026-02-17 12:01:07 +00:00
parent 83675a1aae
commit 68805310e3
2 changed files with 197 additions and 118 deletions

View File

@@ -41,6 +41,7 @@ type RenderedPolicyResult struct {
Transforms interface{} // The transforms (*PolicyTransforms) from CUE template
AdditionalContext map[string]interface{} // The additionalContext field from CUE template
SkipReason string // Reason for skipping (if enabled=false or error)
Config *PolicyConfig // Policy configuration including refresh settings
}
// GlobalPolicyCacheEntry represents cached rendered results for an Application

View File

@@ -77,6 +77,15 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
// Clear previous global policy status
app.Status.AppliedApplicationPolicies = nil
// CRITICAL: Compute Application hash BEFORE any policies run
// This hash is based on the original spec (not modified by policies yet)
// It's used for cache invalidation - only spec changes should invalidate cache
appHash, err := computeApplicationHash(app)
if err != nil {
ctx.Info("Failed to compute Application hash", "error", err)
appHash = "" // Continue without hash
}
// Step 1: Validate explicit policies are not global
for _, policy := range app.Spec.Policies {
if err := validateNotGlobalPolicy(ctx, h.Client, policy.Type, app.Namespace); err != nil {
@@ -171,6 +180,9 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
}
}
// Track upstream policy outputs for cascade invalidation
var upstreamOutputs []*PolicyOutput
// Apply the rendered global policy results (either from cache or freshly rendered)
for _, result := range globalRenderedResults {
// Check feature gate for Application-scoped policies
@@ -198,14 +210,30 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
allPolicyChanges[result.PolicyName] = policyChanges
}
if result.Enabled {
// Compute cascade ID for this policy (hash of all upstream outputs)
cascadeID, err := computeCascadeID(upstreamOutputs)
if err != nil {
ctx.Info("Failed to compute cascade ID", "policy", result.PolicyName, "error", err)
cascadeID = ""
}
policyMetadata[result.PolicyName] = &policyConfigMapMetadata{
Name: result.PolicyName,
Namespace: result.PolicyNamespace,
Source: "global",
Sequence: sequence,
Priority: priority,
Config: result.Config,
CascadeID: cascadeID,
}
sequence++ // Increment sequence only if policy was applied (enabled=true)
// Track this policy's output for cascade invalidation of downstream policies
if result.Transforms != nil {
if policyOutput, ok := result.Transforms.(*PolicyOutput); ok {
upstreamOutputs = append(upstreamOutputs, policyOutput)
}
}
}
}
} else if shouldSkipGlobalPolicies(app) {
@@ -215,7 +243,7 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
}
// Step 3: Apply explicit policies from Application spec
// These are NOT cached as they may have parameters specific to this Application
// Use same render+apply flow as global policies for caching and cascade tracking
for _, policy := range app.Spec.Policies {
// Load PolicyDefinition template
templ, err := appfile.LoadTemplate(ctx, h.Client, policy.Type, types.TypePolicy, app.Annotations)
@@ -240,70 +268,108 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
ctx.Info("Applying explicit Application-scoped policy", "policy", policy.Type, "name", policy.Name)
// Render and apply (not cached - explicit policies can have unique parameters)
var changes *PolicyChanges
ctx, changes, err = h.applyPolicyTransform(ctx, app, policy, templ.PolicyDefinition)
// Render with upstream tracking (same as global policies)
result, err := h.renderPolicyWithUpstream(ctx, app, policy, templ.PolicyDefinition, upstreamOutputs)
if err != nil {
// Record failure in status
recordApplicationPolicyStatus(app, policy.Name, templ.PolicyDefinition.Namespace, "explicit", sequence, 0, false, fmt.Sprintf("error: %s", err.Error()), nil)
return ctx, errors.Wrapf(err, "failed to apply transform from policy %s", policy.Type)
ctx.Info("Failed to render explicit policy, skipping", "policy", policy.Name, "error", err)
result.PolicyName = policy.Name
result.PolicyNamespace = templ.PolicyDefinition.Namespace
result.Enabled = false
result.SkipReason = fmt.Sprintf("render error: %s", err.Error())
}
// Record successful application in status
if changes != nil {
// SpecModified is already set correctly in applyPolicyTransform
allPolicyChanges[policy.Name] = changes // Store for ConfigMap serialization
// Apply the rendered result
priority := int32(0) // Explicit policies don't have priority
var policyChanges *PolicyChanges
ctx, policyChanges, err = h.applyRenderedPolicyResult(ctx, app, result, sequence, priority)
if err != nil {
return ctx, errors.Wrapf(err, "failed to apply explicit policy %s", policy.Name)
}
recordApplicationPolicyStatus(app, policy.Name, templ.PolicyDefinition.Namespace, "explicit", sequence, 0, true, "", changes)
// Store metadata for ConfigMap
policyMetadata[policy.Name] = &policyConfigMapMetadata{
Name: policy.Name,
Namespace: templ.PolicyDefinition.Namespace,
Source: "explicit",
Sequence: sequence,
Priority: 0, // Explicit policies don't have priority
// Store changes and metadata for ConfigMap storage
if policyChanges != nil {
allPolicyChanges[policy.Name] = policyChanges
}
if result.Enabled {
// Compute cascade ID for this policy (hash of all upstream outputs)
cascadeID, err := computeCascadeID(upstreamOutputs)
if err != nil {
ctx.Info("Failed to compute cascade ID", "policy", policy.Name, "error", err)
cascadeID = ""
}
policyMetadata[policy.Name] = &policyConfigMapMetadata{
Name: policy.Name,
Namespace: templ.PolicyDefinition.Namespace,
Source: "explicit",
Sequence: sequence,
Priority: priority,
Config: result.Config,
CascadeID: cascadeID,
}
sequence++ // Increment sequence only if policy was applied (enabled=true)
// Track this policy's output for cascade invalidation of downstream policies
if result.Transforms != nil {
if policyOutput, ok := result.Transforms.(*PolicyOutput); ok {
upstreamOutputs = append(upstreamOutputs, policyOutput)
}
}
}
sequence++
}
// Step 4: Store all rendered policy outputs in ConfigMap for reuse and observability
// This creates a persistent cache with TTL that can be used to avoid re-rendering
orderedData := make(map[string]string)
// Compute hash of Application state (spec + metadata) for cache invalidation
appHash, err := computeApplicationHash(app)
if err != nil {
ctx.Info("Failed to compute Application hash", "error", err)
appHash = "" // Continue without hash
}
// Note: appHash was computed at the very beginning (before policies ran)
// This ensures we're hashing the original Application spec, not the modified version
// Build ConfigMap data from metadata and changes tracked during reconciliation
for policyName, metadata := range policyMetadata {
// TTL is no longer stored on PolicyDefinition - it's now part of config.refresh in the template
ttlSeconds := int32(-1) // Default for backwards compatibility with old format
// Build the rendered policy record with everything needed to reapply it
policyRecord := map[string]interface{}{
"policy": metadata.Name,
"namespace": metadata.Namespace,
"source": metadata.Source,
"sequence": metadata.Sequence,
"priority": metadata.Priority,
"rendered_at": time.Now().Format(time.RFC3339),
"ttl_seconds": ttlSeconds,
"enabled": true,
"application_hash": appHash, // Hash of Application for cache invalidation
// Get the full policy changes if available (includes output, labels, annotations, context)
policyChanges, hasPolicyChanges := allPolicyChanges[policyName]
if !hasPolicyChanges || policyChanges == nil || policyChanges.Output == nil {
// No output to cache - skip
continue
}
// Get the full policy changes if available (includes output, labels, annotations, context)
if policyChanges, ok := allPolicyChanges[policyName]; ok && policyChanges != nil {
// Use new per-output-type cache record format if config is available
var policyJSON []byte
var err error
if metadata.Config != nil {
// New format: per-output-type caching with refresh control
cacheRecord := createPolicyCacheRecord(
metadata.Name,
metadata.Namespace,
int(metadata.Priority),
metadata.Sequence,
appHash,
metadata.CascadeID,
policyChanges.Output,
metadata.Config,
)
policyJSON, err = json.MarshalIndent(cacheRecord, "", " ")
} else {
// Old format: monolithic cache (for backwards compatibility)
policyRecord := map[string]interface{}{
"policy": metadata.Name,
"namespace": metadata.Namespace,
"source": metadata.Source,
"sequence": metadata.Sequence,
"priority": metadata.Priority,
"rendered_at": time.Now().Format(time.RFC3339),
"ttl_seconds": int32(-1),
"enabled": true,
"application_hash": appHash,
}
// Store the output object in reusable format
if policyChanges.Output != nil {
outputData := serializeOutputForStorage(policyChanges.Output)
if len(outputData) > 0 {
policyRecord["output"] = outputData
}
outputData := serializeOutputForStorage(policyChanges.Output)
if len(outputData) > 0 {
policyRecord["output"] = outputData
}
// Add additional context if available
@@ -318,10 +384,10 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
"spec_modified": policyChanges.SpecModified,
"has_context": len(policyChanges.AdditionalContext) > 0,
}
policyJSON, err = json.MarshalIndent(policyRecord, "", " ")
}
// Marshal to pretty JSON for human readability and tool consumption
policyJSON, err := json.MarshalIndent(policyRecord, "", " ")
if err != nil {
ctx.Info("Failed to marshal policy record", "policy", metadata.Name, "error", err)
continue
@@ -509,6 +575,11 @@ func (h *AppHandler) applyPolicyTransform(ctx monitorContext.Context, app *v1bet
// renderPolicy renders a policy's CUE template and extracts the results for caching
// Returns a RenderedPolicyResult that can be cached and reused
func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Application, policyRef v1beta1.AppPolicy, policyDef *v1beta1.PolicyDefinition) (RenderedPolicyResult, error) {
return h.renderPolicyWithUpstream(ctx, app, policyRef, policyDef, nil)
}
// renderPolicyWithUpstream renders a policy with upstream output tracking for cascade invalidation
func (h *AppHandler) renderPolicyWithUpstream(ctx monitorContext.Context, app *v1beta1.Application, policyRef v1beta1.AppPolicy, policyDef *v1beta1.PolicyDefinition, upstreamOutputs []*PolicyOutput) (RenderedPolicyResult, error) {
result := RenderedPolicyResult{
PolicyName: policyDef.Name,
PolicyNamespace: policyDef.Namespace,
@@ -516,44 +587,7 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
Enabled: false,
}
// Check if we have a valid cached result (old format - kept for backwards compatibility)
// New code should use loadCachedPolicyRecord with config.refresh instead
cachedRecord, err := loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1)
if err != nil {
ctx.Info("Failed to load cached policy from ConfigMap", "policy", policyDef.Name, "error", err)
// Continue with rendering
} else if cachedRecord != nil {
ctx.Info("Using cached policy result (old format)", "policy", policyDef.Name)
// Deserialize the cached output
if outputData, ok := cachedRecord["output"].(map[string]interface{}); ok {
result.Transforms = deserializeOutputFromStorage(outputData)
} else {
// No output in cache - policy needs re-rendering
klog.Warningf("Policy %s cached without output data - will re-render", policyDef.Name)
return RenderedPolicyResult{}, nil
}
if additionalContext, ok := cachedRecord["additional_context"].(map[string]interface{}); ok {
result.AdditionalContext = additionalContext
}
if enabled, ok := cachedRecord["enabled"].(bool); ok {
result.Enabled = enabled
} else {
result.Enabled = true // Default
}
return result, nil
}
// Validate policy has CUE schematic
if policyDef.Spec.Schematic == nil || policyDef.Spec.Schematic.CUE == nil {
result.SkipReason = "no CUE schematic"
return result, errors.Errorf("Application-scoped policy %s must have a CUE schematic", policyDef.Name)
}
// Parse policy parameters
// Parse policy parameters first (needed for rendering)
var policyParams map[string]interface{}
if policyRef.Properties != nil && len(policyRef.Properties.Raw) > 0 {
if err := json.Unmarshal(policyRef.Properties.Raw, &policyParams); err != nil {
@@ -563,20 +597,30 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
}
// Load prior cached result (if any) to pass as context.prior
// Even if we're re-rendering (cache expired), pass the prior result to the template
var priorResult map[string]interface{}
if app.Status.ApplicationPoliciesConfigMap != "" {
priorResult, _ = loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1) // Always load regardless of TTL
priorResult, _ = loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1)
}
// Render the CUE template with context.application and context.prior
// Render the CUE template to extract config (we need config.refresh before checking cache)
rendered, err := h.renderPolicyCUETemplate(ctx, app, policyParams, policyDef, priorResult)
if err != nil {
result.SkipReason = fmt.Sprintf("CUE render error: %s", err.Error())
return result, errors.Wrap(err, "failed to render CUE template")
}
// Extract enabled field (default: true)
// Extract config (including refresh settings)
policyConfig, err := h.extractRefreshConfig(rendered)
if err != nil {
result.SkipReason = fmt.Sprintf("config extraction error: %s", err.Error())
return result, errors.Wrap(err, "failed to extract config")
}
if policyConfig == nil {
// No config - use defaults
policyConfig = applyRefreshDefaults(&PolicyConfig{Enabled: true})
}
// Extract enabled field
enabled, err := h.extractEnabled(rendered)
if err != nil {
result.SkipReason = fmt.Sprintf("enabled extraction error: %s", err.Error())
@@ -589,6 +633,29 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
return result, nil
}
// Try to load from new cache format with cascade and refresh checks
cachedRecord, err := loadCachedPolicyRecord(ctx, h.Client, app, policyDef.Name, policyConfig, upstreamOutputs)
if err != nil {
ctx.Info("Cache load error, will render", "policy", policyDef.Name, "error", err)
} else if cachedRecord != nil {
ctx.Info("Using cached policy result (new format)",
"policy", policyDef.Name,
"hasSpec", cachedRecord.Spec != nil,
"hasLabels", cachedRecord.Labels != nil,
"hasAnnotations", cachedRecord.Annotations != nil,
"hasCtx", cachedRecord.Ctx != nil)
// Reconstruct PolicyOutput from cache
result.Transforms = reconstructPolicyOutputFromCache(cachedRecord)
if policyOutput, ok := result.Transforms.(*PolicyOutput); ok {
result.AdditionalContext = policyOutput.Ctx
}
result.Enabled = true
return result, nil
}
// Cache miss or invalid - continue with fresh render (we already have the rendered CUE value)
// Extract output (new API only)
output, err := h.extractOutput(rendered)
if err != nil {
@@ -602,10 +669,10 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
return result, errors.New("policy must specify 'output' field - see documentation for API reference")
}
// Store output
// Store output and config
result.Transforms = output
// Extract ctx as additionalContext
result.AdditionalContext = output.Ctx
result.Config = policyConfig
return result, nil
}
@@ -1250,11 +1317,13 @@ type PolicyChanges struct {
// policyConfigMapMetadata tracks metadata needed for ConfigMap storage
type policyConfigMapMetadata struct {
Name string
Namespace string
Source string // "global" or "explicit"
Sequence int
Priority int32
Name string
Namespace string
Source string // "global" or "explicit"
Sequence int
Priority int32
Config *PolicyConfig // Refresh configuration from policy template
CascadeID string // Hash of upstream policy outputs for cascade invalidation
}
// serializeTransformsForStorage converts PolicyTransforms to a format suitable for storage and reuse
@@ -1766,24 +1835,15 @@ func loadCachedPolicyFromConfigMap(ctx context.Context, cli client.Client, app *
}
// computeApplicationHash computes a hash of the Application state that affects policy rendering
// This includes spec, labels, and annotations. Used for cache invalidation.
// This ONLY includes spec, following KubeVela's standard revision hash approach.
// Labels and annotations are NOT included because:
// - Policies can modify labels/annotations, which would invalidate downstream caches
// - Only spec changes should trigger cache invalidation
// - Per-output-type refresh control handles metadata changes independently
func computeApplicationHash(app *v1beta1.Application) (string, error) {
// Build a structure with only the fields that affect policy rendering
hashInput := map[string]interface{}{
"spec": app.Spec,
"labels": app.Labels,
"annotations": app.Annotations,
}
// Marshal to JSON for consistent hashing
jsonBytes, err := json.Marshal(hashInput)
if err != nil {
return "", errors.Wrap(err, "failed to marshal Application for hashing")
}
// Compute SHA256 hash
hash := sha256.Sum256(jsonBytes)
return hex.EncodeToString(hash[:]), nil
// Use apply.ComputeSpecHash which is KubeVela's standard approach
// This only hashes the spec, not labels or annotations
return apply.ComputeSpecHash(&app.Spec)
}
// computeCascadeID computes a hash of all upstream policy outputs
@@ -1795,10 +1855,28 @@ func computeCascadeID(upstreamOutputs []*PolicyOutput) (string, error) {
return "", nil
}
// Marshal all upstream outputs to JSON for consistent hashing
jsonBytes, err := json.Marshal(upstreamOutputs)
// Extract ONLY spec fields from upstream outputs
// Labels, annotations, and ctx changes should NOT trigger cascade invalidation
// Only spec changes (components, workflow, policies) affect downstream policy behavior
type specOnly struct {
Components []common.ApplicationComponent `json:"components,omitempty"`
Workflow *v1beta1.Workflow `json:"workflow,omitempty"`
Policies []v1beta1.AppPolicy `json:"policies,omitempty"`
}
upstreamSpecs := make([]specOnly, 0, len(upstreamOutputs))
for _, output := range upstreamOutputs {
upstreamSpecs = append(upstreamSpecs, specOnly{
Components: output.Components,
Workflow: output.Workflow,
Policies: output.Policies,
})
}
// Hash only the spec fields
jsonBytes, err := json.Marshal(upstreamSpecs)
if err != nil {
return "", errors.Wrap(err, "failed to marshal upstream outputs for cascade ID")
return "", errors.Wrap(err, "failed to marshal upstream specs for cascade ID")
}
// Compute SHA256 hash