Feat: add support for compressing apprev using gzip and zstd (#5090)

* Feat: add support for compressing apprev using gzip and zstd

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Test: fix tests

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Test: fix tests

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Refactor: use move compressible fields into a separate struct

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Refactor: use compression util from kubevela/pkg

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Test: fix core-api-test

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Feat: add compression ratio in `revision list`

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Test: fix tests

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>
This commit is contained in:
Charlie Chiang
2022-11-24 10:27:13 +08:00
committed by GitHub
parent 734025f03f
commit fdc4622208
37 changed files with 512 additions and 581 deletions

View File

@@ -17,9 +17,11 @@
package v1beta1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"encoding/json"
"github.com/kubevela/pkg/util/compression"
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
@@ -29,6 +31,16 @@ import (
// ApplicationRevisionSpec is the spec of ApplicationRevision
type ApplicationRevisionSpec struct {
// ApplicationRevisionCompressibleFields represents all the fields that can be compressed.
ApplicationRevisionCompressibleFields `json:",inline"`
// Compression represents the compressed components in apprev in base64 (if compression is enabled).
Compression ApplicationRevisionCompression `json:"compression,omitempty"`
}
// ApplicationRevisionCompressibleFields represents all the fields that can be compressed.
// So we can better organize them and compress only the compressible fields.
type ApplicationRevisionCompressibleFields struct {
// Application records the snapshot of the created/modified Application
Application Application `json:"application"`
@@ -64,6 +76,59 @@ type ApplicationRevisionSpec struct {
ReferredObjects []common.ReferredObject `json:"referredObjects,omitempty"`
}
// ApplicationRevisionCompression represents the compressed components in apprev in base64.
type ApplicationRevisionCompression struct {
compression.CompressedText `json:",inline"`
}
// MarshalJSON serves the same purpose as the one in ResourceTrackerSpec.
func (apprev *ApplicationRevisionSpec) MarshalJSON() ([]byte, error) {
type Alias ApplicationRevisionSpec
tmp := &struct {
*Alias
}{}
if apprev.Compression.Type == compression.Uncompressed {
tmp.Alias = (*Alias)(apprev)
} else {
cpy := apprev.DeepCopy()
err := cpy.Compression.EncodeFrom(cpy.ApplicationRevisionCompressibleFields)
cpy.ApplicationRevisionCompressibleFields = ApplicationRevisionCompressibleFields{
// Application needs to have components.
Application: Application{Spec: ApplicationSpec{Components: []common.ApplicationComponent{}}},
}
if err != nil {
return nil, err
}
tmp.Alias = (*Alias)(cpy)
}
return json.Marshal(tmp.Alias)
}
// UnmarshalJSON serves the same purpose as the one in ResourceTrackerSpec.
func (apprev *ApplicationRevisionSpec) UnmarshalJSON(data []byte) error {
type Alias ApplicationRevisionSpec
tmp := &struct {
*Alias
}{}
if err := json.Unmarshal(data, tmp); err != nil {
return err
}
if tmp.Compression.Type != compression.Uncompressed {
err := tmp.Compression.DecodeTo(&tmp.ApplicationRevisionCompressibleFields)
if err != nil {
return err
}
tmp.Compression.Clean()
}
(*ApplicationRevisionSpec)(tmp.Alias).DeepCopyInto(apprev)
return nil
}
// ApplicationRevisionStatus is the status of ApplicationRevision
type ApplicationRevisionStatus struct {
// Succeeded records if the workflow finished running with success

View File

@@ -0,0 +1,86 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
import (
"encoding/json"
"fmt"
"testing"
"github.com/kubevela/pkg/util/compression"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/runtime"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
)
func TestApplicationRevisionCompression(t *testing.T) {
// Fill data
spec := &ApplicationRevisionSpec{}
spec.Application = Application{Spec: ApplicationSpec{Components: []common.ApplicationComponent{{Name: "test-name"}}}}
spec.ComponentDefinitions = make(map[string]ComponentDefinition)
spec.ComponentDefinitions["def"] = ComponentDefinition{Spec: ComponentDefinitionSpec{PodSpecPath: "path"}}
spec.WorkloadDefinitions = make(map[string]WorkloadDefinition)
spec.WorkloadDefinitions["def"] = WorkloadDefinition{Spec: WorkloadDefinitionSpec{Reference: common.DefinitionReference{Name: "testdef"}}}
spec.TraitDefinitions = make(map[string]TraitDefinition)
spec.TraitDefinitions["def"] = TraitDefinition{Spec: TraitDefinitionSpec{ControlPlaneOnly: true}}
spec.ScopeDefinitions = make(map[string]ScopeDefinition)
spec.ScopeDefinitions["def"] = ScopeDefinition{Spec: ScopeDefinitionSpec{AllowComponentOverlap: true}}
spec.PolicyDefinitions = make(map[string]PolicyDefinition)
spec.PolicyDefinitions["def"] = PolicyDefinition{Spec: PolicyDefinitionSpec{ManageHealthCheck: true}}
spec.WorkflowStepDefinitions = make(map[string]WorkflowStepDefinition)
spec.WorkflowStepDefinitions["def"] = WorkflowStepDefinition{Spec: WorkflowStepDefinitionSpec{Reference: common.DefinitionReference{Name: "testname"}}}
spec.ReferredObjects = []common.ReferredObject{{RawExtension: runtime.RawExtension{Raw: []byte("123")}}}
testAppRev := &ApplicationRevision{Spec: *spec}
marshalAndUnmarshal := func(in *ApplicationRevision) (*ApplicationRevision, int) {
out := &ApplicationRevision{}
b, err := json.Marshal(in)
assert.NoError(t, err)
if in.Spec.Compression.Type != compression.Uncompressed {
assert.Contains(t, string(b), fmt.Sprintf("\"type\":\"%s\",\"data\":\"", in.Spec.Compression.Type))
}
err = json.Unmarshal(b, out)
assert.NoError(t, err)
assert.Equal(t, out.Spec.Compression.Type, in.Spec.Compression.Type)
assert.Equal(t, out.Spec.Compression.Data, "")
return out, len(b)
}
// uncompressed
testAppRev.Spec.Compression.SetType(compression.Uncompressed)
uncomp, uncompsize := marshalAndUnmarshal(testAppRev)
// zstd compressed
testAppRev.Spec.Compression.SetType(compression.Zstd)
zstdcomp, zstdsize := marshalAndUnmarshal(testAppRev)
// We will compare content later. Clear compression methods since it will interfere
// comparison and is verified earlier.
zstdcomp.Spec.Compression.SetType(compression.Uncompressed)
// gzip compressed
testAppRev.Spec.Compression.SetType(compression.Gzip)
gzipcomp, gzipsize := marshalAndUnmarshal(testAppRev)
gzipcomp.Spec.Compression.SetType(compression.Uncompressed)
assert.Equal(t, uncomp, zstdcomp)
assert.Equal(t, zstdcomp, gzipcomp)
assert.Less(t, zstdsize, uncompsize)
assert.Less(t, gzipsize, uncompsize)
}

View File

@@ -29,11 +29,12 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/kubevela/pkg/util/compression"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/interfaces"
velatypes "github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/utils/compression"
velaerr "github.com/oam-dev/kubevela/pkg/utils/errors"
)
@@ -76,10 +77,9 @@ type ResourceTrackerSpec struct {
Compression ResourceTrackerCompression `json:"compression,omitempty"`
}
// ResourceTrackerCompression the compression for ResourceTracker ManagedResources
// ResourceTrackerCompression represents the compressed components in ResourceTracker.
type ResourceTrackerCompression struct {
Type compression.Type `json:"type,omitempty"`
Data string `json:"data,omitempty"`
compression.CompressedText `json:",inline"`
}
// MarshalJSON will encode ResourceTrackerSpec according to the compression type. If type specified,
@@ -88,30 +88,19 @@ type ResourceTrackerCompression struct {
func (in *ResourceTrackerSpec) MarshalJSON() ([]byte, error) {
type Alias ResourceTrackerSpec
tmp := &struct{ *Alias }{}
switch in.Compression.Type {
case compression.Uncompressed:
if in.Compression.Type == compression.Uncompressed {
tmp.Alias = (*Alias)(in)
case compression.Gzip:
} else {
cpy := in.DeepCopy()
data, err := compression.GzipObjectToString(in.ManagedResources)
cpy.ManagedResources = nil
err := cpy.Compression.EncodeFrom(in.ManagedResources)
if err != nil {
return nil, err
}
cpy.ManagedResources = nil
cpy.Compression.Data = data
tmp.Alias = (*Alias)(cpy)
case compression.Zstd:
cpy := in.DeepCopy()
data, err := compression.ZstdObjectToString(in.ManagedResources)
if err != nil {
return nil, err
}
cpy.ManagedResources = nil
cpy.Compression.Data = data
tmp.Alias = (*Alias)(cpy)
default:
return nil, compression.NewUnsupportedCompressionTypeError(string(in.Compression.Type))
}
return json.Marshal(tmp.Alias)
}
@@ -124,24 +113,16 @@ func (in *ResourceTrackerSpec) UnmarshalJSON(src []byte) error {
if err := json.Unmarshal(src, tmp); err != nil {
return err
}
switch tmp.Compression.Type {
case compression.Uncompressed:
break
case compression.Gzip:
if tmp.Compression.Type != compression.Uncompressed {
tmp.ManagedResources = []ManagedResource{}
if err := compression.GunzipStringToObject(tmp.Compression.Data, &tmp.ManagedResources); err != nil {
err := tmp.Compression.DecodeTo(&tmp.ManagedResources)
if err != nil {
return err
}
tmp.Compression.Data = ""
case compression.Zstd:
tmp.ManagedResources = []ManagedResource{}
if err := compression.UnZstdStringToObject(tmp.Compression.Data, &tmp.ManagedResources); err != nil {
return err
}
tmp.Compression.Data = ""
default:
return compression.NewUnsupportedCompressionTypeError(string(in.Compression.Type))
tmp.Compression.Clean()
}
(*ResourceTrackerSpec)(tmp.Alias).DeepCopyInto(in)
return nil
}

View File

@@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/kubevela/pkg/util/compression"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -34,7 +35,6 @@ import (
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/utils/compression"
"github.com/oam-dev/kubevela/pkg/utils/errors"
)

View File

@@ -137,39 +137,7 @@ func (in *ApplicationRevision) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplicationRevisionList) DeepCopyInto(out *ApplicationRevisionList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ApplicationRevision, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplicationRevisionList.
func (in *ApplicationRevisionList) DeepCopy() *ApplicationRevisionList {
if in == nil {
return nil
}
out := new(ApplicationRevisionList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ApplicationRevisionList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplicationRevisionSpec) DeepCopyInto(out *ApplicationRevisionSpec) {
func (in *ApplicationRevisionCompressibleFields) DeepCopyInto(out *ApplicationRevisionCompressibleFields) {
*out = *in
in.Application.DeepCopyInto(&out.Application)
if in.ComponentDefinitions != nil {
@@ -242,6 +210,71 @@ func (in *ApplicationRevisionSpec) DeepCopyInto(out *ApplicationRevisionSpec) {
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplicationRevisionCompressibleFields.
func (in *ApplicationRevisionCompressibleFields) DeepCopy() *ApplicationRevisionCompressibleFields {
if in == nil {
return nil
}
out := new(ApplicationRevisionCompressibleFields)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplicationRevisionCompression) DeepCopyInto(out *ApplicationRevisionCompression) {
*out = *in
out.CompressedText = in.CompressedText
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplicationRevisionCompression.
func (in *ApplicationRevisionCompression) DeepCopy() *ApplicationRevisionCompression {
if in == nil {
return nil
}
out := new(ApplicationRevisionCompression)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplicationRevisionList) DeepCopyInto(out *ApplicationRevisionList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ApplicationRevision, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplicationRevisionList.
func (in *ApplicationRevisionList) DeepCopy() *ApplicationRevisionList {
if in == nil {
return nil
}
out := new(ApplicationRevisionList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ApplicationRevisionList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplicationRevisionSpec) DeepCopyInto(out *ApplicationRevisionSpec) {
*out = *in
in.ApplicationRevisionCompressibleFields.DeepCopyInto(&out.ApplicationRevisionCompressibleFields)
out.Compression = in.Compression
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplicationRevisionSpec.
func (in *ApplicationRevisionSpec) DeepCopy() *ApplicationRevisionSpec {
if in == nil {
@@ -654,6 +687,7 @@ func (in *ResourceTracker) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceTrackerCompression) DeepCopyInto(out *ResourceTrackerCompression) {
*out = *in
out.CompressedText = in.CompressedText
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceTrackerCompression.

View File

@@ -81,23 +81,25 @@ helm install --create-namespace -n vela-system kubevela kubevela/vela-core --wai
### KubeVela controller optimization parameters
| Name | Description | Value |
| ------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------- |
| `optimize.cachedGvks` | Optimize types of resources to be cached. | `""` |
| `optimize.resourceTrackerListOp` | Optimize ResourceTracker List Op by adding index. | `true` |
| `optimize.controllerReconcileLoopReduction` | Optimize ApplicationController reconcile by reducing the number of loops to reconcile application. | `false` |
| `optimize.markWithProb` | Optimize ResourceTracker GC by only run mark with probability. Side effect: outdated ResourceTracker might not be able to be removed immediately. | `0.1` |
| `optimize.disableComponentRevision` | Optimize componentRevision by disabling the creation and gc | `false` |
| `optimize.disableApplicationRevision` | Optimize ApplicationRevision by disabling the creation and gc. | `false` |
| `optimize.disableWorkflowRecorder` | Optimize workflow recorder by disabling the creation and gc. | `false` |
| `optimize.enableInMemoryWorkflowContext` | Optimize workflow by use in-memory context. | `false` |
| `optimize.disableResourceApplyDoubleCheck` | Optimize workflow by ignoring resource double check after apply. | `false` |
| `optimize.enableResourceTrackerDeleteOnlyTrigger` | Optimize resourcetracker by only trigger reconcile when resourcetracker is deleted. | `true` |
| `featureGates.enableLegacyComponentRevision` | if disabled, only component with rollout trait will create component revisions | `false` |
| `featureGates.gzipResourceTracker` | if enabled, resourceTracker will be compressed using gzip before being stored | `false` |
| `featureGates.zstdResourceTracker` | if enabled, resourceTracker will be compressed using zstd before being stored. It is much faster and more efficient than gzip. If both gzip and zstd are enabled, zstd will be used. | `false` |
| `featureGates.applyOnce` | if enabled, the apply-once feature will be applied to all applications, no state-keep and no resource data storage in ResourceTracker | `false` |
| `featureGates.multiStageComponentApply` | if enabled, the multiStageComponentApply feature will be combined with the stage field in TraitDefinition to complete the multi-stage apply. | `false` |
| Name | Description | Value |
| ------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `optimize.cachedGvks` | Optimize types of resources to be cached. | `""` |
| `optimize.resourceTrackerListOp` | Optimize ResourceTracker List Op by adding index. | `true` |
| `optimize.controllerReconcileLoopReduction` | Optimize ApplicationController reconcile by reducing the number of loops to reconcile application. | `false` |
| `optimize.markWithProb` | Optimize ResourceTracker GC by only run mark with probability. Side effect: outdated ResourceTracker might not be able to be removed immediately. | `0.1` |
| `optimize.disableComponentRevision` | Optimize componentRevision by disabling the creation and gc | `false` |
| `optimize.disableApplicationRevision` | Optimize ApplicationRevision by disabling the creation and gc. | `false` |
| `optimize.disableWorkflowRecorder` | Optimize workflow recorder by disabling the creation and gc. | `false` |
| `optimize.enableInMemoryWorkflowContext` | Optimize workflow by use in-memory context. | `false` |
| `optimize.disableResourceApplyDoubleCheck` | Optimize workflow by ignoring resource double check after apply. | `false` |
| `optimize.enableResourceTrackerDeleteOnlyTrigger` | Optimize resourcetracker by only trigger reconcile when resourcetracker is deleted. | `true` |
| `featureGates.enableLegacyComponentRevision` | if disabled, only component with rollout trait will create component revisions | `false` |
| `featureGates.gzipResourceTracker` | compress ResourceTracker using gzip (good) before being stored. This is reduces network throughput when dealing with huge ResourceTrackers. | `false` |
| `featureGates.zstdResourceTracker` | compress ResourceTracker using zstd (fast and good) before being stored. This is reduces network throughput when dealing with huge ResourceTrackers. Note that zstd will be prioritized if you enable other compression options. | `false` |
| `featureGates.applyOnce` | if enabled, the apply-once feature will be applied to all applications, no state-keep and no resource data storage in ResourceTracker | `false` |
| `featureGates.multiStageComponentApply` | if enabled, the multiStageComponentApply feature will be combined with the stage field in TraitDefinition to complete the multi-stage apply. | `false` |
| `featureGates.gzipApplicationRevision` | compress apprev using gzip (good) before being stored. This is reduces network throughput when dealing with huge apprevs. | `false` |
| `featureGates.zstdApplicationRevision` | compress apprev using zstd (fast and good) before being stored. This is reduces network throughput when dealing with huge apprevs. Note that zstd will be prioritized if you enable other compression options. | `false` |
### MultiCluster parameters

View File

@@ -3199,6 +3199,16 @@ spec:
description: ComponentDefinitions records the snapshot of the componentDefinitions
related with the created/modified Application
type: object
compression:
description: Compression represents the compressed components in apprev
in base64 (if compression is enabled).
properties:
data:
type: string
type:
description: Type the compression type
type: string
type: object
policies:
additionalProperties:
description: Policy is the Schema for the policy API

View File

@@ -57,8 +57,8 @@ spec:
format: int64
type: integer
compression:
description: ResourceTrackerCompression the compression for ResourceTracker
ManagedResources
description: ResourceTrackerCompression represents the compressed
components in ResourceTracker.
properties:
data:
type: string

View File

@@ -255,6 +255,8 @@ spec:
- "--feature-gates=ZstdResourceTracker={{- .Values.featureGates.zstdResourceTracker | toString -}}"
- "--feature-gates=ApplyOnce={{- .Values.featureGates.applyOnce | toString -}}"
- "--feature-gates=MultiStageComponentApply= {{- .Values.featureGates.multiStageComponentApply | toString -}}"
- "--feature-gates=GzipApplicationRevision={{- .Values.featureGates.gzipResourceTracker | toString -}}"
- "--feature-gates=ZstdApplicationRevision={{- .Values.featureGates.zstdResourceTracker | toString -}}"
{{ if .Values.authentication.enabled }}
{{ if .Values.authentication.withUser }}
- "--authentication-with-user"

View File

@@ -110,16 +110,21 @@ optimize:
enableResourceTrackerDeleteOnlyTrigger: true
##@param featureGates.enableLegacyComponentRevision if disabled, only component with rollout trait will create component revisions
##@param featureGates.gzipResourceTracker if enabled, resourceTracker will be compressed using gzip before being stored
##@param featureGates.zstdResourceTracker if enabled, resourceTracker will be compressed using zstd before being stored. It is much faster and more efficient than gzip. If both gzip and zstd are enabled, zstd will be used.
##@param featureGates.gzipResourceTracker compress ResourceTracker using gzip (good) before being stored. This is reduces network throughput when dealing with huge ResourceTrackers.
##@param featureGates.zstdResourceTracker compress ResourceTracker using zstd (fast and good) before being stored. This is reduces network throughput when dealing with huge ResourceTrackers. Note that zstd will be prioritized if you enable other compression options.
##@param featureGates.applyOnce if enabled, the apply-once feature will be applied to all applications, no state-keep and no resource data storage in ResourceTracker
##@param featureGates.multiStageComponentApply if enabled, the multiStageComponentApply feature will be combined with the stage field in TraitDefinition to complete the multi-stage apply.
##@param featureGates.gzipApplicationRevision compress apprev using gzip (good) before being stored. This is reduces network throughput when dealing with huge apprevs.
##@param featureGates.zstdApplicationRevision compress apprev using zstd (fast and good) before being stored. This is reduces network throughput when dealing with huge apprevs. Note that zstd will be prioritized if you enable other compression options.
##@param
featureGates:
enableLegacyComponentRevision: false
gzipResourceTracker: false
zstdResourceTracker: false
applyOnce: false
multiStageComponentApply: false
gzipApplicationRevision: false
zstdApplicationRevision: false
## @section MultiCluster parameters

View File

@@ -3199,6 +3199,16 @@ spec:
description: ComponentDefinitions records the snapshot of the componentDefinitions
related with the created/modified Application
type: object
compression:
description: Compression represents the compressed components in apprev
in base64 (if compression is enabled).
properties:
data:
type: string
type:
description: Type the compression type
type: string
type: object
policies:
additionalProperties:
description: Policy is the Schema for the policy API

View File

@@ -57,8 +57,8 @@ spec:
format: int64
type: integer
compression:
description: ResourceTrackerCompression the compression for ResourceTracker
ManagedResources
description: ResourceTrackerCompression represents the compressed
components in ResourceTracker.
properties:
data:
type: string

4
go.mod
View File

@@ -54,9 +54,8 @@ require (
github.com/hashicorp/hcl/v2 v2.9.1
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174
github.com/imdario/mergo v0.3.12
github.com/klauspost/compress v1.15.11
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c
github.com/kubevela/pkg v0.0.0-20221024115939-a103acee6db2
github.com/kubevela/pkg v0.0.0-20221122113941-4eea2d7495b7
github.com/kubevela/prism v1.5.1-0.20220915071949-6bf3ad33f84f
github.com/kubevela/workflow v0.3.5-0.20221115021445-48df288898ad
github.com/kyokomi/emoji v2.2.4+incompatible
@@ -219,6 +218,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/pty v1.1.8 // indirect
github.com/kr/text v0.2.0 // indirect

8
go.sum
View File

@@ -1303,8 +1303,8 @@ github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
@@ -1331,8 +1331,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubevela/cue v0.4.4-0.20221107123854-a976b0e340be h1:0xj/Rh4yVy54mUD2nLmAuN1AYgBkkHxBh4PoLGbIg5g=
github.com/kubevela/cue v0.4.4-0.20221107123854-a976b0e340be/go.mod h1:Ya12qn7FZc+LSN0qgEhzEpnzQsvnGHVgoDrqe9i3eNg=
github.com/kubevela/pkg v0.0.0-20221024115939-a103acee6db2 h1:C3cAfrxst1+dIWgLLhUQt1TQvEEpp1UTq9ZQB2xKbeI=
github.com/kubevela/pkg v0.0.0-20221024115939-a103acee6db2/go.mod h1:TgIGEB/r0NOy63Jzem7WsL3AIr34l+ClH9dmPqcZ4d4=
github.com/kubevela/pkg v0.0.0-20221122113941-4eea2d7495b7 h1:phXUBWHwU76L6kT+ywwbjbtFz7/simyrLeGfRJnJYrA=
github.com/kubevela/pkg v0.0.0-20221122113941-4eea2d7495b7/go.mod h1:EvlkrVSDzOlDoLU2BQc7y31+DBP/5Qn3J2+d+QmdXV8=
github.com/kubevela/prism v1.5.1-0.20220915071949-6bf3ad33f84f h1:1lUtU1alPThdcsn4MI6XjPb7eJLuZPpmlEdgjtnUMKw=
github.com/kubevela/prism v1.5.1-0.20220915071949-6bf3ad33f84f/go.mod h1:m724/7ANnB/iukyHW20+DicpeJMEC/JA0ZhgsHY10MA=
github.com/kubevela/workflow v0.3.5-0.20221115021445-48df288898ad h1:WLqg7iMrwvE7O4b0zRJCxoBzbUXv5aJpC5chqd4JdC4=

View File

@@ -48,9 +48,6 @@ clearRepo() {
echo "clear kubevela-core-api pkg/utils/errors"
rm -rf kubevela-core-api/pkg/utils/errors/*
echo "clear kubevela-core-api pkg/utils/compression"
rm -rf kubevela-core-api/pkg/utils/compression/*
echo "clear kubevela-core-api pkg/generated/client"
if [[ -d "kubevela-core-api/pkg/generated/client/" ]]
then
@@ -71,10 +68,6 @@ updateRepo() {
mkdir -p kubevela-core-api/pkg/utils/errors
cp -R kubevela/pkg/utils/errors/* kubevela-core-api/pkg/utils/errors/
echo "update kubevela-core-api pkg/utils/compression"
mkdir -p kubevela-core-api/pkg/utils/compression
cp -R kubevela/pkg/utils/compression/* kubevela-core-api/pkg/utils/compression/
echo "update kubevela-core-api pkg/generated/client"
cp -R kubevela/pkg/generated/client/* kubevela-core-api/pkg/generated/client/

View File

@@ -3199,6 +3199,16 @@ spec:
description: ComponentDefinitions records the snapshot of the componentDefinitions
related with the created/modified Application
type: object
compression:
description: Compression represents the compressed components in apprev
in base64 (if compression is enabled).
properties:
data:
type: string
type:
description: Type the compression type
type: string
type: object
policies:
additionalProperties:
description: Policy is the Schema for the policy API

View File

@@ -57,8 +57,8 @@ spec:
format: int64
type: integer
compression:
description: ResourceTrackerCompression the compression for ResourceTracker
ManagedResources
description: ResourceTrackerCompression represents the compressed
components in ResourceTracker.
properties:
data:
type: string

View File

@@ -32,20 +32,21 @@ e2e-setup-core-wo-auth:
.PHONY: e2e-setup-core-w-auth
e2e-setup-core-w-auth:
helm upgrade --install \
--create-namespace \
--namespace vela-system \
--set image.pullPolicy=IfNotPresent \
--set image.repository=vela-core-test \
--set applicationRevisionLimit=5 \
--set dependCheckWait=10s \
--set image.tag=$(GIT_COMMIT) \
--wait kubevela \
./charts/vela-core \
--set authentication.enabled=true \
--set authentication.withUser=true \
--set authentication.groupPattern=* \
--set featureGates.zstdResourceTracker=true
helm upgrade --install \
--create-namespace \
--namespace vela-system \
--set image.pullPolicy=IfNotPresent \
--set image.repository=vela-core-test \
--set applicationRevisionLimit=5 \
--set dependCheckWait=10s \
--set image.tag=$(GIT_COMMIT) \
--wait kubevela \
./charts/vela-core \
--set authentication.enabled=true \
--set authentication.withUser=true \
--set authentication.groupPattern=* \
--set featureGates.zstdResourceTracker=true \
--set featureGates.zstdApplicationRevision=true
.PHONY: e2e-setup-core
e2e-setup-core: e2e-setup-core-pre-hook e2e-setup-core-wo-auth e2e-setup-core-post-hook

View File

@@ -320,7 +320,9 @@ var _ = Describe("Test workflow service functions", func() {
Labels: map[string]string{"vela.io/wf-revision": "test-workflow-2-111"},
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: *appWithRevision,
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: *appWithRevision,
},
},
}
err = workflowService.KubeClient.Create(ctx, appRevision)

View File

@@ -204,7 +204,9 @@ var _ = Describe("Test handleCheckManageWorkloadTrait func", func() {
}
appRev := v1beta1.ApplicationRevision{
Spec: v1beta1.ApplicationRevisionSpec{
TraitDefinitions: traitDefs,
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
TraitDefinitions: traitDefs,
},
},
}
rolloutTrait := &unstructured.Unstructured{}

View File

@@ -217,11 +217,13 @@ var _ = Describe("Test Application workflow generator", func() {
Namespace: namespaceName,
},
Spec: oamcore.ApplicationRevisionSpec{
Application: *app.DeepCopy(),
ComponentDefinitions: make(map[string]oamcore.ComponentDefinition),
WorkloadDefinitions: make(map[string]oamcore.WorkloadDefinition),
TraitDefinitions: make(map[string]oamcore.TraitDefinition),
ScopeDefinitions: make(map[string]oamcore.ScopeDefinition),
ApplicationRevisionCompressibleFields: oamcore.ApplicationRevisionCompressibleFields{
Application: *app.DeepCopy(),
ComponentDefinitions: make(map[string]oamcore.ComponentDefinition),
WorkloadDefinitions: make(map[string]oamcore.WorkloadDefinition),
TraitDefinitions: make(map[string]oamcore.TraitDefinition),
ScopeDefinitions: make(map[string]oamcore.ScopeDefinition),
},
},
}
apprev.Spec.ComponentDefinitions["worker"] = *cd

View File

@@ -38,6 +38,8 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/kubevela/pkg/util/compression"
monitorContext "github.com/kubevela/pkg/monitor/context"
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
@@ -250,15 +252,17 @@ func (h *AppHandler) gatherRevisionSpec(af *appfile.Appfile) (*v1beta1.Applicati
copiedApp.Status = common.AppStatus{}
appRev := &v1beta1.ApplicationRevision{
Spec: v1beta1.ApplicationRevisionSpec{
Application: *copiedApp,
ComponentDefinitions: make(map[string]v1beta1.ComponentDefinition),
WorkloadDefinitions: make(map[string]v1beta1.WorkloadDefinition),
TraitDefinitions: make(map[string]v1beta1.TraitDefinition),
ScopeDefinitions: make(map[string]v1beta1.ScopeDefinition),
PolicyDefinitions: make(map[string]v1beta1.PolicyDefinition),
WorkflowStepDefinitions: make(map[string]v1beta1.WorkflowStepDefinition),
ScopeGVK: make(map[string]metav1.GroupVersionKind),
Policies: make(map[string]v1alpha1.Policy),
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: *copiedApp,
ComponentDefinitions: make(map[string]v1beta1.ComponentDefinition),
WorkloadDefinitions: make(map[string]v1beta1.WorkloadDefinition),
TraitDefinitions: make(map[string]v1beta1.TraitDefinition),
ScopeDefinitions: make(map[string]v1beta1.ScopeDefinition),
PolicyDefinitions: make(map[string]v1beta1.PolicyDefinition),
WorkflowStepDefinitions: make(map[string]v1beta1.WorkflowStepDefinition),
ScopeGVK: make(map[string]metav1.GroupVersionKind),
Policies: make(map[string]v1alpha1.Policy),
},
},
}
for _, w := range af.Workloads {
@@ -849,6 +853,15 @@ func (h *AppHandler) FinalizeAndApplyAppRevision(ctx context.Context) error {
return err
}
appRev.ResourceVersion = gotAppRev.ResourceVersion
// Set compression types (if enabled)
if utilfeature.DefaultMutableFeatureGate.Enabled(features.GzipApplicationRevision) {
appRev.Spec.Compression.SetType(compression.Gzip)
}
if utilfeature.DefaultMutableFeatureGate.Enabled(features.ZstdApplicationRevision) {
appRev.Spec.Compression.SetType(compression.Zstd)
}
return h.r.Update(ctx, appRev)
}

View File

@@ -132,10 +132,12 @@ var _ = Describe("test generate revision ", func() {
Name: "appRevision1",
},
Spec: v1beta1.ApplicationRevisionSpec{
ComponentDefinitions: make(map[string]v1beta1.ComponentDefinition),
WorkloadDefinitions: make(map[string]v1beta1.WorkloadDefinition),
TraitDefinitions: make(map[string]v1beta1.TraitDefinition),
ScopeDefinitions: make(map[string]v1beta1.ScopeDefinition),
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
ComponentDefinitions: make(map[string]v1beta1.ComponentDefinition),
WorkloadDefinitions: make(map[string]v1beta1.WorkloadDefinition),
TraitDefinitions: make(map[string]v1beta1.TraitDefinition),
ScopeDefinitions: make(map[string]v1beta1.ScopeDefinition),
},
},
}
appRevision1.Spec.Application = app

View File

@@ -95,7 +95,7 @@ var _ = Describe("Test DefinitionRevision created by ComponentDefinition", func(
content, err := os.ReadFile("./test-data/webservice-cd.yaml")
Expect(err).Should(BeNil())
var cd v1beta1.ComponentDefinition
yaml.Unmarshal(content, &cd)
Expect(yaml.Unmarshal(content, &cd)).Should(BeNil())
cd.Name = cdName
cd.Namespace = namespace

View File

@@ -69,6 +69,13 @@ const (
// penalties are minimal.
ZstdResourceTracker featuregate.Feature = "ZstdResourceTracker"
// GzipApplicationRevision serves the same purpose as GzipResourceTracker,
// but for ApplicationRevision.
GzipApplicationRevision featuregate.Feature = "GzipApplicationRevision"
// ZstdApplicationRevision serves the same purpose as ZstdResourceTracker,
// but for ApplicationRevision.
ZstdApplicationRevision featuregate.Feature = "ZstdApplicationRevision"
// ApplyOnce enable the apply-once feature for all applications
// If enabled, no StateKeep will be run, ResourceTracker will also disable the storage of all resource data, only
// metadata will be kept
@@ -93,6 +100,8 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
ZstdResourceTracker: {Default: false, PreRelease: featuregate.Alpha},
ApplyOnce: {Default: false, PreRelease: featuregate.Alpha},
MultiStageComponentApply: {Default: false, PreRelease: featuregate.Alpha},
GzipApplicationRevision: {Default: false, PreRelease: featuregate.Alpha},
ZstdApplicationRevision: {Default: false, PreRelease: featuregate.Alpha},
}
func init() {

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/kubevela/pkg/util/compression"
"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,7 +35,6 @@ import (
"github.com/oam-dev/kubevela/pkg/features"
"github.com/oam-dev/kubevela/pkg/monitor/metrics"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/utils/compression"
velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
)

View File

@@ -1,30 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import "fmt"
type unsupportedCompressionTypeError string
func (e unsupportedCompressionTypeError) Error() string {
return fmt.Sprintf("unsupported compression type: %s", string(e))
}
// NewUnsupportedCompressionTypeError create a new unsupported compression type error
func NewUnsupportedCompressionTypeError(t string) error {
return unsupportedCompressionTypeError(t)
}

View File

@@ -1,27 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestErrors(t *testing.T) {
require.Equal(t, NewUnsupportedCompressionTypeError("x").Error(), "unsupported compression type: x")
}

View File

@@ -1,61 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"io"
)
// GzipObjectToString marshal object into json, compress it with gzip, encode the result with base64
func GzipObjectToString(obj interface{}) (string, error) {
bs, err := json.Marshal(obj)
if err != nil {
return "", err
}
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err = gz.Write(bs); err != nil {
return "", err
}
if err = gz.Flush(); err != nil {
return "", err
}
if err = gz.Close(); err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(b.Bytes()), nil
}
// GunzipStringToObject decode the compressed string with base64, decompress it with gzip, unmarshal it into obj
func GunzipStringToObject(compressed string, obj interface{}) error {
bs, err := base64.StdEncoding.DecodeString(compressed)
if err != nil {
return err
}
reader, err := gzip.NewReader(bytes.NewReader(bs))
if err != nil {
return err
}
if bs, err = io.ReadAll(reader); err != nil {
return err
}
return json.Unmarshal(bs, obj)
}

View File

@@ -1,29 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
// Type the compression type
type Type string
const (
// Uncompressed does not compress or encode data
Uncompressed Type = ""
// Gzip compresses data using gzip and encodes it using base64
Gzip Type = "gzip"
// Zstd compresses data using zstd and encodes it using base64
Zstd Type = "zstd"
)

View File

@@ -1,110 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"encoding/base64"
"encoding/json"
"github.com/klauspost/compress/zstd"
)
// Create a writer that caches compressors. For this operation type we supply a nil Reader.
var encoder, _ = zstd.NewWriter(nil,
// We use the default levels here because we got pretty good results.
// It is almost as fast as no compression at all when the object is large enough.
// Even with small objects, it is still very fast and efficient.
//
// Tests are here: /apis/core.oam.dev/v1beta1/resourcetracker_types_test.go
//
// Here are results:
// zstd.SpeedFastest:
// Compressed Size:
// uncompressed: 2131455 bytes 100.00%
// gzip: 273057 bytes 12.81%
// zstd: 191737 bytes 9.00%
// Marshal Time:
// no compression: 37740514 ns 1.00x
// gzip: 97389702 ns 2.58x
// zstd: 39866808 ns 1.06x
// zstd.SpeedDefault:
// Compressed Size:
// uncompressed: 2131455 bytes 100.00%
// gzip: 273057 bytes 12.81%
// zstd: 171577 bytes 8.05%
// Marshal Time:
// no compression: 42272142 ns 1.00x
// gzip: 90474722 ns 2.14x
// zstd: 39070416 ns 0.92x
// zstd.SpeedBetterCompression:
// Compressed Size:
// uncompressed: 2131455 bytes 100.00%
// gzip: 273057 bytes 12.81%
// zstd: 149061 bytes 6.99%
// Marshal Time:
// no compression: 38826717 ns 1.00x
// gzip: 94855264 ns 2.44x
// zstd: 48524197 ns 1.25x
zstd.WithEncoderLevel(zstd.SpeedDefault),
// TODO(charlie0129): give a dictionary to compressor to get even more improvements.
//
// Since we are dealing with highly-specialized small JSON data, a dictionary will
// give massive improvements, around 3x both (de)compression speed and size reduction,
// according to Facebook https://github.com/facebook/zstd#the-case-for-small-data-compression.
// zstd.WithEncoderDict(),
)
// Create a reader that caches decompressors.
var decoder, _ = zstd.NewReader(nil)
func compress(src []byte) []byte {
return encoder.EncodeAll(src, make([]byte, 0, len(src)))
}
func decompress(src []byte) ([]byte, error) {
return decoder.DecodeAll(src, nil)
}
// ZstdObjectToString marshals the object into json, compress it with zstd,
// encode the result with base64.
func ZstdObjectToString(obj interface{}) (string, error) {
bs, err := json.Marshal(obj)
if err != nil {
return "", err
}
compressedBytes := compress(bs)
return base64.StdEncoding.EncodeToString(compressedBytes), nil
}
// UnZstdStringToObject decodes the compressed string with base64,
// decompresses it with zstd, and unmarshals it. obj must be a pointer so that
// it can be updated.
func UnZstdStringToObject(encoded string, obj interface{}) error {
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return err
}
decompressed, err := decompress(decoded)
if err != nil {
return err
}
return json.Unmarshal(decompressed, obj)
}

View File

@@ -1,50 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"math"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
)
func TestZstdCompression(t *testing.T) {
obj := v1.ConfigMap{
Data: map[string]string{"1234": "5678"},
}
str, err := ZstdObjectToString(obj)
assert.NoError(t, err)
objOut := v1.ConfigMap{}
err = UnZstdStringToObject(str, &objOut)
assert.NoError(t, err)
assert.Equal(t, obj, objOut)
// Invalid obj
_, err = ZstdObjectToString(math.Inf(1))
assert.Error(t, err)
// Invalid base64 string
err = UnZstdStringToObject(".dew;.3234", &objOut)
assert.Error(t, err)
// Invalid zstd binary data
err = UnZstdStringToObject("MTIzNDUK", &objOut)
assert.Error(t, err)
}

View File

@@ -179,9 +179,11 @@ var appRev = v1beta1.ApplicationRevision{
},
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{},
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{},
},
},
},
},

View File

@@ -18,6 +18,7 @@ import (
"fmt"
"io"
"github.com/kubevela/pkg/util/compression"
"github.com/pkg/errors"
"github.com/spf13/cobra"
apitypes "k8s.io/apimachinery/pkg/types"
@@ -82,37 +83,8 @@ func NewRevisionListCommand(c common.Args) *cobra.Command {
if err != nil {
return err
}
table := newUITable().AddRow("NAME", "PUBLISH_VERSION", "SUCCEEDED", "HASH", "BEGIN_TIME", "STATUS", "SIZE")
for _, rev := range revs {
var begin, status, hash, size string
status = "NotStart"
if rev.Status.Workflow != nil {
begin = rev.Status.Workflow.StartTime.Format("2006-01-02 15:04:05")
// aggregate workflow result
switch {
case rev.Status.Succeeded:
status = "Succeeded"
case rev.Status.Workflow.Terminated || rev.Status.Workflow.Suspend || rev.Status.Workflow.Finished:
status = "Failed"
case app.Status.LatestRevision != nil && app.Status.LatestRevision.Name == rev.Name:
status = "Executing"
default:
status = "Failed"
}
}
if labels := rev.GetLabels(); labels != nil {
hash = rev.GetLabels()[oam.LabelAppRevisionHash]
}
if bs, err := yaml.Marshal(rev.Spec); err == nil {
size = utils.ByteCountIEC(int64(len(bs)))
}
table.AddRow(rev.Name, oam.GetPublishVersion(rev.DeepCopy()), rev.Status.Succeeded, hash, begin, status, size)
}
if len(table.Rows) == 0 {
cmd.Printf("No revisions found for application %s/%s.\n", namespace, name)
} else {
cmd.Println(table.String())
}
printApprevs(cmd.OutOrStdout(), revs)
_, _ = cmd.OutOrStdout().Write([]byte("\n"))
return nil
},
}
@@ -209,7 +181,7 @@ func getRevision(ctx context.Context, c common.Args, format string, out io.Write
}
} else {
if format == "" {
printApprev(out, apprev)
printApprevs(out, []v1beta1.ApplicationRevision{apprev})
} else {
output, err := convertApplicationRevisionTo(format, &apprev)
if err != nil {
@@ -223,28 +195,43 @@ func getRevision(ctx context.Context, c common.Args, format string, out io.Write
return nil
}
func printApprev(out io.Writer, apprev v1beta1.ApplicationRevision) {
func printApprevs(out io.Writer, apprevs []v1beta1.ApplicationRevision) {
table := newUITable().AddRow("NAME", "PUBLISH_VERSION", "SUCCEEDED", "HASH", "BEGIN_TIME", "STATUS", "SIZE")
var begin, status, hash, size string
status = "NotStart"
if apprev.Status.Workflow != nil {
begin = apprev.Status.Workflow.StartTime.Format("2006-01-02 15:04:05")
// aggregate workflow result
switch {
case apprev.Status.Succeeded:
status = "Succeeded"
case apprev.Status.Workflow.Terminated || apprev.Status.Workflow.Suspend || apprev.Status.Workflow.Finished:
status = "Failed"
default:
status = "Executing or Failed"
for _, apprev := range apprevs {
var begin, status, hash, size string
status = "NotStart"
if apprev.Status.Workflow != nil {
begin = apprev.Status.Workflow.StartTime.Format("2006-01-02 15:04:05")
// aggregate workflow result
switch {
case apprev.Status.Succeeded:
status = "Succeeded"
case apprev.Status.Workflow.Terminated || apprev.Status.Workflow.Suspend || apprev.Status.Workflow.Finished:
status = "Failed"
default:
status = "Executing or Failed"
}
}
if labels := apprev.GetLabels(); labels != nil {
hash = apprev.GetLabels()[oam.LabelAppRevisionHash]
}
//nolint:gosec // apprev is only used here once, implicit memory aliasing is fine. (apprev pointer is required to call custom marshal methods)
if bs, err := yaml.Marshal(&apprev); err == nil {
compressedSize := len(bs)
size = utils.ByteCountIEC(int64(compressedSize))
// Show how much compressed if compression is enabled.
if apprev.Spec.Compression.Type != compression.Uncompressed {
// Get the original size.
apprev.Spec.Compression.Type = compression.Uncompressed
var uncompressedSize int
//nolint:gosec // apprev is only used here once, implicit memory aliasing is fine. (apprev pointer is required to call custom marshal methods)
if ubs, err := yaml.Marshal(&apprev); err == nil && len(ubs) > 0 {
uncompressedSize = len(ubs)
}
size += fmt.Sprintf(" (Compressed, %.0f%%)", float64(compressedSize)/float64(uncompressedSize)*100)
}
}
table.AddRow(apprev.Name, oam.GetPublishVersion(apprev.DeepCopy()), apprev.Status.Succeeded, hash, begin, status, size)
}
if labels := apprev.GetLabels(); labels != nil {
hash = apprev.GetLabels()[oam.LabelAppRevisionHash]
}
if bs, err := yaml.Marshal(apprev.Spec); err == nil {
size = utils.ByteCountIEC(int64(len(bs)))
}
table.AddRow(apprev.Name, oam.GetPublishVersion(apprev.DeepCopy()), apprev.Status.Succeeded, hash, begin, status, size)
fmt.Fprint(out, table.String())
}

View File

@@ -21,14 +21,15 @@ import (
"context"
"fmt"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/kubevela/pkg/util/compression"
"github.com/stretchr/testify/assert"
"github.com/oam-dev/kubevela/apis/types"
"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
@@ -577,8 +578,8 @@ var _ = Describe("Test getRevision", func() {
Expect(getRevision(ctx, arg, format, out, name, namespace, def)).To(Succeed())
table := newUITable().AddRow("NAME", "PUBLISH_VERSION", "SUCCEEDED", "HASH", "BEGIN_TIME", "STATUS", "SIZE")
table.AddRow("first-vela-app-v1", "", "false", "1c3d847600ac0514", "", "NotStart", "19.1 KiB")
Expect(out.String()).To(Equal(table.String()))
table.AddRow("first-vela-app-v1", "", "false", "1c3d847600ac0514", "", "NotStart", "")
Expect(strings.ReplaceAll(out.String(), " ", "")).To(ContainSubstring(strings.ReplaceAll(table.String(), " ", "")))
})
It("Test normal case with yaml format", func() {
@@ -657,7 +658,7 @@ func TestPrintApprev(t *testing.T) {
},
Spec: v1beta1.ApplicationRevisionSpec{},
Status: v1beta1.ApplicationRevisionStatus{},
}, exp: tableOut("test-apprev0", "", "false", "", "", "NotStart", "95 B"),
}, exp: tableOut("test-apprev0", "", "false", "", "", "NotStart"),
},
"Succeeded": {out: &bytes.Buffer{}, apprev: v1beta1.ApplicationRevision{
ObjectMeta: metav1.ObjectMeta{
@@ -668,10 +669,12 @@ func TestPrintApprev(t *testing.T) {
},
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app1",
Namespace: "dev2",
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app1",
Namespace: "dev2",
},
},
},
},
@@ -683,7 +686,7 @@ func TestPrintApprev(t *testing.T) {
},
Succeeded: true,
},
}, exp: tableOut("test-apprev1", "", "true", "1111231adfdf", "2022-08-12 11:45:26", "Succeeded", "135 B")},
}, exp: tableOut("test-apprev1", "", "true", "1111231adfdf", "2022-08-12 11:45:26", "Succeeded")},
"Failed": {out: &bytes.Buffer{}, apprev: v1beta1.ApplicationRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "test-apprev2",
@@ -698,7 +701,7 @@ func TestPrintApprev(t *testing.T) {
Terminated: true,
},
},
}, exp: tableOut("test-apprev2", "", "false", "", "2022-08-12 11:45:26", "Failed", "95 B"),
}, exp: tableOut("test-apprev2", "", "false", "", "2022-08-12 11:45:26", "Failed"),
},
"Executing or Failed": {out: &bytes.Buffer{}, apprev: v1beta1.ApplicationRevision{
ObjectMeta: metav1.ObjectMeta{
@@ -713,25 +716,46 @@ func TestPrintApprev(t *testing.T) {
},
},
},
}, exp: tableOut("test-apprev3", "", "false", "", "2022-08-12 11:45:26", "Executing or Failed", "95 B"),
}, exp: tableOut("test-apprev3", "", "false", "", "2022-08-12 11:45:26", "Executing or Failed"),
},
"Compressed": {out: &bytes.Buffer{}, apprev: v1beta1.ApplicationRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "test-apprev3",
Namespace: "dev3",
},
Spec: v1beta1.ApplicationRevisionSpec{
Compression: v1beta1.ApplicationRevisionCompression{
CompressedText: compression.CompressedText{
Type: "zstd",
},
},
},
Status: v1beta1.ApplicationRevisionStatus{
Workflow: &common2.WorkflowStatus{
StartTime: metav1.Time{
Time: ti,
},
},
},
}, exp: tableOut("test-apprev3", "", "false", "", "2022-08-12 11:45:26", "Executing or Failed"),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
printApprev(tc.out, tc.apprev)
//fmt.Println(tc.out.String())
diff := cmp.Diff(tc.exp, tc.out.String())
if diff != "" {
t.Fatalf(diff)
printApprevs(tc.out, []v1beta1.ApplicationRevision{tc.apprev})
assert.Contains(t, strings.ReplaceAll(tc.out.String(), " ", ""), strings.ReplaceAll(tc.out.String(), " ", ""))
if tc.apprev.Spec.Compression.Type != compression.Uncompressed {
assert.Contains(t, tc.out.String(), "Compressed")
}
})
}
}
func tableOut(name, pv, s, hash, bt, status, size string) string {
func tableOut(name, pv, s, hash, bt, status string) string {
table := newUITable().AddRow("NAME", "PUBLISH_VERSION", "SUCCEEDED", "HASH", "BEGIN_TIME", "STATUS", "SIZE")
table.AddRow(name, pv, s, hash, bt, status, size)
table.AddRow(name, pv, s, hash, bt, status)
return table.String()
}

View File

@@ -20,9 +20,8 @@ import (
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"gotest.tools/assert"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
@@ -58,7 +57,7 @@ func TestFormatApplicationString(t *testing.T) {
assert.ErrorContains(t, err, "not supported", "invalid format provided, should error out")
str, err = formatApplicationString("yaml", app)
assert.NilError(t, err)
assert.NoError(t, err)
assert.Equal(t, true, strings.Contains(str, `apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
@@ -70,7 +69,7 @@ status: {}
`), "formatted yaml is not correct")
str, err = formatApplicationString("json", app)
assert.NilError(t, err)
assert.NoError(t, err)
assert.Equal(t, true, strings.Contains(str, `{
"kind": "Application",
"apiVersion": "core.oam.dev/v1beta1",
@@ -88,7 +87,7 @@ status: {}
assert.ErrorContains(t, err, "jsonpath template", "no jsonpath template provided, should not pass")
str, err = formatApplicationString("jsonpath={.apiVersion}", app)
assert.NilError(t, err)
assert.NoError(t, err)
assert.Equal(t, str, "core.oam.dev/v1beta1")
str, err = formatApplicationString("jsonpath={.spec.components[?(@.name==\"test-server\")].type}", &v1beta1.Application{
@@ -105,7 +104,7 @@ status: {}
},
},
})
assert.NilError(t, err)
assert.NoError(t, err)
assert.Equal(t, str, "webservice")
}
@@ -133,10 +132,12 @@ func TestConvertApplicationRevisionTo(t *testing.T) {
Namespace: "dev",
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
},
},
},
},
@@ -153,11 +154,7 @@ spec:
name: test-app
namespace: dev
spec:
components: null
status: {}
status:
succeeded: false
`, err: ""}},
components: null`, err: ""}},
"json": {format: "json", apprev: &v1beta1.ApplicationRevision{
TypeMeta: v1.TypeMeta{
Kind: "ApplicationRevision",
@@ -168,10 +165,12 @@ status:
Namespace: "dev",
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
},
},
},
},
@@ -191,15 +190,7 @@ status:
"creationTimestamp": null
},
"spec": {
"components": null
},
"status": {}
}
},
"status": {
"succeeded": false
}
}`, err: ""}},
"components": null`, err: ""}},
"jsonpath": {format: "jsonpath={.apiVersion}", apprev: &v1beta1.ApplicationRevision{
TypeMeta: v1.TypeMeta{
Kind: "ApplicationRevision",
@@ -210,10 +201,12 @@ status:
Namespace: "dev",
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
},
},
},
},
@@ -228,16 +221,18 @@ status:
Namespace: "dev",
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "test-server",
Type: "webservice",
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev",
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "test-server",
Type: "webservice",
},
},
},
},
@@ -254,10 +249,12 @@ status:
Namespace: "dev1",
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev1",
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
ObjectMeta: v1.ObjectMeta{
Name: "test-app",
Namespace: "dev1",
},
},
},
},
@@ -270,10 +267,7 @@ status:
if err != nil {
assert.Equal(t, tc.exp.err, err.Error())
}
diff := cmp.Diff(tc.exp.out, out)
if diff != "" {
t.Fatalf(diff)
}
assert.Contains(t, out, tc.exp.out)
})
}
}

View File

@@ -552,13 +552,15 @@ func TestWorkflowRollback(t *testing.T) {
Namespace: "test",
},
Spec: v1beta1.ApplicationRevisionSpec{
Application: v1beta1.Application{
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{{
Name: "revision-component",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
}},
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
Application: v1beta1.Application{
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{{
Name: "revision-component",
Type: "worker",
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
}},
},
},
},
},