diff --git a/cmd/core/app/hooks/crdvalidation/crd_validation_hook.go b/cmd/core/app/hooks/crdvalidation/crd_validation_hook.go new file mode 100644 index 000000000..827e4dc82 --- /dev/null +++ b/cmd/core/app/hooks/crdvalidation/crd_validation_hook.go @@ -0,0 +1,229 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package crdvalidation + +import ( + "context" + "fmt" + "time" + + "github.com/kubevela/pkg/util/compression" + "github.com/kubevela/pkg/util/k8s" + "github.com/kubevela/pkg/util/singleton" + "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/common" + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/apis/types" + "github.com/oam-dev/kubevela/cmd/core/app/hooks" + "github.com/oam-dev/kubevela/pkg/features" + "github.com/oam-dev/kubevela/pkg/oam" +) + +// Hook validates that CRDs installed in the cluster are compatible with +// enabled feature gates. This prevents silent data corruption by failing +// fast at startup if CRDs are out of date. +type Hook struct { + client.Client +} + +// NewHook creates a new CRD validation hook with the default singleton client +func NewHook() hooks.PreStartHook { + klog.V(3).InfoS("Initializing CRD validation hook", "client", "singleton") + return NewHookWithClient(singleton.KubeClient.Get()) +} + +// NewHookWithClient creates a new CRD validation hook with a specified client +// for improved testability and dependency injection +func NewHookWithClient(c client.Client) hooks.PreStartHook { + klog.V(3).InfoS("Initializing CRD validation hook with custom client") + return &Hook{Client: c} +} + +// Name returns the hook name for logging +func (h *Hook) Name() string { + return "CRDValidation" +} + +// Run executes the CRD validation logic. It checks if compression-related +// feature gates are enabled and validates that the ApplicationRevision CRD +// supports the required compression fields. +func (h *Hook) Run(ctx context.Context) error { + klog.InfoS("Starting CRD validation hook") + + // Add a reasonable timeout to prevent indefinite hanging while allowing + // sufficient time for slower clusters or API servers under load. + // 2 minutes should be more than enough for any reasonable cluster setup + // while still protecting against indefinite hangs. + timeout := 2 * time.Minute + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + zstdEnabled := feature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision) + gzipEnabled := feature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision) + + klog.V(2).InfoS("Checking compression feature gates", + "zstdEnabled", zstdEnabled, + "gzipEnabled", gzipEnabled) + + if !zstdEnabled && !gzipEnabled { + klog.InfoS("No compression features enabled, skipping CRD validation") + return nil + } + + klog.InfoS("Compression features enabled, validating ApplicationRevision CRD compatibility") + + if err := h.validateApplicationRevisionCRD(ctx, zstdEnabled, gzipEnabled); err != nil { + // Check if the error was due to context timeout + if ctx.Err() == context.DeadlineExceeded { + klog.ErrorS(err, "CRD validation timed out - API server may be slow or unresponsive", + "timeout", timeout.String(), + "suggestion", "Check API server health and network connectivity") + return fmt.Errorf("CRD validation timed out after %v: %w. API server may be slow or under heavy load", timeout, err) + } + klog.ErrorS(err, "CRD validation failed") + return fmt.Errorf("CRD validation failed: %w", err) + } + + klog.InfoS("CRD validation completed successfully") + return nil +} + +// validateApplicationRevisionCRD performs a round-trip test to ensure the +// ApplicationRevision CRD supports compression fields +func (h *Hook) validateApplicationRevisionCRD(ctx context.Context, zstdEnabled, gzipEnabled bool) error { + // Generate test resource + testName := fmt.Sprintf("core.pre-check.%d", time.Now().UnixNano()) + namespace := k8s.GetRuntimeNamespace() + + klog.V(2).InfoS("Creating test ApplicationRevision for CRD validation", + "name", testName, + "namespace", namespace) + + // Ensure the namespace exists before attempting to create resources + if err := k8s.EnsureNamespace(ctx, h.Client, namespace); err != nil { + klog.ErrorS(err, "Failed to ensure runtime namespace exists", + "namespace", namespace) + return fmt.Errorf("runtime namespace %q does not exist or is not accessible: %w", namespace, err) + } + + appRev := &v1beta1.ApplicationRevision{} + appRev.Name = testName + appRev.Namespace = namespace + appRev.SetLabels(map[string]string{oam.LabelPreCheck: types.VelaCoreName}) + appRev.Spec.Application.Name = testName + appRev.Spec.Application.Spec.Components = []common.ApplicationComponent{} + + // Set compression type based on enabled features + var compressionType compression.Type + if zstdEnabled { + compressionType = compression.Zstd + appRev.Spec.Compression.SetType(compression.Zstd) + klog.V(3).InfoS("Setting compression type", "type", "zstd") + } else if gzipEnabled { + compressionType = compression.Gzip + appRev.Spec.Compression.SetType(compression.Gzip) + klog.V(3).InfoS("Setting compression type", "type", "gzip") + } + + // Register cleanup function + defer func() { + klog.V(2).InfoS("Cleaning up test ApplicationRevisions", + "namespace", namespace, + "label", oam.LabelPreCheck) + + if err := h.Client.DeleteAllOf(ctx, &v1beta1.ApplicationRevision{}, + client.InNamespace(namespace), + client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}); err != nil { + klog.ErrorS(err, "Failed to clean up test ApplicationRevision resources", + "namespace", namespace) + } else { + klog.V(3).InfoS("Successfully cleaned up test ApplicationRevision resources") + } + }() + + // Create test resource + klog.V(2).InfoS("Writing test ApplicationRevision to cluster") + if err := h.Client.Create(ctx, appRev); err != nil { + klog.ErrorS(err, "Failed to create test ApplicationRevision", + "name", testName, + "namespace", namespace) + return fmt.Errorf("failed to create test ApplicationRevision: %w", err) + } + klog.V(3).InfoS("Test ApplicationRevision created successfully") + + // Read back the resource + key := client.ObjectKeyFromObject(appRev) + klog.V(2).InfoS("Reading back test ApplicationRevision from cluster", + "key", key.String()) + + if err := h.Client.Get(ctx, key, appRev); err != nil { + klog.ErrorS(err, "Failed to read back test ApplicationRevision", + "key", key.String()) + return fmt.Errorf("failed to read test ApplicationRevision: %w", err) + } + klog.V(3).InfoS("Test ApplicationRevision read successfully") + + // Validate round-trip integrity + klog.V(2).InfoS("Validating round-trip data integrity", + "expectedName", testName, + "actualName", appRev.Spec.Application.Name, + "expectedCompression", compressionType, + "actualCompression", appRev.Spec.Compression.Type) + + // First check that basic data survived + if appRev.Spec.Application.Name != testName { + klog.ErrorS(nil, "CRD round-trip validation failed - basic data corruption detected", + "expectedName", testName, + "actualName", appRev.Spec.Application.Name, + "compressionType", compressionType, + "issue", "The ApplicationRevision CRD does not support compression fields") + return fmt.Errorf("the ApplicationRevision CRD is not updated. Compression cannot be used. Please upgrade your CRD to latest ones") + } + + // Validate that compression fields survived the round-trip + switch compressionType { + case compression.Zstd: + if appRev.Spec.Compression.Type != compression.Zstd { + klog.ErrorS(nil, "CRD round-trip validation failed - zstd compression type lost", + "expected", compression.Zstd, + "actual", appRev.Spec.Compression.Type, + "issue", "The ApplicationRevision CRD does not support zstd compression fields") + return fmt.Errorf("ApplicationRevision CRD missing zstd compression support after round-trip; got=%v. Please upgrade your CRD to latest ones", appRev.Spec.Compression.Type) + } + case compression.Gzip: + if appRev.Spec.Compression.Type != compression.Gzip { + klog.ErrorS(nil, "CRD round-trip validation failed - gzip compression type lost", + "expected", compression.Gzip, + "actual", appRev.Spec.Compression.Type, + "issue", "The ApplicationRevision CRD does not support gzip compression fields") + return fmt.Errorf("ApplicationRevision CRD missing gzip compression support after round-trip; got=%v. Please upgrade your CRD to latest ones", appRev.Spec.Compression.Type) + } + case compression.Uncompressed: + // This case should never happen as we only set Zstd or Gzip above, + // but we need to handle it to satisfy the exhaustive linter + klog.V(3).InfoS("Compression type is uncompressed, which is unexpected in validation", + "compressionType", compressionType) + } + + klog.V(2).InfoS("Round-trip validation passed - CRD supports compression", + "compressionType", compressionType) + + return nil +} diff --git a/cmd/core/app/hooks/crdvalidation/crd_validation_hook_test.go b/cmd/core/app/hooks/crdvalidation/crd_validation_hook_test.go new file mode 100644 index 000000000..05dcf751e --- /dev/null +++ b/cmd/core/app/hooks/crdvalidation/crd_validation_hook_test.go @@ -0,0 +1,274 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package crdvalidation_test + +import ( + "context" + "errors" + "testing" + + "github.com/kubevela/pkg/util/compression" + "github.com/kubevela/pkg/util/k8s" + "github.com/kubevela/pkg/util/singleton" + "github.com/kubevela/pkg/util/test/bootstrap" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "strconv" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/apis/types" + "github.com/oam-dev/kubevela/cmd/core/app/hooks/crdvalidation" + "github.com/oam-dev/kubevela/pkg/features" + "github.com/oam-dev/kubevela/pkg/oam" +) + +var _ = bootstrap.InitKubeBuilderForTest(bootstrap.WithCRDPath("./testdata")) + +func TestCRDValidationHook(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "CRD Validation Hook Suite") +} + +var _ = Describe("CRD validation hook", func() { + Context("with old CRD that lacks compression support", func() { + It("should detect incompatible CRD when zstd compression is enabled", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true) + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, false) + ctx := context.Background() + Expect(k8s.EnsureNamespace(ctx, singleton.KubeClient.Get(), types.DefaultKubeVelaNS)).Should(Succeed()) + + hook := crdvalidation.NewHook() + Expect(hook.Name()).Should(Equal("CRDValidation")) + + err := hook.Run(ctx) + Expect(err).ShouldNot(Succeed()) + // The old CRD doesn't preserve the application data at all, so we get a basic corruption error + Expect(err.Error()).Should(ContainSubstring("the ApplicationRevision CRD is not updated")) + }) + + It("should detect incompatible CRD when gzip compression is enabled", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, false) + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, true) + ctx := context.Background() + Expect(k8s.EnsureNamespace(ctx, singleton.KubeClient.Get(), types.DefaultKubeVelaNS)).Should(Succeed()) + + hook := crdvalidation.NewHook() + err := hook.Run(ctx) + Expect(err).ShouldNot(Succeed()) + // The old CRD doesn't preserve the application data at all, so we get a basic corruption error + Expect(err.Error()).Should(ContainSubstring("the ApplicationRevision CRD is not updated")) + }) + }) + + It("should skip validation when compression features are disabled", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, false) + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, false) + ctx := context.Background() + + hook := crdvalidation.NewHook() + err := hook.Run(ctx) + Expect(err).Should(Succeed()) + }) + + Context("with dependency injection", func() { + It("should use custom client when provided", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true) + ctx := context.Background() + + // Create a fake client that simulates a CRD with compression support + fakeClient := fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build() + + // Pre-create the namespace + ns := &corev1.Namespace{} + ns.Name = types.DefaultKubeVelaNS + Expect(fakeClient.Create(ctx, ns)).Should(Succeed()) + + // Use NewHookWithClient to inject the fake client + hook := crdvalidation.NewHookWithClient(fakeClient) + Expect(hook.Name()).Should(Equal("CRDValidation")) + + // Since fake client preserves all fields, validation should pass + err := hook.Run(ctx) + Expect(err).Should(Succeed()) + }) + }) + + Context("error scenarios", func() { + It("should handle Create errors gracefully", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true) + ctx := context.Background() + + // Create a client that fails on Create operations + mockClient := &mockFailingClient{ + Client: fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build(), + failCreate: true, + } + + hook := crdvalidation.NewHookWithClient(mockClient) + err := hook.Run(ctx) + Expect(err).ShouldNot(Succeed()) + Expect(err.Error()).Should(ContainSubstring("failed to create test ApplicationRevision")) + }) + + It("should handle Get errors gracefully", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, true) + ctx := context.Background() + + // Create a client that fails on Get operations + mockClient := &mockFailingClient{ + Client: fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build(), + failGet: true, + } + + // Pre-create the namespace + ns := &corev1.Namespace{} + ns.Name = types.DefaultKubeVelaNS + Expect(mockClient.Client.Create(ctx, ns)).Should(Succeed()) + + hook := crdvalidation.NewHookWithClient(mockClient) + err := hook.Run(ctx) + Expect(err).ShouldNot(Succeed()) + Expect(err.Error()).Should(ContainSubstring("failed to read test ApplicationRevision")) + }) + + It("should handle namespace creation errors", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true) + ctx := context.Background() + + // Create a client that fails namespace operations + mockClient := &mockFailingClient{ + Client: fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build(), + failNamespaceOps: true, + } + + hook := crdvalidation.NewHookWithClient(mockClient) + err := hook.Run(ctx) + Expect(err).ShouldNot(Succeed()) + Expect(err.Error()).Should(ContainSubstring("runtime namespace")) + Expect(err.Error()).Should(ContainSubstring("does not exist or is not accessible")) + }) + }) + + Context("cleanup verification", func() { + It("should clean up test resources after validation", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true) + ctx := context.Background() + + fakeClient := fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build() + + // Pre-create the namespace + ns := &corev1.Namespace{} + ns.Name = types.DefaultKubeVelaNS + Expect(fakeClient.Create(ctx, ns)).Should(Succeed()) + + hook := crdvalidation.NewHookWithClient(fakeClient) + _ = hook.Run(ctx) + + // Verify that test ApplicationRevisions are cleaned up + appRevList := &v1beta1.ApplicationRevisionList{} + err := fakeClient.List(ctx, appRevList, + client.InNamespace(types.DefaultKubeVelaNS), + client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}) + Expect(err).Should(Succeed()) + Expect(appRevList.Items).Should(HaveLen(0)) + }) + + It("should clean up multiple test resources with same label", func() { + featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.GzipApplicationRevision, true) + ctx := context.Background() + + fakeClient := fake.NewClientBuilder().WithScheme(singleton.KubeClient.Get().Scheme()).Build() + + // Pre-create the namespace + ns := &corev1.Namespace{} + ns.Name = types.DefaultKubeVelaNS + Expect(fakeClient.Create(ctx, ns)).Should(Succeed()) + + // Pre-create multiple test ApplicationRevisions with the precheck label + for i := 0; i < 3; i++ { + appRev := &v1beta1.ApplicationRevision{} + appRev.Name = "old-test-" + strconv.Itoa(i) + appRev.Namespace = types.DefaultKubeVelaNS + appRev.SetLabels(map[string]string{oam.LabelPreCheck: types.VelaCoreName}) + appRev.Spec.Compression.Type = compression.Gzip + Expect(fakeClient.Create(ctx, appRev)).Should(Succeed()) + } + + // Verify pre-existing resources + appRevList := &v1beta1.ApplicationRevisionList{} + err := fakeClient.List(ctx, appRevList, + client.InNamespace(types.DefaultKubeVelaNS), + client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}) + Expect(err).Should(Succeed()) + Expect(appRevList.Items).Should(HaveLen(3)) + + // Run the hook + hook := crdvalidation.NewHookWithClient(fakeClient) + _ = hook.Run(ctx) + + // Verify all test resources are cleaned up + err = fakeClient.List(ctx, appRevList, + client.InNamespace(types.DefaultKubeVelaNS), + client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}) + Expect(err).Should(Succeed()) + Expect(appRevList.Items).Should(HaveLen(0)) + }) + }) +}) + +// mockFailingClient is a test client that can simulate various failure scenarios +type mockFailingClient struct { + client.Client + failCreate bool + failGet bool + failNamespaceOps bool +} + +func (m *mockFailingClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if m.failCreate { + if _, ok := obj.(*v1beta1.ApplicationRevision); ok { + return errors.New("simulated create failure") + } + } + if m.failNamespaceOps { + if _, ok := obj.(*corev1.Namespace); ok { + return errors.New("simulated namespace creation failure") + } + } + return m.Client.Create(ctx, obj, opts...) +} + +func (m *mockFailingClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if m.failGet { + if _, ok := obj.(*v1beta1.ApplicationRevision); ok { + return errors.New("simulated get failure") + } + } + if m.failNamespaceOps { + if _, ok := obj.(*corev1.Namespace); ok { + return apierrors.NewNotFound(corev1.SchemeGroupVersion.WithResource("namespaces").GroupResource(), key.Name) + } + } + return m.Client.Get(ctx, key, obj, opts...) +} diff --git a/cmd/core/app/hooks/testdata/old_apprev_crd.yaml b/cmd/core/app/hooks/crdvalidation/testdata/old_apprev_crd.yaml similarity index 100% rename from cmd/core/app/hooks/testdata/old_apprev_crd.yaml rename to cmd/core/app/hooks/crdvalidation/testdata/old_apprev_crd.yaml diff --git a/cmd/core/app/hooks/hook.go b/cmd/core/app/hooks/hook.go new file mode 100644 index 000000000..c408170a0 --- /dev/null +++ b/cmd/core/app/hooks/hook.go @@ -0,0 +1,31 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hooks + +import "context" + +// PreStartHook defines a hook that should be run before the controller starts working. +// Pre-start hooks are used for validation, initialization, and safety checks that must +// pass before the controller begins processing resources. +type PreStartHook interface { + // Run executes the hook's logic. If an error is returned, the controller + // startup will be aborted. + Run(ctx context.Context) error + + // Name returns a human-readable name for the hook, used in logging. + Name() string +} diff --git a/cmd/core/app/hooks/pre_start_hook.go b/cmd/core/app/hooks/pre_start_hook.go deleted file mode 100644 index a8889a52a..000000000 --- a/cmd/core/app/hooks/pre_start_hook.go +++ /dev/null @@ -1,87 +0,0 @@ -/* -Copyright 2022 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hooks - -import ( - "context" - "fmt" - "time" - - "github.com/kubevela/pkg/util/compression" - "github.com/kubevela/pkg/util/k8s" - "github.com/kubevela/pkg/util/singleton" - "k8s.io/apiserver/pkg/util/feature" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/oam-dev/kubevela/apis/core.oam.dev/common" - "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" - "github.com/oam-dev/kubevela/apis/types" - "github.com/oam-dev/kubevela/pkg/features" - "github.com/oam-dev/kubevela/pkg/oam" -) - -// PreStartHook hook that should be run before controller start working -type PreStartHook interface { - Run(ctx context.Context) error -} - -// SystemCRDValidationHook checks if the crd in the system are valid to run the current controller -type SystemCRDValidationHook struct { - client.Client -} - -// NewSystemCRDValidationHook . -func NewSystemCRDValidationHook() PreStartHook { - return &SystemCRDValidationHook{Client: singleton.KubeClient.Get()} -} - -// Run . -func (in *SystemCRDValidationHook) Run(ctx context.Context) error { - if feature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision) || - feature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision) { - appRev := &v1beta1.ApplicationRevision{} - appRev.Name = fmt.Sprintf("core.pre-check.%d", time.Now().UnixNano()) - appRev.Namespace = k8s.GetRuntimeNamespace() - key := client.ObjectKeyFromObject(appRev) - appRev.SetLabels(map[string]string{oam.LabelPreCheck: types.VelaCoreName}) - appRev.Spec.Application.Name = appRev.Name - appRev.Spec.Application.Spec.Components = []common.ApplicationComponent{} - if feature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision) { - appRev.Spec.Compression.SetType(compression.Zstd) - } else if feature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision) { - appRev.Spec.Compression.SetType(compression.Gzip) - } - if err := in.Client.Create(ctx, appRev); err != nil { - return err - } - defer func() { - if err := in.Client.DeleteAllOf(ctx, &v1beta1.ApplicationRevision{}, - client.InNamespace(types.DefaultKubeVelaNS), - client.MatchingLabels{oam.LabelPreCheck: types.VelaCoreName}); err != nil { - klog.Errorf("failed to recycle pre-check ApplicationRevision: %v", err) - } - }() - if err := in.Client.Get(ctx, key, appRev); err != nil { - return err - } - if appRev.Spec.Application.Name != appRev.Name { - return fmt.Errorf("the ApplicationRevision CRD is not updated. Compression cannot be used. Please upgrade your CRD to latest ones") - } - } - return nil -} diff --git a/cmd/core/app/hooks/pre_start_hook_test.go b/cmd/core/app/hooks/pre_start_hook_test.go deleted file mode 100644 index f19381538..000000000 --- a/cmd/core/app/hooks/pre_start_hook_test.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2022 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hooks_test - -import ( - "context" - "testing" - - "github.com/kubevela/pkg/util/k8s" - "github.com/kubevela/pkg/util/singleton" - "github.com/kubevela/pkg/util/test/bootstrap" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" - - "github.com/oam-dev/kubevela/apis/types" - "github.com/oam-dev/kubevela/cmd/core/app/hooks" - "github.com/oam-dev/kubevela/pkg/features" -) - -var _ = bootstrap.InitKubeBuilderForTest(bootstrap.WithCRDPath("./testdata")) - -func TestPreStartHook(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Run pre-start hook test") -} - -var _ = Describe("Test pre-start hooks", func() { - It("Test SystemCRDValidationHook", func() { - featuregatetesting.SetFeatureGateDuringTest(GinkgoT(), utilfeature.DefaultFeatureGate, features.ZstdApplicationRevision, true) - ctx := context.Background() - Expect(k8s.EnsureNamespace(ctx, singleton.KubeClient.Get(), types.DefaultKubeVelaNS)).Should(Succeed()) - err := hooks.NewSystemCRDValidationHook().Run(ctx) - Expect(err).ShouldNot(Succeed()) - Expect(err.Error()).Should(ContainSubstring("the ApplicationRevision CRD is not updated")) - }) -}) diff --git a/cmd/core/app/server.go b/cmd/core/app/server.go index e6fc5898e..8573a62fb 100644 --- a/cmd/core/app/server.go +++ b/cmd/core/app/server.go @@ -49,6 +49,7 @@ import ( "github.com/oam-dev/kubevela/apis/types" "github.com/oam-dev/kubevela/cmd/core/app/config" "github.com/oam-dev/kubevela/cmd/core/app/hooks" + "github.com/oam-dev/kubevela/cmd/core/app/hooks/crdvalidation" "github.com/oam-dev/kubevela/cmd/core/app/options" "github.com/oam-dev/kubevela/pkg/auth" "github.com/oam-dev/kubevela/pkg/cache" @@ -56,6 +57,7 @@ import ( oamv1beta1 "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1beta1" "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1beta1/application" "github.com/oam-dev/kubevela/pkg/features" + "github.com/oam-dev/kubevela/pkg/logging" "github.com/oam-dev/kubevela/pkg/monitor/watcher" "github.com/oam-dev/kubevela/pkg/multicluster" "github.com/oam-dev/kubevela/pkg/oam" @@ -241,7 +243,7 @@ func setupLogging(observabilityConfig *config.ObservabilityConfig) { // Set logger (use --dev-logs=true for local development) if observabilityConfig.DevLogs { - logOutput := newColorWriter(os.Stdout) + logOutput := logging.NewColorWriter(os.Stdout) klog.LogToStderr(false) klog.SetOutput(logOutput) ctrl.SetLogger(textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(logOutput)))) @@ -478,14 +480,16 @@ func prepareRun(ctx context.Context, manager manager.Manager, coreOptions *optio } klog.InfoS("Starting vela controller manager with pre-start validation") - for _, hook := range []hooks.PreStartHook{hooks.NewSystemCRDValidationHook()} { - klog.V(2).InfoS("Running pre-start hook", "hook", fmt.Sprintf("%T", hook)) + for _, hook := range []hooks.PreStartHook{crdvalidation.NewHook()} { + hookName := hook.Name() + klog.InfoS("Running pre-start hook", "hook", hookName) if err := hook.Run(ctx); err != nil { - klog.ErrorS(err, "Failed to run pre-start hook", "hook", fmt.Sprintf("%T", hook)) - return fmt.Errorf("failed to run hook %T: %w", hook, err) + klog.ErrorS(err, "Failed to run pre-start hook", "hook", hookName) + return fmt.Errorf("failed to run hook %s: %w", hookName, err) } + klog.InfoS("Pre-start hook completed successfully", "hook", hookName) } - klog.InfoS("Pre-start validation completed successfully") + klog.InfoS("All pre-start validation hooks completed successfully") return nil } diff --git a/cmd/core/app/color_writer.go b/pkg/logging/color_writer.go similarity index 96% rename from cmd/core/app/color_writer.go rename to pkg/logging/color_writer.go index dd1fc51a4..e2a0ac1a5 100644 --- a/cmd/core/app/color_writer.go +++ b/pkg/logging/color_writer.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package logging import ( "bytes" @@ -60,7 +60,12 @@ type colorWriter struct { buf bytes.Buffer } -func newColorWriter(dst io.Writer) io.Writer { +// NewColorWriter creates a new writer that adds ANSI color codes to klog output. +// It wraps the provided writer and colorizes log messages based on severity level, +// enhances location information with function names, and formats structured fields. +// This is primarily intended for development mode (--dev-logs=true) to improve +// log readability in terminal output. +func NewColorWriter(dst io.Writer) io.Writer { return &colorWriter{dst: dst} } diff --git a/pkg/logging/color_writer_test.go b/pkg/logging/color_writer_test.go new file mode 100644 index 000000000..aafcf8004 --- /dev/null +++ b/pkg/logging/color_writer_test.go @@ -0,0 +1,302 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logging_test + +import ( + "bytes" + "fmt" + "strings" + "sync" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/oam-dev/kubevela/pkg/logging" +) + +func TestColorWriter(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Color Writer Suite") +} + +var _ = Describe("Color Writer", func() { + Context("formatting", func() { + It("should format INFO messages with colors", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + input := "I1117 12:34:56.789012 1234 server.go:123] Starting server" + _, err := writer.Write([]byte(input + "\n")) + Expect(err).Should(Succeed()) + + output := buf.String() + Expect(output).Should(ContainSubstring("\x1b[32m")) // Info color + Expect(output).Should(ContainSubstring("INFO")) + Expect(output).Should(ContainSubstring("Starting server")) + Expect(output).Should(ContainSubstring("\x1b[0m")) // Reset color + }) + + It("should format ERROR messages with colors", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + input := "E1117 12:34:56.789012 1234 server.go:456] Failed to start server" + _, err := writer.Write([]byte(input + "\n")) + Expect(err).Should(Succeed()) + + output := buf.String() + Expect(output).Should(ContainSubstring("\x1b[31m")) // Error color + Expect(output).Should(ContainSubstring("ERROR")) + Expect(output).Should(ContainSubstring("Failed to start server")) + }) + + It("should format WARNING messages with colors", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + input := "W1117 12:34:56.789012 1234 server.go:789] Configuration deprecated" + _, err := writer.Write([]byte(input + "\n")) + Expect(err).Should(Succeed()) + + output := buf.String() + Expect(output).Should(ContainSubstring("\x1b[33m")) // Warning color + Expect(output).Should(ContainSubstring("WARN")) + Expect(output).Should(ContainSubstring("Configuration deprecated")) + }) + + It("should handle structured logging with key-value pairs", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + input := "I1117 12:34:56.789012 1234 hook.go:123] Running hook name=CRDValidation status=success" + _, err := writer.Write([]byte(input + "\n")) + Expect(err).Should(Succeed()) + + output := buf.String() + Expect(output).Should(ContainSubstring("Running hook")) + Expect(output).Should(ContainSubstring("\x1b[96m")) // Key color + Expect(output).Should(ContainSubstring("name")) + Expect(output).Should(ContainSubstring("\x1b[37m")) // Value color + Expect(output).Should(ContainSubstring("CRDValidation")) + }) + + It("should handle quoted values in key-value pairs", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + input := `I1117 12:34:56.789012 1234 server.go:123] Processing request path="/api/v1" method="GET"` + _, err := writer.Write([]byte(input + "\n")) + Expect(err).Should(Succeed()) + + output := buf.String() + Expect(output).Should(ContainSubstring("Processing request")) + Expect(output).Should(ContainSubstring("path")) + Expect(output).Should(ContainSubstring(`"/api/v1"`)) + Expect(output).Should(ContainSubstring("method")) + Expect(output).Should(ContainSubstring(`"GET"`)) + }) + }) + + Context("concurrency safety", func() { + It("should handle concurrent writes safely", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + const numGoroutines = 100 + const numWrites = 50 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + // Use a channel to detect race conditions + errors := make(chan error, numGoroutines*numWrites) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < numWrites; j++ { + msg := fmt.Sprintf("I1117 12:34:56.789012 %04d test.go:123] Goroutine %d write %d\n", + id, id, j) + _, err := writer.Write([]byte(msg)) + if err != nil { + errors <- err + } + } + }(i) + } + + wg.Wait() + close(errors) + + // Check for any errors + for err := range errors { + Expect(err).Should(BeNil()) + } + + // Verify output contains expected number of lines + output := buf.String() + lines := bytes.Count([]byte(output), []byte("\n")) + Expect(lines).Should(Equal(numGoroutines * numWrites)) + }) + + It("should maintain data integrity under concurrent access", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + const numGoroutines = 50 + messages := make(map[string]bool) + var mu sync.Mutex + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + // Create unique messages + for j := 0; j < 10; j++ { + msg := fmt.Sprintf("I1117 12:34:56.789012 %04d test.go:%03d] Unique message %d-%d", + id, j*10, id, j) + mu.Lock() + messages[msg] = true + mu.Unlock() + + _, err := writer.Write([]byte(msg + "\n")) + Expect(err).Should(BeNil()) + } + }(i) + } + + wg.Wait() + + // Verify all unique messages appear in output + output := buf.String() + for msg := range messages { + // Extract the unique part of each message (e.g., "Unique message 1-2") + // The message format is: "I1117 12:34:56.789012 XXXX test.go:YYY] Unique message X-Y" + // We look for the complete "Unique message X-Y" part + startIdx := strings.Index(msg, "Unique message") + if startIdx >= 0 { + uniquePart := msg[startIdx:] // Get "Unique message X-Y" + Expect(output).Should(ContainSubstring(uniquePart), + "Missing message: %s", uniquePart) + } + } + }) + + It("should handle partial writes correctly", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + var wg sync.WaitGroup + wg.Add(2) + + // Goroutine 1: Write a message in parts + go func() { + defer wg.Done() + part1 := []byte("I1117 12:34:56.789012 1234 test.go:123] ") + part2 := []byte("First ") + part3 := []byte("message\n") + + writer.Write(part1) + writer.Write(part2) + writer.Write(part3) + }() + + // Goroutine 2: Write a complete message + go func() { + defer wg.Done() + msg := []byte("I1117 12:34:56.789012 5678 test.go:456] Second message\n") + writer.Write(msg) + }() + + wg.Wait() + + // Both messages should be in the output + output := buf.String() + Expect(output).Should(ContainSubstring("First message")) + Expect(output).Should(ContainSubstring("Second message")) + }) + }) + + Context("edge cases", func() { + It("should handle empty input", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + n, err := writer.Write([]byte("")) + Expect(err).Should(BeNil()) + Expect(n).Should(Equal(0)) + Expect(buf.String()).Should(Equal("")) + }) + + It("should handle input without newlines", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + input := "I1117 12:34:56.789012 1234 test.go:123] Message without newline" + n, err := writer.Write([]byte(input)) + Expect(err).Should(BeNil()) + Expect(n).Should(Equal(len(input))) + + // The message should be buffered, not yet written to output + Expect(buf.String()).Should(Equal("")) + + // Write a newline to flush + writer.Write([]byte("\n")) + Expect(buf.String()).Should(ContainSubstring("Message without newline")) + }) + + It("should handle malformed log lines gracefully", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + // Various malformed inputs + inputs := []string{ + "Not a klog line\n", + "I\n", + "I1117\n", + "Random text with no structure\n", + "\n", + } + + for _, input := range inputs { + _, err := writer.Write([]byte(input)) + Expect(err).Should(BeNil()) + } + + // Should have written something for each input + output := buf.String() + lines := bytes.Count([]byte(output), []byte("\n")) + Expect(lines).Should(Equal(len(inputs))) + }) + + It("should handle very long lines", func() { + var buf bytes.Buffer + writer := logging.NewColorWriter(&buf) + + // Create a very long message + longMsg := "I1117 12:34:56.789012 1234 test.go:123] " + string(make([]byte, 10000)) + _, err := writer.Write([]byte(longMsg + "\n")) + Expect(err).Should(BeNil()) + + output := buf.String() + Expect(len(output)).Should(BeNumerically(">", 10000)) + }) + }) +})