mirror of
https://github.com/kubevela/kubevela.git
synced 2026-05-19 15:56:54 +00:00
Checkpoint - working with caching and globals
This commit is contained in:
@@ -168,7 +168,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
|
||||
}
|
||||
|
||||
// Emit events for applied global policies (for observability)
|
||||
for _, appliedPolicy := range app.Status.AppliedGlobalPolicies {
|
||||
for _, appliedPolicy := range app.Status.AppliedApplicationPolicies {
|
||||
if appliedPolicy.Applied {
|
||||
r.Recorder.Event(app, event.Normal("GlobalPolicyApplied",
|
||||
fmt.Sprintf("Applied global policy %s from namespace %s", appliedPolicy.Name, appliedPolicy.Namespace)))
|
||||
|
||||
@@ -401,7 +401,7 @@ func (h *AppHandler) ApplyPolicies(ctx context.Context, af *appfile.Appfile) err
|
||||
}))
|
||||
defer subCtx.Commit("finish apply policies")
|
||||
}
|
||||
policyManifests, err := af.GeneratePolicyManifests(ctx)
|
||||
policyManifests, err := af.GeneratePolicyManifests(ctx, h.Client)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to render policy manifests")
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ var _ = Describe("Test Application workflow generator", func() {
|
||||
}
|
||||
af, err := appParser.GenerateAppFile(ctx, app)
|
||||
Expect(err).Should(BeNil())
|
||||
_, err = af.GeneratePolicyManifests(context.Background())
|
||||
_, err = af.GeneratePolicyManifests(context.Background(), k8sClient)
|
||||
Expect(err).Should(BeNil())
|
||||
|
||||
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||
@@ -153,7 +153,7 @@ var _ = Describe("Test Application workflow generator", func() {
|
||||
}
|
||||
af, err := appParser.GenerateAppFile(ctx, app)
|
||||
Expect(err).Should(BeNil())
|
||||
_, err = af.GeneratePolicyManifests(context.Background())
|
||||
_, err = af.GeneratePolicyManifests(context.Background(), k8sClient)
|
||||
Expect(err).Should(BeNil())
|
||||
|
||||
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||
|
||||
365
pkg/controller/core.oam.dev/v1beta1/application/policy_dryrun.go
Normal file
365
pkg/controller/core.oam.dev/v1beta1/application/policy_dryrun.go
Normal file
@@ -0,0 +1,365 @@
|
||||
/*
|
||||
Copyright 2021 The KubeVela Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
monitorContext "github.com/kubevela/pkg/monitor/context"
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||
"github.com/oam-dev/kubevela/pkg/features"
|
||||
"github.com/oam-dev/kubevela/pkg/oam"
|
||||
)
|
||||
|
||||
// PolicyDryRunMode defines how policies are discovered and applied in dry-run
|
||||
type PolicyDryRunMode string
|
||||
|
||||
const (
|
||||
// DryRunModeIsolated tests only specified policies
|
||||
DryRunModeIsolated PolicyDryRunMode = "isolated"
|
||||
// DryRunModeAdditive tests specified policies with existing globals
|
||||
DryRunModeAdditive PolicyDryRunMode = "additive"
|
||||
// DryRunModeFull simulates complete policy chain (globals + app policies)
|
||||
DryRunModeFull PolicyDryRunMode = "full"
|
||||
)
|
||||
|
||||
// PolicyDryRunOptions contains configuration for policy dry-run simulation
|
||||
type PolicyDryRunOptions struct {
|
||||
// Mode determines which policies are applied
|
||||
Mode PolicyDryRunMode
|
||||
// SpecifiedPolicies are the policy names to test (for isolated/additive modes)
|
||||
SpecifiedPolicies []string
|
||||
// IncludeAppPolicies includes policies from Application spec (for full mode)
|
||||
IncludeAppPolicies bool
|
||||
}
|
||||
|
||||
// PolicyDryRunResult contains the results of a policy dry-run simulation
|
||||
type PolicyDryRunResult struct {
|
||||
// Application is the final state after all policies applied
|
||||
Application *v1beta1.Application
|
||||
// ExecutionPlan shows which policies were discovered and in what order
|
||||
ExecutionPlan []PolicyExecutionStep
|
||||
// PolicyResults contains detailed results for each policy
|
||||
PolicyResults []PolicyApplicationResult
|
||||
// Diffs contains the JSON patches for each policy that modified the spec
|
||||
Diffs map[string][]byte
|
||||
// Warnings contains any warnings detected during simulation
|
||||
Warnings []string
|
||||
// Errors contains any errors encountered
|
||||
Errors []string
|
||||
}
|
||||
|
||||
// PolicyExecutionStep represents a policy in the execution plan
|
||||
type PolicyExecutionStep struct {
|
||||
Sequence int
|
||||
PolicyName string
|
||||
PolicyNamespace string
|
||||
Priority int32
|
||||
Source string // "global", "app-spec", or "specified"
|
||||
}
|
||||
|
||||
// PolicyApplicationResult contains the results of applying a single policy
|
||||
type PolicyApplicationResult struct {
|
||||
Sequence int
|
||||
PolicyName string
|
||||
PolicyNamespace string
|
||||
Priority int32
|
||||
Enabled bool
|
||||
Applied bool
|
||||
SpecModified bool
|
||||
AddedLabels map[string]string
|
||||
AddedAnnotations map[string]string
|
||||
AdditionalContext *runtime.RawExtension
|
||||
SkipReason string
|
||||
Error string
|
||||
}
|
||||
|
||||
// SimulatePolicyApplication performs a dry-run simulation of policy application
|
||||
// This function can be used by CLI tools to preview policy effects without persisting changes
|
||||
func SimulatePolicyApplication(ctx context.Context, cli client.Client, app *v1beta1.Application, opts PolicyDryRunOptions) (*PolicyDryRunResult, error) {
|
||||
// Create a deep copy of the application to avoid modifying the original
|
||||
appCopy := app.DeepCopy()
|
||||
|
||||
// Create a monitor context
|
||||
monCtx := monitorContext.NewTraceContext(ctx, "")
|
||||
|
||||
// Create AppHandler for policy operations
|
||||
handler := &AppHandler{
|
||||
Client: cli,
|
||||
app: appCopy,
|
||||
}
|
||||
|
||||
result := &PolicyDryRunResult{
|
||||
Application: appCopy,
|
||||
ExecutionPlan: []PolicyExecutionStep{},
|
||||
PolicyResults: []PolicyApplicationResult{},
|
||||
Diffs: make(map[string][]byte),
|
||||
Warnings: []string{},
|
||||
Errors: []string{},
|
||||
}
|
||||
|
||||
// Clear any existing policy status
|
||||
appCopy.Status.AppliedApplicationPolicies = nil
|
||||
|
||||
// Step 1: Build execution plan based on mode
|
||||
var policiesToApply []v1beta1.PolicyDefinition
|
||||
var sequence int = 1
|
||||
|
||||
switch opts.Mode {
|
||||
case DryRunModeIsolated:
|
||||
// Test only specified policies
|
||||
if len(opts.SpecifiedPolicies) == 0 {
|
||||
return nil, errors.New("isolated mode requires at least one policy to be specified")
|
||||
}
|
||||
for _, policyName := range opts.SpecifiedPolicies {
|
||||
// Try to load from vela-system first, then app namespace
|
||||
policy, err := loadPolicyDefinition(ctx, cli, policyName, oam.SystemDefinitionNamespace)
|
||||
if err != nil {
|
||||
// Try app namespace
|
||||
policy, err = loadPolicyDefinition(ctx, cli, policyName, appCopy.Namespace)
|
||||
if err != nil {
|
||||
result.Errors = append(result.Errors, fmt.Sprintf("Policy %s not found in vela-system or %s", policyName, appCopy.Namespace))
|
||||
continue
|
||||
}
|
||||
}
|
||||
policiesToApply = append(policiesToApply, *policy)
|
||||
|
||||
result.ExecutionPlan = append(result.ExecutionPlan, PolicyExecutionStep{
|
||||
Sequence: sequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Source: "specified",
|
||||
})
|
||||
sequence++
|
||||
}
|
||||
|
||||
case DryRunModeAdditive:
|
||||
// Include global policies + specified policies
|
||||
if !shouldSkipGlobalPolicies(appCopy) && utilfeature.DefaultMutableFeatureGate.Enabled(features.EnableGlobalPolicies) {
|
||||
// Discover global policies
|
||||
globalPolicies, err := discoverAndDeduplicateGlobalPolicies(monCtx, cli, appCopy.Namespace)
|
||||
if err != nil {
|
||||
result.Warnings = append(result.Warnings, fmt.Sprintf("Failed to discover global policies: %v", err))
|
||||
} else {
|
||||
for _, policy := range globalPolicies {
|
||||
policiesToApply = append(policiesToApply, policy)
|
||||
result.ExecutionPlan = append(result.ExecutionPlan, PolicyExecutionStep{
|
||||
Sequence: sequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Source: "global",
|
||||
})
|
||||
sequence++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add specified policies
|
||||
for _, policyName := range opts.SpecifiedPolicies {
|
||||
policy, err := loadPolicyDefinition(ctx, cli, policyName, oam.SystemDefinitionNamespace)
|
||||
if err != nil {
|
||||
policy, err = loadPolicyDefinition(ctx, cli, policyName, appCopy.Namespace)
|
||||
if err != nil {
|
||||
result.Errors = append(result.Errors, fmt.Sprintf("Policy %s not found", policyName))
|
||||
continue
|
||||
}
|
||||
}
|
||||
policiesToApply = append(policiesToApply, *policy)
|
||||
result.ExecutionPlan = append(result.ExecutionPlan, PolicyExecutionStep{
|
||||
Sequence: sequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Source: "specified",
|
||||
})
|
||||
sequence++
|
||||
}
|
||||
|
||||
case DryRunModeFull:
|
||||
// Full simulation: global + app policies
|
||||
if !shouldSkipGlobalPolicies(appCopy) && utilfeature.DefaultMutableFeatureGate.Enabled(features.EnableGlobalPolicies) {
|
||||
globalPolicies, err := discoverAndDeduplicateGlobalPolicies(monCtx, cli, appCopy.Namespace)
|
||||
if err != nil {
|
||||
result.Warnings = append(result.Warnings, fmt.Sprintf("Failed to discover global policies: %v", err))
|
||||
} else {
|
||||
for _, policy := range globalPolicies {
|
||||
policiesToApply = append(policiesToApply, policy)
|
||||
result.ExecutionPlan = append(result.ExecutionPlan, PolicyExecutionStep{
|
||||
Sequence: sequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Source: "global",
|
||||
})
|
||||
sequence++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add app spec policies if requested
|
||||
if opts.IncludeAppPolicies {
|
||||
for _, policyRef := range appCopy.Spec.Policies {
|
||||
policy, err := loadPolicyDefinition(ctx, cli, policyRef.Type, appCopy.Namespace)
|
||||
if err != nil {
|
||||
policy, err = loadPolicyDefinition(ctx, cli, policyRef.Type, oam.SystemDefinitionNamespace)
|
||||
if err != nil {
|
||||
result.Errors = append(result.Errors, fmt.Sprintf("Policy %s not found", policyRef.Type))
|
||||
continue
|
||||
}
|
||||
}
|
||||
policiesToApply = append(policiesToApply, *policy)
|
||||
result.ExecutionPlan = append(result.ExecutionPlan, PolicyExecutionStep{
|
||||
Sequence: sequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Source: "app-spec",
|
||||
})
|
||||
sequence++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Apply each policy and track results
|
||||
policySequence := 1
|
||||
for _, policy := range policiesToApply {
|
||||
// Take snapshot before applying policy (for future use in diff computation)
|
||||
_, err := deepCopyAppSpec(&appCopy.Spec)
|
||||
if err != nil {
|
||||
result.Warnings = append(result.Warnings, fmt.Sprintf("Failed to snapshot spec for policy %s: %v", policy.Name, err))
|
||||
}
|
||||
|
||||
// Render the policy
|
||||
policyRef := v1beta1.AppPolicy{
|
||||
Name: policy.Name,
|
||||
Type: policy.Name,
|
||||
}
|
||||
|
||||
renderedResult, err := handler.renderPolicy(monCtx, appCopy, policyRef, &policy)
|
||||
if err != nil {
|
||||
result.Errors = append(result.Errors, fmt.Sprintf("Policy %s render error: %v", policy.Name, err))
|
||||
result.PolicyResults = append(result.PolicyResults, PolicyApplicationResult{
|
||||
Sequence: policySequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Enabled: false,
|
||||
Applied: false,
|
||||
Error: err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Apply the rendered policy
|
||||
var policyChanges *PolicyChanges
|
||||
monCtx, policyChanges, err = handler.applyRenderedPolicyResult(monCtx, appCopy, renderedResult, policySequence, policy.Spec.Priority)
|
||||
|
||||
policyResult := PolicyApplicationResult{
|
||||
Sequence: policySequence,
|
||||
PolicyName: policy.Name,
|
||||
PolicyNamespace: policy.Namespace,
|
||||
Priority: policy.Spec.Priority,
|
||||
Enabled: renderedResult.Enabled,
|
||||
Applied: renderedResult.Enabled && err == nil,
|
||||
SkipReason: renderedResult.SkipReason,
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
policyResult.Error = err.Error()
|
||||
result.Errors = append(result.Errors, fmt.Sprintf("Policy %s application error: %v", policy.Name, err))
|
||||
} else if renderedResult.Enabled && policyChanges != nil {
|
||||
// Extract changes from policyChanges
|
||||
policyResult.SpecModified = policyChanges.SpecModified
|
||||
policyResult.AddedLabels = policyChanges.AddedLabels
|
||||
policyResult.AddedAnnotations = policyChanges.AddedAnnotations
|
||||
|
||||
// Convert additional context from map to RawExtension
|
||||
if policyChanges.AdditionalContext != nil {
|
||||
contextBytes, err := json.Marshal(policyChanges.AdditionalContext)
|
||||
if err == nil {
|
||||
policyResult.AdditionalContext = &runtime.RawExtension{Raw: contextBytes}
|
||||
}
|
||||
}
|
||||
|
||||
policySequence++
|
||||
}
|
||||
|
||||
result.PolicyResults = append(result.PolicyResults, policyResult)
|
||||
}
|
||||
|
||||
result.Application = appCopy
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// loadPolicyDefinition loads a PolicyDefinition from the cluster
|
||||
func loadPolicyDefinition(ctx context.Context, cli client.Client, name, namespace string) (*v1beta1.PolicyDefinition, error) {
|
||||
policy := &v1beta1.PolicyDefinition{}
|
||||
err := cli.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, policy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return policy, nil
|
||||
}
|
||||
|
||||
// discoverAndDeduplicateGlobalPolicies discovers global policies from both vela-system and app namespace
|
||||
// and returns the deduplicated list (namespace policies win over vela-system)
|
||||
func discoverAndDeduplicateGlobalPolicies(ctx monitorContext.Context, cli client.Client, appNamespace string) ([]v1beta1.PolicyDefinition, error) {
|
||||
var globalPolicies []v1beta1.PolicyDefinition
|
||||
|
||||
// Discover from vela-system
|
||||
velaSystemPolicies, err := discoverGlobalPolicies(ctx, cli, oam.SystemDefinitionNamespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Discover from app namespace (if different)
|
||||
var namespacePolicies []v1beta1.PolicyDefinition
|
||||
if appNamespace != oam.SystemDefinitionNamespace {
|
||||
namespacePolicies, err = discoverGlobalPolicies(ctx, cli, appNamespace)
|
||||
if err != nil {
|
||||
// Non-fatal, continue with vela-system policies only
|
||||
namespacePolicies = []v1beta1.PolicyDefinition{}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate: namespace policies win
|
||||
namespacePolicyNames := make(map[string]bool)
|
||||
for _, policy := range namespacePolicies {
|
||||
namespacePolicyNames[policy.Name] = true
|
||||
}
|
||||
|
||||
// Add namespace policies first
|
||||
globalPolicies = append(globalPolicies, namespacePolicies...)
|
||||
|
||||
// Add vela-system policies (skip if name exists in namespace)
|
||||
for _, policy := range velaSystemPolicies {
|
||||
if !namespacePolicyNames[policy.Name] {
|
||||
globalPolicies = append(globalPolicies, policy)
|
||||
}
|
||||
}
|
||||
|
||||
return globalPolicies, nil
|
||||
}
|
||||
@@ -18,13 +18,16 @@ package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cuelang.org/go/cue"
|
||||
"github.com/crossplane/crossplane-runtime/pkg/meta"
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/kubevela/pkg/cue/cuex"
|
||||
monitorContext "github.com/kubevela/pkg/monitor/context"
|
||||
@@ -50,13 +53,22 @@ const (
|
||||
|
||||
// ApplyApplicationScopeTransforms iterates through policies in the Application spec
|
||||
// and applies transforms from any Application-scoped PolicyDefinitions.
|
||||
//
|
||||
// Two-level caching strategy:
|
||||
// 1. In-memory global cache (globalPolicyCache) - Caches rendered policy results for rapid
|
||||
// reconciliations. Invalidated when Application or global policy set changes.
|
||||
// 2. ConfigMap persistent cache - Stores individual policy results with TTL control:
|
||||
// - TTL=-1: Never refresh (deterministic policies)
|
||||
// - TTL=0: Never cache (policies with external dependencies)
|
||||
// - TTL>0: Refresh after N seconds
|
||||
//
|
||||
// It first discovers and applies any global policies (if feature gate enabled),
|
||||
// then applies explicit policies from the Application spec.
|
||||
// This modifies the in-memory Application object before it's parsed into an AppFile.
|
||||
// Returns the updated context with any additionalContext from policies.
|
||||
func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context, app *v1beta1.Application) (monitorContext.Context, error) {
|
||||
// Clear previous global policy status
|
||||
app.Status.AppliedGlobalPolicies = nil
|
||||
app.Status.AppliedApplicationPolicies = nil
|
||||
|
||||
// Step 1: Validate explicit policies are not global
|
||||
for _, policy := range app.Spec.Policies {
|
||||
@@ -67,8 +79,9 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
|
||||
|
||||
// Step 2: Handle global policies (if feature gate enabled and not opted out)
|
||||
var globalRenderedResults []RenderedPolicyResult
|
||||
allDiffs := make(map[string][]byte) // Track spec diffs: policy-name -> JSON patch
|
||||
sequence := 1 // Track execution order
|
||||
allPolicyChanges := make(map[string]*PolicyChanges) // Track full changes for ConfigMap storage
|
||||
policyMetadata := make(map[string]*policyConfigMapMetadata) // Track metadata for ConfigMap
|
||||
sequence := 1 // Track execution order
|
||||
|
||||
if !shouldSkipGlobalPolicies(app) && utilfeature.DefaultMutableFeatureGate.Enabled(features.EnableGlobalPolicies) {
|
||||
// Compute current global policy hash for cache validation
|
||||
@@ -158,20 +171,25 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
|
||||
// Get priority from the result (stored during render)
|
||||
priority := result.Priority
|
||||
|
||||
var diffBytes []byte
|
||||
ctx, diffBytes, err = h.applyRenderedPolicyResult(ctx, app, result, sequence, priority)
|
||||
var policyChanges *PolicyChanges
|
||||
ctx, policyChanges, err = h.applyRenderedPolicyResult(ctx, app, result, sequence, priority)
|
||||
if err != nil {
|
||||
return ctx, errors.Wrapf(err, "failed to apply global policy %s", result.PolicyName)
|
||||
}
|
||||
|
||||
// Store diff if policy modified spec
|
||||
if diffBytes != nil && len(diffBytes) > 0 {
|
||||
allDiffs[result.PolicyName] = diffBytes
|
||||
// Store changes and metadata for ConfigMap storage
|
||||
if policyChanges != nil {
|
||||
allPolicyChanges[result.PolicyName] = policyChanges
|
||||
}
|
||||
|
||||
// Increment sequence only if policy was applied (enabled=true)
|
||||
if result.Enabled {
|
||||
sequence++
|
||||
policyMetadata[result.PolicyName] = &policyConfigMapMetadata{
|
||||
Name: result.PolicyName,
|
||||
Namespace: result.PolicyNamespace,
|
||||
Source: "global",
|
||||
Sequence: sequence,
|
||||
Priority: priority,
|
||||
}
|
||||
sequence++ // Increment sequence only if policy was applied (enabled=true)
|
||||
}
|
||||
}
|
||||
} else if shouldSkipGlobalPolicies(app) {
|
||||
@@ -198,33 +216,110 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
|
||||
ctx.Info("Applying explicit Application-scoped policy", "policy", policy.Type, "name", policy.Name)
|
||||
|
||||
// Render and apply (not cached - explicit policies can have unique parameters)
|
||||
ctx, err = h.applyPolicyTransform(ctx, app, policy, templ.PolicyDefinition)
|
||||
var changes *PolicyChanges
|
||||
ctx, changes, err = h.applyPolicyTransform(ctx, app, policy, templ.PolicyDefinition)
|
||||
if err != nil {
|
||||
// Record failure in status
|
||||
recordApplicationPolicyStatus(app, policy.Name, templ.PolicyDefinition.Namespace, "explicit", sequence, 0, false, fmt.Sprintf("error: %s", err.Error()), nil)
|
||||
return ctx, errors.Wrapf(err, "failed to apply transform from policy %s", policy.Type)
|
||||
}
|
||||
|
||||
// Record successful application in status
|
||||
if changes != nil {
|
||||
// Determine if spec was modified by checking if transforms.Spec exists
|
||||
changes.SpecModified = changes.Transforms != nil && changes.Transforms.Spec != nil
|
||||
allPolicyChanges[policy.Name] = changes // Store for ConfigMap serialization
|
||||
}
|
||||
recordApplicationPolicyStatus(app, policy.Name, templ.PolicyDefinition.Namespace, "explicit", sequence, 0, true, "", changes)
|
||||
|
||||
// Store metadata for ConfigMap
|
||||
policyMetadata[policy.Name] = &policyConfigMapMetadata{
|
||||
Name: policy.Name,
|
||||
Namespace: templ.PolicyDefinition.Namespace,
|
||||
Source: "explicit",
|
||||
Sequence: sequence,
|
||||
Priority: 0, // Explicit policies don't have priority
|
||||
}
|
||||
sequence++
|
||||
}
|
||||
|
||||
// Step 4: Store all spec diffs in a single ConfigMap (if any policies modified spec)
|
||||
if len(allDiffs) > 0 {
|
||||
// Build ConfigMap data with sequence-prefixed keys
|
||||
orderedData := make(map[string]string)
|
||||
for _, appliedPolicy := range app.Status.AppliedGlobalPolicies {
|
||||
if diff, ok := allDiffs[appliedPolicy.Name]; ok {
|
||||
key := fmt.Sprintf("%03d-%s", appliedPolicy.Sequence, appliedPolicy.Name)
|
||||
orderedData[key] = string(diff)
|
||||
// Step 4: Store all rendered policy outputs in ConfigMap for reuse and observability
|
||||
// This creates a persistent cache with TTL that can be used to avoid re-rendering
|
||||
orderedData := make(map[string]string)
|
||||
|
||||
// Compute hash of Application state (spec + metadata) for cache invalidation
|
||||
appHash, err := computeApplicationHash(app)
|
||||
if err != nil {
|
||||
ctx.Info("Failed to compute Application hash", "error", err)
|
||||
appHash = "" // Continue without hash
|
||||
}
|
||||
|
||||
// Build ConfigMap data from metadata and changes tracked during reconciliation
|
||||
for policyName, metadata := range policyMetadata {
|
||||
// Get TTL from PolicyDefinition (default -1 = never refresh)
|
||||
ttlSeconds := int32(-1)
|
||||
policyDef := &v1beta1.PolicyDefinition{}
|
||||
if err := h.Client.Get(ctx, client.ObjectKey{Name: metadata.Name, Namespace: metadata.Namespace}, policyDef); err == nil {
|
||||
ttlSeconds = policyDef.Spec.CacheTTLSeconds
|
||||
}
|
||||
|
||||
// Build the rendered policy record with everything needed to reapply it
|
||||
policyRecord := map[string]interface{}{
|
||||
"policy": metadata.Name,
|
||||
"namespace": metadata.Namespace,
|
||||
"source": metadata.Source,
|
||||
"sequence": metadata.Sequence,
|
||||
"priority": metadata.Priority,
|
||||
"rendered_at": time.Now().Format(time.RFC3339),
|
||||
"ttl_seconds": ttlSeconds,
|
||||
"enabled": true,
|
||||
"application_hash": appHash, // Hash of Application for cache invalidation
|
||||
}
|
||||
|
||||
// Get the full policy changes if available (includes transforms, labels, annotations, context)
|
||||
if policyChanges, ok := allPolicyChanges[policyName]; ok && policyChanges != nil {
|
||||
// Store the full transforms object in reusable format
|
||||
if policyChanges.Transforms != nil {
|
||||
transformsData := serializeTransformsForStorage(policyChanges.Transforms)
|
||||
if len(transformsData) > 0 {
|
||||
policyRecord["transforms"] = transformsData
|
||||
}
|
||||
}
|
||||
|
||||
// Add additional context if available
|
||||
if policyChanges.AdditionalContext != nil && len(policyChanges.AdditionalContext) > 0 {
|
||||
policyRecord["additional_context"] = policyChanges.AdditionalContext
|
||||
}
|
||||
|
||||
// Add observability summary
|
||||
policyRecord["summary"] = map[string]interface{}{
|
||||
"labels_added": len(policyChanges.AddedLabels),
|
||||
"annotations_added": len(policyChanges.AddedAnnotations),
|
||||
"spec_modified": policyChanges.SpecModified,
|
||||
"has_context": len(policyChanges.AdditionalContext) > 0,
|
||||
}
|
||||
}
|
||||
|
||||
// Create/update ConfigMap
|
||||
if len(orderedData) > 0 {
|
||||
err := createOrUpdateDiffsConfigMap(ctx, h.Client, app, orderedData)
|
||||
if err != nil {
|
||||
ctx.Info("Failed to store policy diffs in ConfigMap", "error", err)
|
||||
// Don't fail reconciliation - observability is optional
|
||||
} else {
|
||||
app.Status.PolicyDiffsConfigMap = fmt.Sprintf("%s-policy-diffs", app.Name)
|
||||
ctx.Info("Stored policy diffs in ConfigMap", "configmap", app.Status.PolicyDiffsConfigMap, "count", len(orderedData))
|
||||
}
|
||||
// Marshal to pretty JSON for human readability and tool consumption
|
||||
policyJSON, err := json.MarshalIndent(policyRecord, "", " ")
|
||||
if err != nil {
|
||||
ctx.Info("Failed to marshal policy record", "policy", metadata.Name, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%03d-%s", metadata.Sequence, metadata.Name)
|
||||
orderedData[key] = string(policyJSON)
|
||||
}
|
||||
|
||||
// Create/update ConfigMap if any policies were applied
|
||||
if len(orderedData) > 0 {
|
||||
err := createOrUpdateDiffsConfigMap(ctx, h.Client, app, orderedData)
|
||||
if err != nil {
|
||||
ctx.Info("Failed to store policy records in ConfigMap", "error", err)
|
||||
// Don't fail reconciliation - observability/caching is optional
|
||||
} else {
|
||||
app.Status.ApplicationPoliciesConfigMap = fmt.Sprintf("application-policies-%s-%s", app.Namespace, app.Name)
|
||||
ctx.Info("Stored policy records in ConfigMap", "configmap", app.Status.ApplicationPoliciesConfigMap, "policies", len(orderedData))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,30 +328,37 @@ func (h *AppHandler) ApplyApplicationScopeTransforms(ctx monitorContext.Context,
|
||||
|
||||
// applyPolicyTransform renders the policy's CUE template and applies transforms to the Application.
|
||||
// Returns the updated context with any additionalContext merged in.
|
||||
func (h *AppHandler) applyPolicyTransform(ctx monitorContext.Context, app *v1beta1.Application, policyRef v1beta1.AppPolicy, policyDef *v1beta1.PolicyDefinition) (monitorContext.Context, error) {
|
||||
func (h *AppHandler) applyPolicyTransform(ctx monitorContext.Context, app *v1beta1.Application, policyRef v1beta1.AppPolicy, policyDef *v1beta1.PolicyDefinition) (monitorContext.Context, *PolicyChanges, error) {
|
||||
// Validate policy has CUE schematic
|
||||
if policyDef.Spec.Schematic == nil || policyDef.Spec.Schematic.CUE == nil {
|
||||
return ctx, errors.Errorf("Application-scoped policy %s must have a CUE schematic", policyDef.Name)
|
||||
return ctx, nil, errors.Errorf("Application-scoped policy %s must have a CUE schematic", policyDef.Name)
|
||||
}
|
||||
|
||||
// Parse policy parameters
|
||||
var policyParams map[string]interface{}
|
||||
if policyRef.Properties != nil && len(policyRef.Properties.Raw) > 0 {
|
||||
if err := json.Unmarshal(policyRef.Properties.Raw, &policyParams); err != nil {
|
||||
return ctx, errors.Wrap(err, "failed to unmarshal policy parameters")
|
||||
return ctx, nil, errors.Wrap(err, "failed to unmarshal policy parameters")
|
||||
}
|
||||
}
|
||||
|
||||
// Render the CUE template with context.application
|
||||
rendered, err := h.renderPolicyCUETemplate(ctx, app, policyParams, policyDef)
|
||||
// Load prior cached result (if any) to pass as context.prior
|
||||
// This allows the policy template to access previous rendered values
|
||||
var priorResult map[string]interface{}
|
||||
if app.Status.ApplicationPoliciesConfigMap != "" {
|
||||
priorResult, _ = loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1) // Always load regardless of TTL
|
||||
}
|
||||
|
||||
// Render the CUE template with context.application and context.prior
|
||||
rendered, err := h.renderPolicyCUETemplate(ctx, app, policyParams, policyDef, priorResult)
|
||||
if err != nil {
|
||||
return ctx, errors.Wrap(err, "failed to render CUE template")
|
||||
return ctx, nil, errors.Wrap(err, "failed to render CUE template")
|
||||
}
|
||||
|
||||
// Check if the transform should be applied (default: true)
|
||||
shouldApply, err := h.extractEnabled(rendered)
|
||||
if err != nil {
|
||||
return ctx, errors.Wrap(err, "failed to extract enabled")
|
||||
return ctx, nil, errors.Wrap(err, "failed to extract enabled")
|
||||
}
|
||||
|
||||
if !shouldApply {
|
||||
@@ -265,38 +367,84 @@ func (h *AppHandler) applyPolicyTransform(ctx monitorContext.Context, app *v1bet
|
||||
// Note: This should not happen for explicit policies as we validate earlier
|
||||
// that global policies cannot be explicitly referenced
|
||||
if policyDef.Spec.Global {
|
||||
recordGlobalPolicyStatus(app, policyRef.Name, policyDef.Namespace, 0, policyDef.Spec.Priority, false, "enabled=false", nil)
|
||||
recordApplicationPolicyStatus(app, policyRef.Name, policyDef.Namespace, "global", 0, policyDef.Spec.Priority, false, "enabled=false", nil)
|
||||
}
|
||||
return ctx, nil
|
||||
return ctx, nil, nil
|
||||
}
|
||||
|
||||
// Extract transforms field
|
||||
transforms, err := h.extractTransforms(rendered)
|
||||
if err != nil {
|
||||
return ctx, errors.Wrap(err, "failed to extract transforms")
|
||||
return ctx, nil, errors.Wrap(err, "failed to extract transforms")
|
||||
}
|
||||
|
||||
// Track changes from before transform application
|
||||
changes := &PolicyChanges{
|
||||
Enabled: true, // We already checked it's enabled above
|
||||
Transforms: transforms,
|
||||
}
|
||||
|
||||
// Take snapshot of labels and annotations BEFORE applying transform
|
||||
labelsBefore := make(map[string]string)
|
||||
if app.Labels != nil {
|
||||
for k, v := range app.Labels {
|
||||
labelsBefore[k] = v
|
||||
}
|
||||
}
|
||||
annotationsBefore := make(map[string]string)
|
||||
if app.Annotations != nil {
|
||||
for k, v := range app.Annotations {
|
||||
annotationsBefore[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Apply transforms to the in-memory Application
|
||||
if transforms != nil {
|
||||
if err := h.applyTransformsToApplication(ctx, app, transforms); err != nil {
|
||||
return ctx, errors.Wrap(err, "failed to apply transforms")
|
||||
return ctx, nil, errors.Wrap(err, "failed to apply transforms")
|
||||
}
|
||||
ctx.Info("Applied transforms to Application", "policy", policyRef.Type)
|
||||
}
|
||||
|
||||
// Compare AFTER to capture actual changes (works regardless of how CUE modifies the Application)
|
||||
labelsAdded := make(map[string]string)
|
||||
if app.Labels != nil {
|
||||
for k, v := range app.Labels {
|
||||
if labelsBefore[k] != v {
|
||||
labelsAdded[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(labelsAdded) > 0 {
|
||||
changes.AddedLabels = labelsAdded
|
||||
}
|
||||
|
||||
annotationsAdded := make(map[string]string)
|
||||
if app.Annotations != nil {
|
||||
for k, v := range app.Annotations {
|
||||
if annotationsBefore[k] != v {
|
||||
annotationsAdded[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(annotationsAdded) > 0 {
|
||||
changes.AddedAnnotations = annotationsAdded
|
||||
}
|
||||
|
||||
// Extract and store additionalContext
|
||||
additionalContext, err := h.extractAdditionalContext(rendered)
|
||||
if err != nil {
|
||||
return ctx, errors.Wrap(err, "failed to extract additionalContext")
|
||||
return ctx, nil, errors.Wrap(err, "failed to extract additionalContext")
|
||||
}
|
||||
|
||||
if additionalContext != nil {
|
||||
ctx = storeAdditionalContextInCtx(ctx, additionalContext)
|
||||
ctx.Info("Stored additionalContext in context", "policy", policyRef.Type, "keys", len(additionalContext))
|
||||
changes.AdditionalContext = additionalContext
|
||||
}
|
||||
|
||||
ctx.Info("Successfully applied transform", "policy", policyRef.Type)
|
||||
return ctx, nil
|
||||
return ctx, changes, nil
|
||||
}
|
||||
|
||||
// renderPolicy renders a policy's CUE template and extracts the results for caching
|
||||
@@ -309,6 +457,33 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
|
||||
Enabled: false,
|
||||
}
|
||||
|
||||
// Check if we have a valid cached result based on TTL
|
||||
ttlSeconds := policyDef.Spec.CacheTTLSeconds
|
||||
cachedRecord, err := loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, ttlSeconds)
|
||||
if err != nil {
|
||||
ctx.Info("Failed to load cached policy from ConfigMap", "policy", policyDef.Name, "error", err)
|
||||
// Continue with rendering
|
||||
} else if cachedRecord != nil {
|
||||
ctx.Info("Using cached policy result", "policy", policyDef.Name, "ttl", ttlSeconds)
|
||||
|
||||
// Deserialize the cached result
|
||||
if transformsData, ok := cachedRecord["transforms"].(map[string]interface{}); ok {
|
||||
result.Transforms = deserializeTransformsFromStorage(transformsData)
|
||||
}
|
||||
|
||||
if additionalContext, ok := cachedRecord["additional_context"].(map[string]interface{}); ok {
|
||||
result.AdditionalContext = additionalContext
|
||||
}
|
||||
|
||||
if enabled, ok := cachedRecord["enabled"].(bool); ok {
|
||||
result.Enabled = enabled
|
||||
} else {
|
||||
result.Enabled = true // Default
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Validate policy has CUE schematic
|
||||
if policyDef.Spec.Schematic == nil || policyDef.Spec.Schematic.CUE == nil {
|
||||
result.SkipReason = "no CUE schematic"
|
||||
@@ -324,8 +499,15 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
|
||||
}
|
||||
}
|
||||
|
||||
// Render the CUE template with context.application
|
||||
rendered, err := h.renderPolicyCUETemplate(ctx, app, policyParams, policyDef)
|
||||
// Load prior cached result (if any) to pass as context.prior
|
||||
// Even if we're re-rendering (cache expired), pass the prior result to the template
|
||||
var priorResult map[string]interface{}
|
||||
if app.Status.ApplicationPoliciesConfigMap != "" {
|
||||
priorResult, _ = loadCachedPolicyFromConfigMap(ctx, h.Client, app, policyDef.Name, -1) // Always load regardless of TTL
|
||||
}
|
||||
|
||||
// Render the CUE template with context.application and context.prior
|
||||
rendered, err := h.renderPolicyCUETemplate(ctx, app, policyParams, policyDef, priorResult)
|
||||
if err != nil {
|
||||
result.SkipReason = fmt.Sprintf("CUE render error: %s", err.Error())
|
||||
return result, errors.Wrap(err, "failed to render CUE template")
|
||||
@@ -365,79 +547,67 @@ func (h *AppHandler) renderPolicy(ctx monitorContext.Context, app *v1beta1.Appli
|
||||
|
||||
// applyRenderedPolicyResult applies a cached/rendered policy result to the Application
|
||||
// This skips all the expensive CUE rendering and just applies the pre-computed transforms
|
||||
// Returns the diff bytes if spec was modified
|
||||
func (h *AppHandler) applyRenderedPolicyResult(ctx monitorContext.Context, app *v1beta1.Application, result RenderedPolicyResult, sequence int, priority int32) (monitorContext.Context, []byte, error) {
|
||||
// Returns the updated context and the PolicyChanges
|
||||
func (h *AppHandler) applyRenderedPolicyResult(ctx monitorContext.Context, app *v1beta1.Application, result RenderedPolicyResult, sequence int, priority int32) (monitorContext.Context, *PolicyChanges, error) {
|
||||
if !result.Enabled {
|
||||
ctx.Info("Skipping policy (from cache)", "policy", result.PolicyName, "reason", result.SkipReason)
|
||||
recordGlobalPolicyStatus(app, result.PolicyName, result.PolicyNamespace, sequence, priority, false, result.SkipReason, nil)
|
||||
recordApplicationPolicyStatus(app, result.PolicyName, result.PolicyNamespace, "global", sequence, priority, false, result.SkipReason, nil)
|
||||
return ctx, nil, nil
|
||||
}
|
||||
|
||||
// Cast transforms from interface{}
|
||||
transforms, ok := result.Transforms.(*PolicyTransforms)
|
||||
if !ok && result.Transforms != nil {
|
||||
return ctx, nil, errors.Errorf("cached transforms have invalid type for policy %s", result.PolicyName)
|
||||
}
|
||||
|
||||
// Track what changes we're making
|
||||
changes := &PolicyChanges{
|
||||
AdditionalContext: result.AdditionalContext,
|
||||
Enabled: result.Enabled,
|
||||
Transforms: transforms,
|
||||
}
|
||||
|
||||
// Take snapshot of spec BEFORE applying transforms (for diff tracking)
|
||||
specBefore, err := deepCopyAppSpec(&app.Spec)
|
||||
if err != nil {
|
||||
ctx.Info("Failed to copy spec for diff tracking", "error", err)
|
||||
specBefore = nil // Continue without diff tracking
|
||||
}
|
||||
|
||||
var diffBytes []byte
|
||||
|
||||
// Apply transforms to the in-memory Application
|
||||
if result.Transforms != nil {
|
||||
// Cast from interface{} back to *PolicyTransforms
|
||||
transforms, ok := result.Transforms.(*PolicyTransforms)
|
||||
if !ok {
|
||||
return ctx, nil, errors.Errorf("cached transforms have invalid type for policy %s", result.PolicyName)
|
||||
}
|
||||
|
||||
// Capture what labels/annotations/spec will be changed
|
||||
if transforms.Labels != nil && transforms.Labels.Value != nil {
|
||||
if labelMap, ok := transforms.Labels.Value.(map[string]interface{}); ok {
|
||||
changes.AddedLabels = make(map[string]string)
|
||||
for k, v := range labelMap {
|
||||
if str, ok := v.(string); ok {
|
||||
changes.AddedLabels[k] = str
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if transforms.Annotations != nil && transforms.Annotations.Value != nil {
|
||||
if annotationMap, ok := transforms.Annotations.Value.(map[string]interface{}); ok {
|
||||
changes.AddedAnnotations = make(map[string]string)
|
||||
for k, v := range annotationMap {
|
||||
if str, ok := v.(string); ok {
|
||||
changes.AddedAnnotations[k] = str
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if transforms.Spec != nil {
|
||||
changes.SpecModified = true
|
||||
}
|
||||
|
||||
if transforms != nil {
|
||||
if err := h.applyTransformsToApplication(ctx, app, transforms); err != nil {
|
||||
return ctx, nil, errors.Wrap(err, "failed to apply transforms")
|
||||
}
|
||||
ctx.Info("Applied cached transforms to Application", "policy", result.PolicyName)
|
||||
|
||||
// Compute diff if spec was modified
|
||||
if specBefore != nil && changes.SpecModified {
|
||||
// Check if spec actually changed
|
||||
if !reflect.DeepEqual(specBefore, &app.Spec) {
|
||||
diff, diffErr := computeJSONPatch(specBefore, &app.Spec)
|
||||
if diffErr != nil {
|
||||
ctx.Info("Failed to compute diff", "policy", result.PolicyName, "error", diffErr)
|
||||
} else {
|
||||
diffBytes = diff
|
||||
ctx.Info("Computed spec diff", "policy", result.PolicyName, "size", len(diff))
|
||||
// Extract changes directly from transforms
|
||||
if transforms.Labels != nil {
|
||||
if labelsMap, ok := transforms.Labels.Value.(map[string]string); ok {
|
||||
changes.AddedLabels = labelsMap
|
||||
} else if labelsMap, ok := transforms.Labels.Value.(map[string]interface{}); ok {
|
||||
// Convert from interface{} map
|
||||
stringMap := make(map[string]string)
|
||||
for k, v := range labelsMap {
|
||||
if strVal, ok := v.(string); ok {
|
||||
stringMap[k] = strVal
|
||||
}
|
||||
}
|
||||
changes.AddedLabels = stringMap
|
||||
}
|
||||
}
|
||||
|
||||
if transforms.Annotations != nil {
|
||||
if annotationsMap, ok := transforms.Annotations.Value.(map[string]string); ok {
|
||||
changes.AddedAnnotations = annotationsMap
|
||||
} else if annotationsMap, ok := transforms.Annotations.Value.(map[string]interface{}); ok {
|
||||
// Convert from interface{} map
|
||||
stringMap := make(map[string]string)
|
||||
for k, v := range annotationsMap {
|
||||
if strVal, ok := v.(string); ok {
|
||||
stringMap[k] = strVal
|
||||
}
|
||||
}
|
||||
changes.AddedAnnotations = stringMap
|
||||
}
|
||||
}
|
||||
|
||||
// Check if spec was modified
|
||||
changes.SpecModified = transforms.Spec != nil
|
||||
}
|
||||
|
||||
// Store additionalContext in context
|
||||
@@ -446,47 +616,26 @@ func (h *AppHandler) applyRenderedPolicyResult(ctx monitorContext.Context, app *
|
||||
ctx.Info("Stored cached additionalContext in context", "policy", result.PolicyName, "keys", len(result.AdditionalContext))
|
||||
}
|
||||
|
||||
recordGlobalPolicyStatus(app, result.PolicyName, result.PolicyNamespace, sequence, priority, true, "", changes)
|
||||
recordApplicationPolicyStatus(app, result.PolicyName, result.PolicyNamespace, "global", sequence, priority, true, "", changes)
|
||||
ctx.Info("Successfully applied cached policy result", "policy", result.PolicyName)
|
||||
return ctx, diffBytes, nil
|
||||
return ctx, changes, nil
|
||||
}
|
||||
|
||||
// policyTransformSchema provides type-safe schema for Application-scoped policy transforms
|
||||
const policyTransformSchema = `
|
||||
parameter: {[string]: _}
|
||||
enabled: *true | bool
|
||||
transforms?: {
|
||||
spec?: {
|
||||
type: "replace" | "merge"
|
||||
value: {...}
|
||||
}
|
||||
labels?: {
|
||||
type: "merge"
|
||||
value: {[string]: string}
|
||||
}
|
||||
annotations?: {
|
||||
type: "merge"
|
||||
value: {[string]: string}
|
||||
}
|
||||
}
|
||||
additionalContext?: {...}
|
||||
context: {
|
||||
application: {...}
|
||||
}
|
||||
`
|
||||
|
||||
// renderPolicyCUETemplate renders the policy CUE template with parameter and context.application
|
||||
func (h *AppHandler) renderPolicyCUETemplate(ctx monitorContext.Context, app *v1beta1.Application, params map[string]interface{}, policyDef *v1beta1.PolicyDefinition) (cue.Value, error) {
|
||||
// Build CUE source with parameter and context
|
||||
// Follows the same pattern as workloadDef.Complete() and traitDef.Complete() to properly handle import statements
|
||||
func (h *AppHandler) renderPolicyCUETemplate(ctx monitorContext.Context, app *v1beta1.Application, params map[string]interface{}, policyDef *v1beta1.PolicyDefinition, priorResult map[string]interface{}) (cue.Value, error) {
|
||||
// Build CUE source following the pattern from pkg/cue/definition/template.go
|
||||
// Order matters: template (with imports) must come first, then type annotations, then values
|
||||
var cueSources []string
|
||||
|
||||
// Add type safety schema
|
||||
cueSources = append(cueSources, policyTransformSchema)
|
||||
|
||||
// Add the policy template
|
||||
// 1. Add the policy template FIRST (preserves any import statements at the top)
|
||||
cueSources = append(cueSources, policyDef.Spec.Schematic.CUE.Template)
|
||||
|
||||
// Add parameter
|
||||
// 2. Add type annotations (following renderTemplate() pattern from template.go:489)
|
||||
cueSources = append(cueSources, "parameter: _")
|
||||
cueSources = append(cueSources, "context: _")
|
||||
|
||||
// 3. Add parameter values
|
||||
if params != nil {
|
||||
paramJSON, err := json.Marshal(params)
|
||||
if err != nil {
|
||||
@@ -497,13 +646,22 @@ func (h *AppHandler) renderPolicyCUETemplate(ctx monitorContext.Context, app *v1
|
||||
cueSources = append(cueSources, "parameter: {}")
|
||||
}
|
||||
|
||||
// Add context.application (convert Application to JSON)
|
||||
// 4. Add context.application (convert Application to JSON)
|
||||
appJSON, err := json.Marshal(app)
|
||||
if err != nil {
|
||||
return cue.Value{}, errors.Wrap(err, "failed to marshal Application")
|
||||
}
|
||||
cueSources = append(cueSources, fmt.Sprintf("context: application: %s", string(appJSON)))
|
||||
|
||||
// 5. Add context.prior if available (previous cached policy result)
|
||||
if priorResult != nil {
|
||||
priorJSON, err := json.Marshal(priorResult)
|
||||
if err != nil {
|
||||
return cue.Value{}, errors.Wrap(err, "failed to marshal prior result")
|
||||
}
|
||||
cueSources = append(cueSources, fmt.Sprintf("context: prior: %s", string(priorJSON)))
|
||||
}
|
||||
|
||||
// Compile the CUE using the default CueX compiler
|
||||
cueSource := strings.Join(cueSources, "\n")
|
||||
val, err := cuex.DefaultCompiler.Get().CompileString(ctx.GetContext(), cueSource)
|
||||
@@ -832,26 +990,26 @@ func validateNotGlobalPolicy(ctx monitorContext.Context, cli client.Client, poli
|
||||
return nil
|
||||
}
|
||||
|
||||
// recordGlobalPolicyStatus records the application status of a global policy
|
||||
func recordGlobalPolicyStatus(app *v1beta1.Application, policyName, policyNamespace string, sequence int, priority int32, applied bool, reason string, changes *PolicyChanges) {
|
||||
entry := common.AppliedGlobalPolicy{
|
||||
// recordApplicationPolicyStatus records the application status of an Application-scoped policy
|
||||
// (global or explicit)
|
||||
func recordApplicationPolicyStatus(app *v1beta1.Application, policyName, policyNamespace, source string, sequence int, priority int32, applied bool, reason string, changes *PolicyChanges) {
|
||||
entry := common.AppliedApplicationPolicy{
|
||||
Name: policyName,
|
||||
Namespace: policyNamespace,
|
||||
Applied: applied,
|
||||
Reason: reason,
|
||||
Sequence: sequence,
|
||||
Priority: priority,
|
||||
}
|
||||
|
||||
// Record what was changed (if policy was applied)
|
||||
// Record summary counts of what was changed (if policy was applied)
|
||||
// Full details are stored in the ApplicationPoliciesConfigMap
|
||||
if applied && changes != nil {
|
||||
entry.AddedLabels = changes.AddedLabels
|
||||
entry.AddedAnnotations = changes.AddedAnnotations
|
||||
entry.AdditionalContext = changes.AdditionalContext
|
||||
entry.SpecModified = changes.SpecModified
|
||||
entry.LabelsCount = len(changes.AddedLabels)
|
||||
entry.AnnotationsCount = len(changes.AddedAnnotations)
|
||||
entry.HasContext = changes.AdditionalContext != nil && len(changes.AdditionalContext) > 0
|
||||
}
|
||||
|
||||
app.Status.AppliedGlobalPolicies = append(app.Status.AppliedGlobalPolicies, entry)
|
||||
app.Status.AppliedApplicationPolicies = append(app.Status.AppliedApplicationPolicies, entry)
|
||||
}
|
||||
|
||||
// PolicyChanges tracks what a policy modified
|
||||
@@ -860,6 +1018,188 @@ type PolicyChanges struct {
|
||||
AddedAnnotations map[string]string
|
||||
AdditionalContext map[string]interface{}
|
||||
SpecModified bool
|
||||
|
||||
// Full rendered output for caching/reuse
|
||||
Enabled bool
|
||||
Transforms *PolicyTransforms
|
||||
}
|
||||
|
||||
// policyConfigMapMetadata tracks metadata needed for ConfigMap storage
|
||||
type policyConfigMapMetadata struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Source string // "global" or "explicit"
|
||||
Sequence int
|
||||
Priority int32
|
||||
}
|
||||
|
||||
// serializeTransformsForStorage converts PolicyTransforms to a format suitable for storage and reuse
|
||||
func serializeTransformsForStorage(transforms *PolicyTransforms) map[string]interface{} {
|
||||
if transforms == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make(map[string]interface{})
|
||||
|
||||
if transforms.Labels != nil {
|
||||
result["labels"] = map[string]interface{}{
|
||||
"type": transforms.Labels.Type,
|
||||
"value": transforms.Labels.Value,
|
||||
}
|
||||
}
|
||||
|
||||
if transforms.Annotations != nil {
|
||||
result["annotations"] = map[string]interface{}{
|
||||
"type": transforms.Annotations.Type,
|
||||
"value": transforms.Annotations.Value,
|
||||
}
|
||||
}
|
||||
|
||||
if transforms.Spec != nil {
|
||||
result["spec"] = map[string]interface{}{
|
||||
"type": transforms.Spec.Type,
|
||||
"value": transforms.Spec.Value,
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// loadCachedPolicyFromConfigMap attempts to load a cached policy result from the ConfigMap
|
||||
// Returns the cached result if found and valid according to TTL and Application state, nil otherwise
|
||||
func loadCachedPolicyFromConfigMap(ctx context.Context, cli client.Client, app *v1beta1.Application, policyName string, ttlSeconds int32) (map[string]interface{}, error) {
|
||||
if app.Status.ApplicationPoliciesConfigMap == "" {
|
||||
return nil, nil // No ConfigMap exists yet
|
||||
}
|
||||
|
||||
// Get the ConfigMap
|
||||
cm := &corev1.ConfigMap{}
|
||||
cmName := app.Status.ApplicationPoliciesConfigMap
|
||||
if err := cli.Get(ctx, client.ObjectKey{Name: cmName, Namespace: app.Namespace}, cm); err != nil {
|
||||
if client.IgnoreNotFound(err) == nil {
|
||||
return nil, nil // ConfigMap doesn't exist
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Find the entry for this policy
|
||||
var cachedData string
|
||||
for key, value := range cm.Data {
|
||||
// Keys are formatted as "001-policy-name"
|
||||
if strings.HasSuffix(key, "-"+policyName) {
|
||||
cachedData = value
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if cachedData == "" {
|
||||
return nil, nil // Policy not in ConfigMap
|
||||
}
|
||||
|
||||
// Parse the cached record
|
||||
var record map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(cachedData), &record); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unmarshal cached policy record")
|
||||
}
|
||||
|
||||
// Check if Application state has changed (cache invalidation)
|
||||
currentHash, err := computeApplicationHash(app)
|
||||
if err == nil && currentHash != "" {
|
||||
cachedHash, _ := record["application_hash"].(string)
|
||||
if cachedHash != currentHash {
|
||||
// Application changed - cache is invalid even if TTL hasn't expired
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check TTL
|
||||
if ttlSeconds == -1 {
|
||||
// Never refresh - always use cached (if Application hasn't changed)
|
||||
return record, nil
|
||||
}
|
||||
|
||||
if ttlSeconds == 0 {
|
||||
// Never cache - always re-render
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Check if cache is still valid by time
|
||||
renderedAtStr, ok := record["rendered_at"].(string)
|
||||
if !ok {
|
||||
return nil, nil // Invalid format
|
||||
}
|
||||
|
||||
renderedAt, err := time.Parse(time.RFC3339, renderedAtStr)
|
||||
if err != nil {
|
||||
return nil, nil // Invalid timestamp
|
||||
}
|
||||
|
||||
elapsed := time.Since(renderedAt)
|
||||
ttl := time.Duration(ttlSeconds) * time.Second
|
||||
|
||||
if elapsed < ttl {
|
||||
// Cache is still valid (both time and Application state)
|
||||
return record, nil
|
||||
}
|
||||
|
||||
// Cache expired
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// computeApplicationHash computes a hash of the Application state that affects policy rendering
|
||||
// This includes spec, labels, and annotations. Used for cache invalidation.
|
||||
func computeApplicationHash(app *v1beta1.Application) (string, error) {
|
||||
// Build a structure with only the fields that affect policy rendering
|
||||
hashInput := map[string]interface{}{
|
||||
"spec": app.Spec,
|
||||
"labels": app.Labels,
|
||||
"annotations": app.Annotations,
|
||||
}
|
||||
|
||||
// Marshal to JSON for consistent hashing
|
||||
jsonBytes, err := json.Marshal(hashInput)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "failed to marshal Application for hashing")
|
||||
}
|
||||
|
||||
// Compute SHA256 hash
|
||||
hash := sha256.Sum256(jsonBytes)
|
||||
return hex.EncodeToString(hash[:]), nil
|
||||
}
|
||||
|
||||
// deserializeTransformsFromStorage converts stored transforms back to PolicyTransforms
|
||||
func deserializeTransformsFromStorage(transformsData map[string]interface{}) *PolicyTransforms {
|
||||
if transformsData == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
transforms := &PolicyTransforms{}
|
||||
|
||||
if labelsData, ok := transformsData["labels"].(map[string]interface{}); ok {
|
||||
typeStr, _ := labelsData["type"].(string)
|
||||
transforms.Labels = &Transform{
|
||||
Type: TransformOperationType(typeStr),
|
||||
Value: labelsData["value"],
|
||||
}
|
||||
}
|
||||
|
||||
if annotationsData, ok := transformsData["annotations"].(map[string]interface{}); ok {
|
||||
typeStr, _ := annotationsData["type"].(string)
|
||||
transforms.Annotations = &Transform{
|
||||
Type: TransformOperationType(typeStr),
|
||||
Value: annotationsData["value"],
|
||||
}
|
||||
}
|
||||
|
||||
if specData, ok := transformsData["spec"].(map[string]interface{}); ok {
|
||||
typeStr, _ := specData["type"].(string)
|
||||
transforms.Spec = &Transform{
|
||||
Type: TransformOperationType(typeStr),
|
||||
Value: specData["value"],
|
||||
}
|
||||
}
|
||||
|
||||
return transforms
|
||||
}
|
||||
|
||||
// computeCurrentGlobalPolicyHash discovers current global policies and computes their hash
|
||||
@@ -926,9 +1266,9 @@ func computeJSONPatch(before, after *v1beta1.ApplicationSpec) ([]byte, error) {
|
||||
return patch, nil
|
||||
}
|
||||
|
||||
// createOrUpdateDiffsConfigMap creates or updates a ConfigMap containing all policy diffs
|
||||
// createOrUpdateDiffsConfigMap creates or updates a ConfigMap containing all policy records
|
||||
func createOrUpdateDiffsConfigMap(ctx context.Context, cli client.Client, app *v1beta1.Application, orderedData map[string]string) error {
|
||||
cmName := fmt.Sprintf("%s-policy-diffs", app.Name)
|
||||
cmName := fmt.Sprintf("application-policies-%s-%s", app.Namespace, app.Name)
|
||||
|
||||
// Create the ConfigMap
|
||||
cm := &corev1.ConfigMap{
|
||||
@@ -937,8 +1277,9 @@ func createOrUpdateDiffsConfigMap(ctx context.Context, cli client.Client, app *v
|
||||
Namespace: app.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: app.APIVersion,
|
||||
Kind: app.Kind,
|
||||
// Use hardcoded values since TypeMeta is cleared by k8s client after Create/Get
|
||||
APIVersion: v1beta1.SchemeGroupVersion.String(),
|
||||
Kind: v1beta1.ApplicationKind,
|
||||
Name: app.Name,
|
||||
UID: app.UID,
|
||||
Controller: ptrBool(true),
|
||||
@@ -949,6 +1290,19 @@ func createOrUpdateDiffsConfigMap(ctx context.Context, cli client.Client, app *v
|
||||
Data: orderedData,
|
||||
}
|
||||
|
||||
// Add standard KubeVela labels (following ResourceTracker pattern)
|
||||
meta.AddLabels(cm, map[string]string{
|
||||
oam.LabelAppName: app.Name,
|
||||
oam.LabelAppNamespace: app.Namespace,
|
||||
oam.LabelAppUID: string(app.UID),
|
||||
"app.oam.dev/application-policies": "true", // Identify this as an application-policies ConfigMap
|
||||
})
|
||||
|
||||
// Add annotations to track update time
|
||||
meta.AddAnnotations(cm, map[string]string{
|
||||
oam.AnnotationLastAppliedTime: time.Now().Format(time.RFC3339),
|
||||
})
|
||||
|
||||
// Try to create the ConfigMap
|
||||
err := cli.Create(ctx, cm)
|
||||
if err != nil {
|
||||
@@ -960,8 +1314,11 @@ func createOrUpdateDiffsConfigMap(ctx context.Context, cli client.Client, app *v
|
||||
return errors.Wrap(getErr, "failed to get existing ConfigMap")
|
||||
}
|
||||
|
||||
// Update data
|
||||
// Update data and annotations
|
||||
existing.Data = orderedData
|
||||
meta.AddAnnotations(existing, map[string]string{
|
||||
oam.AnnotationLastAppliedTime: time.Now().Format(time.RFC3339),
|
||||
})
|
||||
if updateErr := cli.Update(ctx, existing); updateErr != nil {
|
||||
return errors.Wrap(updateErr, "failed to update ConfigMap")
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -51,7 +51,9 @@ import (
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||
"github.com/oam-dev/kubevela/pkg/appfile"
|
||||
_ "github.com/oam-dev/kubevela/pkg/features" // Import to register feature gates
|
||||
"github.com/oam-dev/kubevela/pkg/multicluster"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
// +kubebuilder:scaffold:imports
|
||||
)
|
||||
|
||||
@@ -77,6 +79,11 @@ func TestAPIs(t *testing.T) {
|
||||
var _ = BeforeSuite(func() {
|
||||
logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter)))
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
// Enable global policies feature gate for tests
|
||||
Expect(utilfeature.DefaultMutableFeatureGate.Set("EnableGlobalPolicies=true")).ToNot(HaveOccurred())
|
||||
logf.Log.Info("Enabled EnableGlobalPolicies feature gate for tests")
|
||||
|
||||
By("bootstrapping test environment")
|
||||
var yamlPath string
|
||||
if _, set := os.LookupEnv("COMPATIBILITY_TEST"); set {
|
||||
|
||||
Reference in New Issue
Block a user