Refactor: Pre-Validation Hooks to be More Extensible and Testable (#6978)

* refactor: pre-start hook implementation

- Introduced a new `hook.go` file defining the `PreStartHook` interface for pre-start validation hooks.
- Removed the old `pre_start_hook.go` file which contained the `SystemCRDValidationHook` implementation.
- Updated the server initialization code to use the new hook structure, specifically integrating the `crdvalidation` package for pre-start validation.
- Enhanced logging for pre-start hook execution to improve clarity on hook names and execution results.

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: move color writer implementation to logging package and update usage in server setup

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: rename Hook to CRDValidation for clarity and consistency

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: reorder import statements for consistency

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: replace hardcoded namespace with variable in cleanup function

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: rename CRDValidation type to Hook for consistency

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: enhance CRD validation hook with custom client support and improved error handling

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: extend timeout for CRD validation hook and improve error handling for slow API servers

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

* refactor: remove redundant comments from PreStartHook definition

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>

---------

Signed-off-by: Ayush Kumar <ayushshyamkumar888@gmail.com>
This commit is contained in:
Ayush Kumar
2025-11-18 07:44:40 +05:30
committed by GitHub
parent d064d3dbd2
commit 0a599ad177
9 changed files with 853 additions and 147 deletions

View File

@@ -0,0 +1,229 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package crdvalidation
import (
"context"
"fmt"
"time"
"github.com/kubevela/pkg/util/compression"
"github.com/kubevela/pkg/util/k8s"
"github.com/kubevela/pkg/util/singleton"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/cmd/core/app/hooks"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/oam"
)
// Hook validates that CRDs installed in the cluster are compatible with
// enabled feature gates. This prevents silent data corruption by failing
// fast at startup if CRDs are out of date.
type Hook struct {
client.Client
}
// NewHook creates a new CRD validation hook with the default singleton client
func NewHook() hooks.PreStartHook {
klog.V(3).InfoS("Initializing CRD validation hook", "client", "singleton")
return NewHookWithClient(singleton.KubeClient.Get())
}
// NewHookWithClient creates a new CRD validation hook with a specified client
// for improved testability and dependency injection
func NewHookWithClient(c client.Client) hooks.PreStartHook {
klog.V(3).InfoS("Initializing CRD validation hook with custom client")
return &Hook{Client: c}
}
// Name returns the hook name for logging
func (h *Hook) Name() string {
return "CRDValidation"
}
// Run executes the CRD validation logic. It checks if compression-related
// feature gates are enabled and validates that the ApplicationRevision CRD
// supports the required compression fields.
func (h *Hook) Run(ctx context.Context) error {
klog.InfoS("Starting CRD validation hook")
// Add a reasonable timeout to prevent indefinite hanging while allowing
// sufficient time for slower clusters or API servers under load.
// 2 minutes should be more than enough for any reasonable cluster setup
// while still protecting against indefinite hangs.
timeout := 2 * time.Minute
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
zstdEnabled := feature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision)
gzipEnabled := feature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision)
klog.V(2).InfoS("Checking compression feature gates",
"zstdEnabled", zstdEnabled,
"gzipEnabled", gzipEnabled)
if !zstdEnabled && !gzipEnabled {
klog.InfoS("No compression features enabled, skipping CRD validation")
return nil
}
klog.InfoS("Compression features enabled, validating ApplicationRevision CRD compatibility")
if err := h.validateApplicationRevisionCRD(ctx, zstdEnabled, gzipEnabled); err != nil {
// Check if the error was due to context timeout
if ctx.Err() == context.DeadlineExceeded {
klog.ErrorS(err, "CRD validation timed out - API server may be slow or unresponsive",
"timeout", timeout.String(),
"suggestion", "Check API server health and network connectivity")
return fmt.Errorf("CRD validation timed out after %v: %w. API server may be slow or under heavy load", timeout, err)
}
klog.ErrorS(err, "CRD validation failed")
return fmt.Errorf("CRD validation failed: %w", err)
}
klog.InfoS("CRD validation completed successfully")
return nil
}
// validateApplicationRevisionCRD performs a round-trip test to ensure the
// ApplicationRevision CRD supports compression fields
func (h *Hook) validateApplicationRevisionCRD(ctx context.Context, zstdEnabled, gzipEnabled bool) error {
// Generate test resource
testName := fmt.Sprintf("core.pre-check.%d", time.Now().UnixNano())
namespace := k8s.GetRuntimeNamespace()
klog.V(2).InfoS("Creating test ApplicationRevision for CRD validation",
"name", testName,
"namespace", namespace)
// Ensure the namespace exists before attempting to create resources
if err := k8s.EnsureNamespace(ctx, h.Client, namespace); err != nil {
klog.ErrorS(err, "Failed to ensure runtime namespace exists",
"namespace", namespace)
return fmt.Errorf("runtime namespace %q does not exist or is not accessible: %w", namespace, err)
}
appRev := &v1beta1.ApplicationRevision{}
appRev.Name = testName
appRev.Namespace = namespace
appRev.SetLabels(map[string]string{oam.LabelPreCheck: types.VelaCoreName})
appRev.Spec.Application.Name = testName
appRev.Spec.Application.Spec.Components = []common.ApplicationComponent{}
// Set compression type based on enabled features
var compressionType compression.Type
if zstdEnabled {
compressionType = compression.Zstd
appRev.Spec.Compression.SetType(compression.Zstd)
klog.V(3).InfoS("Setting compression type", "type", "zstd")
} else if gzipEnabled {
compressionType = compression.Gzip
appRev.Spec.Compression.SetType(compression.Gzip)
klog.V(3).InfoS("Setting compression type", "type", "gzip")
}
// Register cleanup function
defer func() {
klog.V(2).InfoS("Cleaning up test ApplicationRevisions",
"namespace", namespace,
"label", oam.LabelPreCheck)
if err := h.Client.DeleteAllOf(ctx, &v1beta1.ApplicationRevision{},
client.InNamespace(namespace),
client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}); err != nil {
klog.ErrorS(err, "Failed to clean up test ApplicationRevision resources",
"namespace", namespace)
} else {
klog.V(3).InfoS("Successfully cleaned up test ApplicationRevision resources")
}
}()
// Create test resource
klog.V(2).InfoS("Writing test ApplicationRevision to cluster")
if err := h.Client.Create(ctx, appRev); err != nil {
klog.ErrorS(err, "Failed to create test ApplicationRevision",
"name", testName,
"namespace", namespace)
return fmt.Errorf("failed to create test ApplicationRevision: %w", err)
}
klog.V(3).InfoS("Test ApplicationRevision created successfully")
// Read back the resource
key := client.ObjectKeyFromObject(appRev)
klog.V(2).InfoS("Reading back test ApplicationRevision from cluster",
"key", key.String())
if err := h.Client.Get(ctx, key, appRev); err != nil {
klog.ErrorS(err, "Failed to read back test ApplicationRevision",
"key", key.String())
return fmt.Errorf("failed to read test ApplicationRevision: %w", err)
}
klog.V(3).InfoS("Test ApplicationRevision read successfully")
// Validate round-trip integrity
klog.V(2).InfoS("Validating round-trip data integrity",
"expectedName", testName,
"actualName", appRev.Spec.Application.Name,
"expectedCompression", compressionType,
"actualCompression", appRev.Spec.Compression.Type)
// First check that basic data survived
if appRev.Spec.Application.Name != testName {
klog.ErrorS(nil, "CRD round-trip validation failed - basic data corruption detected",
"expectedName", testName,
"actualName", appRev.Spec.Application.Name,
"compressionType", compressionType,
"issue", "The ApplicationRevision CRD does not support compression fields")
return fmt.Errorf("the ApplicationRevision CRD is not updated. Compression cannot be used. Please upgrade your CRD to latest ones")
}
// Validate that compression fields survived the round-trip
switch compressionType {
case compression.Zstd:
if appRev.Spec.Compression.Type != compression.Zstd {
klog.ErrorS(nil, "CRD round-trip validation failed - zstd compression type lost",
"expected", compression.Zstd,
"actual", appRev.Spec.Compression.Type,
"issue", "The ApplicationRevision CRD does not support zstd compression fields")
return fmt.Errorf("ApplicationRevision CRD missing zstd compression support after round-trip; got=%v. Please upgrade your CRD to latest ones", appRev.Spec.Compression.Type)
}
case compression.Gzip:
if appRev.Spec.Compression.Type != compression.Gzip {
klog.ErrorS(nil, "CRD round-trip validation failed - gzip compression type lost",
"expected", compression.Gzip,
"actual", appRev.Spec.Compression.Type,
"issue", "The ApplicationRevision CRD does not support gzip compression fields")
return fmt.Errorf("ApplicationRevision CRD missing gzip compression support after round-trip; got=%v. Please upgrade your CRD to latest ones", appRev.Spec.Compression.Type)
}
case compression.Uncompressed:
// This case should never happen as we only set Zstd or Gzip above,
// but we need to handle it to satisfy the exhaustive linter
klog.V(3).InfoS("Compression type is uncompressed, which is unexpected in validation",
"compressionType", compressionType)
}
klog.V(2).InfoS("Round-trip validation passed - CRD supports compression",
"compressionType", compressionType)
return nil
}

View File

@@ -0,0 +1,274 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package crdvalidation_test
import (
"context"
"errors"
"testing"
"github.com/kubevela/pkg/util/compression"
"github.com/kubevela/pkg/util/k8s"
"github.com/kubevela/pkg/util/singleton"
"github.com/kubevela/pkg/util/test/bootstrap"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"strconv"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/cmd/core/app/hooks/crdvalidation"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/oam"
)
var _ = bootstrap.InitKubeBuilderForTest(bootstrap.WithCRDPath("./testdata"))
func TestCRDValidationHook(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "CRD Validation Hook Suite")
}
var _ = Describe("CRD validation hook", func() {
Context("with old CRD that lacks compression support", func() {
It("should detect incompatible CRD when zstd compression is enabled", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true)
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, false)
ctx := context.Background()
Expect(k8s.EnsureNamespace(ctx, singleton.KubeClient.Get(), types.DefaultKubeVelaNS)).Should(Succeed())
hook := crdvalidation.NewHook()
Expect(hook.Name()).Should(Equal("CRDValidation"))
err := hook.Run(ctx)
Expect(err).ShouldNot(Succeed())
// The old CRD doesn't preserve the application data at all, so we get a basic corruption error
Expect(err.Error()).Should(ContainSubstring("the ApplicationRevision CRD is not updated"))
})
It("should detect incompatible CRD when gzip compression is enabled", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, false)
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, true)
ctx := context.Background()
Expect(k8s.EnsureNamespace(ctx, singleton.KubeClient.Get(), types.DefaultKubeVelaNS)).Should(Succeed())
hook := crdvalidation.NewHook()
err := hook.Run(ctx)
Expect(err).ShouldNot(Succeed())
// The old CRD doesn't preserve the application data at all, so we get a basic corruption error
Expect(err.Error()).Should(ContainSubstring("the ApplicationRevision CRD is not updated"))
})
})
It("should skip validation when compression features are disabled", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, false)
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, false)
ctx := context.Background()
hook := crdvalidation.NewHook()
err := hook.Run(ctx)
Expect(err).Should(Succeed())
})
Context("with dependency injection", func() {
It("should use custom client when provided", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true)
ctx := context.Background()
// Create a fake client that simulates a CRD with compression support
fakeClient := fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build()
// Pre-create the namespace
ns := &corev1.Namespace{}
ns.Name = types.DefaultKubeVelaNS
Expect(fakeClient.Create(ctx, ns)).Should(Succeed())
// Use NewHookWithClient to inject the fake client
hook := crdvalidation.NewHookWithClient(fakeClient)
Expect(hook.Name()).Should(Equal("CRDValidation"))
// Since fake client preserves all fields, validation should pass
err := hook.Run(ctx)
Expect(err).Should(Succeed())
})
})
Context("error scenarios", func() {
It("should handle Create errors gracefully", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true)
ctx := context.Background()
// Create a client that fails on Create operations
mockClient := &mockFailingClient{
Client: fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build(),
failCreate: true,
}
hook := crdvalidation.NewHookWithClient(mockClient)
err := hook.Run(ctx)
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("failed to create test ApplicationRevision"))
})
It("should handle Get errors gracefully", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, true)
ctx := context.Background()
// Create a client that fails on Get operations
mockClient := &mockFailingClient{
Client: fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build(),
failGet: true,
}
// Pre-create the namespace
ns := &corev1.Namespace{}
ns.Name = types.DefaultKubeVelaNS
Expect(mockClient.Client.Create(ctx, ns)).Should(Succeed())
hook := crdvalidation.NewHookWithClient(mockClient)
err := hook.Run(ctx)
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("failed to read test ApplicationRevision"))
})
It("should handle namespace creation errors", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true)
ctx := context.Background()
// Create a client that fails namespace operations
mockClient := &mockFailingClient{
Client: fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build(),
failNamespaceOps: true,
}
hook := crdvalidation.NewHookWithClient(mockClient)
err := hook.Run(ctx)
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("runtime namespace"))
Expect(err.Error()).Should(ContainSubstring("does not exist or is not accessible"))
})
})
Context("cleanup verification", func() {
It("should clean up test resources after validation", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true)
ctx := context.Background()
fakeClient := fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build()
// Pre-create the namespace
ns := &corev1.Namespace{}
ns.Name = types.DefaultKubeVelaNS
Expect(fakeClient.Create(ctx, ns)).Should(Succeed())
hook := crdvalidation.NewHookWithClient(fakeClient)
_ = hook.Run(ctx)
// Verify that test ApplicationRevisions are cleaned up
appRevList := &v1beta1.ApplicationRevisionList{}
err := fakeClient.List(ctx, appRevList,
client.InNamespace(types.DefaultKubeVelaNS),
client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName})
Expect(err).Should(Succeed())
Expect(appRevList.Items).Should(HaveLen(0))
})
It("should clean up multiple test resources with same label", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, true)
ctx := context.Background()
fakeClient := fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build()
// Pre-create the namespace
ns := &corev1.Namespace{}
ns.Name = types.DefaultKubeVelaNS
Expect(fakeClient.Create(ctx, ns)).Should(Succeed())
// Pre-create multiple test ApplicationRevisions with the precheck label
for i := 0; i < 3; i++ {
appRev := &v1beta1.ApplicationRevision{}
appRev.Name = "old-test-" + strconv.Itoa(i)
appRev.Namespace = types.DefaultKubeVelaNS
appRev.SetLabels(map[string]string{oam.LabelPreCheck: types.VelaCoreName})
appRev.Spec.Compression.Type = compression.Gzip
Expect(fakeClient.Create(ctx, appRev)).Should(Succeed())
}
// Verify pre-existing resources
appRevList := &v1beta1.ApplicationRevisionList{}
err := fakeClient.List(ctx, appRevList,
client.InNamespace(types.DefaultKubeVelaNS),
client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName})
Expect(err).Should(Succeed())
Expect(appRevList.Items).Should(HaveLen(3))
// Run the hook
hook := crdvalidation.NewHookWithClient(fakeClient)
_ = hook.Run(ctx)
// Verify all test resources are cleaned up
err = fakeClient.List(ctx, appRevList,
client.InNamespace(types.DefaultKubeVelaNS),
client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName})
Expect(err).Should(Succeed())
Expect(appRevList.Items).Should(HaveLen(0))
})
})
})
// mockFailingClient is a test client that can simulate various failure scenarios
type mockFailingClient struct {
client.Client
failCreate bool
failGet bool
failNamespaceOps bool
}
func (m *mockFailingClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if m.failCreate {
if _, ok := obj.(*v1beta1.ApplicationRevision); ok {
return errors.New("simulated create failure")
}
}
if m.failNamespaceOps {
if _, ok := obj.(*corev1.Namespace); ok {
return errors.New("simulated namespace creation failure")
}
}
return m.Client.Create(ctx, obj, opts...)
}
func (m *mockFailingClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
if m.failGet {
if _, ok := obj.(*v1beta1.ApplicationRevision); ok {
return errors.New("simulated get failure")
}
}
if m.failNamespaceOps {
if _, ok := obj.(*corev1.Namespace); ok {
return apierrors.NewNotFound(corev1.SchemeGroupVersion.WithResource("namespaces").GroupResource(), key.Name)
}
}
return m.Client.Get(ctx, key, obj, opts...)
}

View File

@@ -0,0 +1,31 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hooks
import "context"
// PreStartHook defines a hook that should be run before the controller starts working.
// Pre-start hooks are used for validation, initialization, and safety checks that must
// pass before the controller begins processing resources.
type PreStartHook interface {
// Run executes the hook's logic. If an error is returned, the controller
// startup will be aborted.
Run(ctx context.Context) error
// Name returns a human-readable name for the hook, used in logging.
Name() string
}

View File

@@ -1,87 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hooks
import (
"context"
"fmt"
"time"
"github.com/kubevela/pkg/util/compression"
"github.com/kubevela/pkg/util/k8s"
"github.com/kubevela/pkg/util/singleton"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/oam"
)
// PreStartHook hook that should be run before controller start working
type PreStartHook interface {
Run(ctx context.Context) error
}
// SystemCRDValidationHook checks if the crd in the system are valid to run the current controller
type SystemCRDValidationHook struct {
client.Client
}
// NewSystemCRDValidationHook .
func NewSystemCRDValidationHook() PreStartHook {
return &SystemCRDValidationHook{Client: singleton.KubeClient.Get()}
}
// Run .
func (in *SystemCRDValidationHook) Run(ctx context.Context) error {
if feature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision) ||
feature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision) {
appRev := &v1beta1.ApplicationRevision{}
appRev.Name = fmt.Sprintf("core.pre-check.%d", time.Now().UnixNano())
appRev.Namespace = k8s.GetRuntimeNamespace()
key := client.ObjectKeyFromObject(appRev)
appRev.SetLabels(map[string]string{oam.LabelPreCheck: types.VelaCoreName})
appRev.Spec.Application.Name = appRev.Name
appRev.Spec.Application.Spec.Components = []common.ApplicationComponent{}
if feature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision) {
appRev.Spec.Compression.SetType(compression.Zstd)
} else if feature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision) {
appRev.Spec.Compression.SetType(compression.Gzip)
}
if err := in.Client.Create(ctx, appRev); err != nil {
return err
}
defer func() {
if err := in.Client.DeleteAllOf(ctx, &v1beta1.ApplicationRevision{},
client.InNamespace(types.DefaultKubeVelaNS),
client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}); err != nil {
klog.Errorf("failed to recycle pre-check ApplicationRevision: %v", err)
}
}()
if err := in.Client.Get(ctx, key, appRev); err != nil {
return err
}
if appRev.Spec.Application.Name != appRev.Name {
return fmt.Errorf("the ApplicationRevision CRD is not updated. Compression cannot be used. Please upgrade your CRD to latest ones")
}
}
return nil
}

View File

@@ -1,52 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hooks_test
import (
"context"
"testing"
"github.com/kubevela/pkg/util/k8s"
"github.com/kubevela/pkg/util/singleton"
"github.com/kubevela/pkg/util/test/bootstrap"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/cmd/core/app/hooks"
"github.com/oam-dev/kubevela/pkg/features"
)
var _ = bootstrap.InitKubeBuilderForTest(bootstrap.WithCRDPath("./testdata"))
func TestPreStartHook(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Run pre-start hook test")
}
var _ = Describe("Test pre-start hooks", func() {
It("Test SystemCRDValidationHook", func() {
featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true)
ctx := context.Background()
Expect(k8s.EnsureNamespace(ctx, singleton.KubeClient.Get(), types.DefaultKubeVelaNS)).Should(Succeed())
err := hooks.NewSystemCRDValidationHook().Run(ctx)
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("the ApplicationRevision CRD is not updated"))
})
})

View File

@@ -49,6 +49,7 @@ import (
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/cmd/core/app/config"
"github.com/oam-dev/kubevela/cmd/core/app/hooks"
"github.com/oam-dev/kubevela/cmd/core/app/hooks/crdvalidation"
"github.com/oam-dev/kubevela/cmd/core/app/options"
"github.com/oam-dev/kubevela/pkg/auth"
"github.com/oam-dev/kubevela/pkg/cache"
@@ -56,6 +57,7 @@ import (
oamv1beta1 "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1beta1/application"
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/logging"
"github.com/oam-dev/kubevela/pkg/monitor/watcher"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/oam"
@@ -241,7 +243,7 @@ func setupLogging(observabilityConfig *config.ObservabilityConfig) {
// Set logger (use --dev-logs=true for local development)
if observabilityConfig.DevLogs {
logOutput := newColorWriter(os.Stdout)
logOutput := logging.NewColorWriter(os.Stdout)
klog.LogToStderr(false)
klog.SetOutput(logOutput)
ctrl.SetLogger(textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(logOutput))))
@@ -478,14 +480,16 @@ func prepareRun(ctx context.Context, manager manager.Manager, coreOptions *optio
}
klog.InfoS("Starting vela controller manager with pre-start validation")
for _, hook := range []hooks.PreStartHook{hooks.NewSystemCRDValidationHook()} {
klog.V(2).InfoS("Running pre-start hook", "hook", fmt.Sprintf("%T", hook))
for _, hook := range []hooks.PreStartHook{crdvalidation.NewHook()} {
hookName := hook.Name()
klog.InfoS("Running pre-start hook", "hook", hookName)
if err := hook.Run(ctx); err != nil {
klog.ErrorS(err, "Failed to run pre-start hook", "hook", fmt.Sprintf("%T", hook))
return fmt.Errorf("failed to run hook %T: %w", hook, err)
klog.ErrorS(err, "Failed to run pre-start hook", "hook", hookName)
return fmt.Errorf("failed to run hook %s: %w", hookName, err)
}
klog.InfoS("Pre-start hook completed successfully", "hook", hookName)
}
klog.InfoS("Pre-start validation completed successfully")
klog.InfoS("All pre-start validation hooks completed successfully")
return nil
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package app
package logging
import (
"bytes"
@@ -60,7 +60,12 @@ type colorWriter struct {
buf bytes.Buffer
}
func newColorWriter(dst io.Writer) io.Writer {
// NewColorWriter creates a new writer that adds ANSI color codes to klog output.
// It wraps the provided writer and colorizes log messages based on severity level,
// enhances location information with function names, and formats structured fields.
// This is primarily intended for development mode (--dev-logs=true) to improve
// log readability in terminal output.
func NewColorWriter(dst io.Writer) io.Writer {
return &colorWriter{dst: dst}
}

View File

@@ -0,0 +1,302 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging_test
import (
"bytes"
"fmt"
"strings"
"sync"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/oam-dev/kubevela/pkg/logging"
)
func TestColorWriter(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Color Writer Suite")
}
var _ = Describe("Color Writer", func() {
Context("formatting", func() {
It("should format INFO messages with colors", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
input := "I1117 12:34:56.789012 1234 server.go:123] Starting server"
_, err := writer.Write([]byte(input + "\n"))
Expect(err).Should(Succeed())
output := buf.String()
Expect(output).Should(ContainSubstring("\x1b[32m")) // Info color
Expect(output).Should(ContainSubstring("INFO"))
Expect(output).Should(ContainSubstring("Starting server"))
Expect(output).Should(ContainSubstring("\x1b[0m")) // Reset color
})
It("should format ERROR messages with colors", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
input := "E1117 12:34:56.789012 1234 server.go:456] Failed to start server"
_, err := writer.Write([]byte(input + "\n"))
Expect(err).Should(Succeed())
output := buf.String()
Expect(output).Should(ContainSubstring("\x1b[31m")) // Error color
Expect(output).Should(ContainSubstring("ERROR"))
Expect(output).Should(ContainSubstring("Failed to start server"))
})
It("should format WARNING messages with colors", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
input := "W1117 12:34:56.789012 1234 server.go:789] Configuration deprecated"
_, err := writer.Write([]byte(input + "\n"))
Expect(err).Should(Succeed())
output := buf.String()
Expect(output).Should(ContainSubstring("\x1b[33m")) // Warning color
Expect(output).Should(ContainSubstring("WARN"))
Expect(output).Should(ContainSubstring("Configuration deprecated"))
})
It("should handle structured logging with key-value pairs", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
input := "I1117 12:34:56.789012 1234 hook.go:123] Running hook name=CRDValidation status=success"
_, err := writer.Write([]byte(input + "\n"))
Expect(err).Should(Succeed())
output := buf.String()
Expect(output).Should(ContainSubstring("Running hook"))
Expect(output).Should(ContainSubstring("\x1b[96m")) // Key color
Expect(output).Should(ContainSubstring("name"))
Expect(output).Should(ContainSubstring("\x1b[37m")) // Value color
Expect(output).Should(ContainSubstring("CRDValidation"))
})
It("should handle quoted values in key-value pairs", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
input := `I1117 12:34:56.789012 1234 server.go:123] Processing request path="/api/v1" method="GET"`
_, err := writer.Write([]byte(input + "\n"))
Expect(err).Should(Succeed())
output := buf.String()
Expect(output).Should(ContainSubstring("Processing request"))
Expect(output).Should(ContainSubstring("path"))
Expect(output).Should(ContainSubstring(`"/api/v1"`))
Expect(output).Should(ContainSubstring("method"))
Expect(output).Should(ContainSubstring(`"GET"`))
})
})
Context("concurrency safety", func() {
It("should handle concurrent writes safely", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
const numGoroutines = 100
const numWrites = 50
var wg sync.WaitGroup
wg.Add(numGoroutines)
// Use a channel to detect race conditions
errors := make(chan error, numGoroutines*numWrites)
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
for j := 0; j < numWrites; j++ {
msg := fmt.Sprintf("I1117 12:34:56.789012 %04d test.go:123] Goroutine %d write %d\n",
id, id, j)
_, err := writer.Write([]byte(msg))
if err != nil {
errors <- err
}
}
}(i)
}
wg.Wait()
close(errors)
// Check for any errors
for err := range errors {
Expect(err).Should(BeNil())
}
// Verify output contains expected number of lines
output := buf.String()
lines := bytes.Count([]byte(output), []byte("\n"))
Expect(lines).Should(Equal(numGoroutines * numWrites))
})
It("should maintain data integrity under concurrent access", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
const numGoroutines = 50
messages := make(map[string]bool)
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
// Create unique messages
for j := 0; j < 10; j++ {
msg := fmt.Sprintf("I1117 12:34:56.789012 %04d test.go:%03d] Unique message %d-%d",
id, j*10, id, j)
mu.Lock()
messages[msg] = true
mu.Unlock()
_, err := writer.Write([]byte(msg + "\n"))
Expect(err).Should(BeNil())
}
}(i)
}
wg.Wait()
// Verify all unique messages appear in output
output := buf.String()
for msg := range messages {
// Extract the unique part of each message (e.g., "Unique message 1-2")
// The message format is: "I1117 12:34:56.789012 XXXX test.go:YYY] Unique message X-Y"
// We look for the complete "Unique message X-Y" part
startIdx := strings.Index(msg, "Unique message")
if startIdx >= 0 {
uniquePart := msg[startIdx:] // Get "Unique message X-Y"
Expect(output).Should(ContainSubstring(uniquePart),
"Missing message: %s", uniquePart)
}
}
})
It("should handle partial writes correctly", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
var wg sync.WaitGroup
wg.Add(2)
// Goroutine 1: Write a message in parts
go func() {
defer wg.Done()
part1 := []byte("I1117 12:34:56.789012 1234 test.go:123] ")
part2 := []byte("First ")
part3 := []byte("message\n")
writer.Write(part1)
writer.Write(part2)
writer.Write(part3)
}()
// Goroutine 2: Write a complete message
go func() {
defer wg.Done()
msg := []byte("I1117 12:34:56.789012 5678 test.go:456] Second message\n")
writer.Write(msg)
}()
wg.Wait()
// Both messages should be in the output
output := buf.String()
Expect(output).Should(ContainSubstring("First message"))
Expect(output).Should(ContainSubstring("Second message"))
})
})
Context("edge cases", func() {
It("should handle empty input", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
n, err := writer.Write([]byte(""))
Expect(err).Should(BeNil())
Expect(n).Should(Equal(0))
Expect(buf.String()).Should(Equal(""))
})
It("should handle input without newlines", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
input := "I1117 12:34:56.789012 1234 test.go:123] Message without newline"
n, err := writer.Write([]byte(input))
Expect(err).Should(BeNil())
Expect(n).Should(Equal(len(input)))
// The message should be buffered, not yet written to output
Expect(buf.String()).Should(Equal(""))
// Write a newline to flush
writer.Write([]byte("\n"))
Expect(buf.String()).Should(ContainSubstring("Message without newline"))
})
It("should handle malformed log lines gracefully", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
// Various malformed inputs
inputs := []string{
"Not a klog line\n",
"I\n",
"I1117\n",
"Random text with no structure\n",
"\n",
}
for _, input := range inputs {
_, err := writer.Write([]byte(input))
Expect(err).Should(BeNil())
}
// Should have written something for each input
output := buf.String()
lines := bytes.Count([]byte(output), []byte("\n"))
Expect(lines).Should(Equal(len(inputs)))
})
It("should handle very long lines", func() {
var buf bytes.Buffer
writer := logging.NewColorWriter(&buf)
// Create a very long message
longMsg := "I1117 12:34:56.789012 1234 test.go:123] " + string(make([]byte, 10000))
_, err := writer.Write([]byte(longMsg + "\n"))
Expect(err).Should(BeNil())
output := buf.String()
Expect(len(output)).Should(BeNumerically(">", 10000))
})
})
})