diff --git a/pkg/apiserver/datastore/datastore.go b/pkg/apiserver/datastore/datastore.go index f1258d66a..320aeb7d0 100644 --- a/pkg/apiserver/datastore/datastore.go +++ b/pkg/apiserver/datastore/datastore.go @@ -73,6 +73,7 @@ type Entity interface { SetUpdateTime(time time.Time) PrimaryKey() string TableName() string + ShortTableName() string Index() map[string]string } @@ -141,7 +142,7 @@ type DataStore interface { // Get entity from database, Name() and TableName() can't return zero value. Get(ctx context.Context, entity Entity) error - // List entities from database, TableName() can't return zero value. + // List entities from database, TableName() can't return zero value, if no matches, it will return a zero list without error. List(ctx context.Context, query Entity, options *ListOptions) ([]Entity, error) // Count entities from database, TableName() can't return zero value. diff --git a/pkg/apiserver/datastore/kubeapi/kubeapi.go b/pkg/apiserver/datastore/kubeapi/kubeapi.go index 21418ec27..bd0a5bc5a 100644 --- a/pkg/apiserver/datastore/kubeapi/kubeapi.go +++ b/pkg/apiserver/datastore/kubeapi/kubeapi.go @@ -64,6 +64,7 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error) return nil, fmt.Errorf("create namespace failure %w", err) } } + migrate(cfg.Database) return &kubeapi{ kubeclient: kubeClient, namespace: cfg.Database, @@ -71,7 +72,9 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error) } func generateName(entity datastore.Entity) string { - name := fmt.Sprintf("veladatabase-%s-%s", entity.TableName(), entity.PrimaryKey()) + // record the old ways here, it'll be migrated + // name := fmt.Sprintf("veladatabase-%s-%s", entity.TableName(), entity.PrimaryKey()) + name := fmt.Sprintf("%s-%s", entity.ShortTableName(), entity.PrimaryKey()) return strings.ReplaceAll(name, "_", "-") } diff --git a/pkg/apiserver/datastore/kubeapi/migrate.go b/pkg/apiserver/datastore/kubeapi/migrate.go new file mode 100644 index 000000000..44760d2c5 --- /dev/null +++ b/pkg/apiserver/datastore/kubeapi/migrate.go @@ -0,0 +1,85 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package kubeapi + +import ( + "context" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/oam-dev/kubevela/pkg/apiserver/clients" + "github.com/oam-dev/kubevela/pkg/apiserver/model" +) + +// migrate will migrate the configmap to new short table name, it won't delete the configmaps: +// users can delete by the following commands: +// kubectl -n kubevela delete cm -l migrated=ok +func migrate(dbns string) { + kubeClient, err := clients.GetKubeClient() + if err != nil { + panic(err) + } + models := model.GetRegisterModels() + for _, k := range models { + var configMaps corev1.ConfigMapList + table := k.TableName() + selector, _ := labels.Parse(fmt.Sprintf("table=%s", table)) + if err = kubeClient.List(context.Background(), &configMaps, &client.ListOptions{Namespace: dbns, LabelSelector: selector}); err != nil { + err = client.IgnoreNotFound(err) + if err != nil { + klog.Errorf("migrate db for kubeapi storage err: %v", err) + continue + } + } + var migrated bool + for _, cm := range configMaps.Items { + if strings.HasPrefix(cm.Name, strings.ReplaceAll(k.ShortTableName()+"-", "_", "-")) { + migrated = true + break + } + } + if migrated || len(configMaps.Items) == 0 { + continue + } + klog.Infof("migrating data for table %v", k.TableName()) + for _, cm := range configMaps.Items { + cm := cm + checkprefix := strings.ReplaceAll(fmt.Sprintf("veladatabase-%s", k.TableName()), "_", "-") + if !strings.HasPrefix(cm.Name, checkprefix) { + continue + } + + cm.Labels["migrated"] = "ok" + err = kubeClient.Update(context.Background(), &cm) + if err != nil { + klog.Errorf("update migrated record %s for kubeapi storage err: %v", cm.Name, err) + } + cm.Name = strings.ReplaceAll(k.ShortTableName()+strings.TrimPrefix(cm.Name, checkprefix), "_", "-") + cm.ResourceVersion = "" + delete(cm.Labels, "migrated") + err = kubeClient.Create(context.Background(), &cm) + if err != nil { + klog.Errorf("migrate record %s for kubeapi storage err: %v", cm.Name, err) + } + } + } +} diff --git a/pkg/apiserver/model/application.go b/pkg/apiserver/model/application.go index 481ada99b..340513e6a 100644 --- a/pkg/apiserver/model/application.go +++ b/pkg/apiserver/model/application.go @@ -18,13 +18,14 @@ package model import ( "fmt" + "strings" "time" "github.com/oam-dev/kubevela/apis/core.oam.dev/common" ) func init() { - RegistModel(&ApplicationComponent{}, &ApplicationPolicy{}, &Application{}, &ApplicationRevision{}, &ApplicationTrigger{}) + RegisterModel(&ApplicationComponent{}, &ApplicationPolicy{}, &Application{}, &ApplicationRevision{}, &ApplicationTrigger{}) } // Application application delivery model @@ -43,7 +44,15 @@ func (a *Application) TableName() string { return tableNamePrefix + "application" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (a *Application) ShortTableName() string { + return "app" +} + // PrimaryKey return custom primary key +// the app primary key is the app name, so the app name is globally unique in every namespace +// when the app is synced from CR, the first synced one be same with app name, +// if there's any conflicts, the name will be composed by - func (a *Application) PrimaryKey() string { return a.Name } @@ -60,6 +69,18 @@ func (a *Application) Index() map[string]string { return index } +// GetAppNameForSynced will trim namespace suffix for synced CR +func (a *Application) GetAppNameForSynced() string { + if a.Labels == nil { + return a.Name + } + namespace := a.Labels[LabelSyncNamespace] + if namespace == "" { + return a.Name + } + return strings.TrimSuffix(a.Name, "-"+namespace) +} + // ClusterSelector cluster selector type ClusterSelector struct { Name string `json:"name"` @@ -102,6 +123,11 @@ func (a *ApplicationComponent) TableName() string { return tableNamePrefix + "application_component" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (a *ApplicationComponent) ShortTableName() string { + return "app_cmp" +} + // PrimaryKey return custom primary key func (a *ApplicationComponent) PrimaryKey() string { return fmt.Sprintf("%s-%s", a.AppPrimaryKey, a.Name) @@ -138,6 +164,11 @@ func (a *ApplicationPolicy) TableName() string { return tableNamePrefix + "application_policy" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (a *ApplicationPolicy) ShortTableName() string { + return "app_plc" +} + // PrimaryKey return custom primary key func (a *ApplicationPolicy) PrimaryKey() string { return fmt.Sprintf("%s-%s", a.AppPrimaryKey, a.Name) @@ -270,6 +301,11 @@ func (a *ApplicationRevision) TableName() string { return tableNamePrefix + "application_revision" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (a *ApplicationRevision) ShortTableName() string { + return "app_rev" +} + // PrimaryKey return custom primary key func (a *ApplicationRevision) PrimaryKey() string { return fmt.Sprintf("%s-%s", a.AppPrimaryKey, a.Version) @@ -349,6 +385,11 @@ func (w *ApplicationTrigger) TableName() string { return tableNamePrefix + "trigger" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (w *ApplicationTrigger) ShortTableName() string { + return "app_tg" +} + // PrimaryKey return custom primary key func (w *ApplicationTrigger) PrimaryKey() string { return w.Token diff --git a/pkg/apiserver/model/cluster.go b/pkg/apiserver/model/cluster.go index 0ae7eadb1..9dd797695 100644 --- a/pkg/apiserver/model/cluster.go +++ b/pkg/apiserver/model/cluster.go @@ -23,7 +23,7 @@ import ( ) func init() { - RegistModel(&Cluster{}) + RegisterModel(&Cluster{}) } // ProviderInfo describes the information from provider API @@ -82,6 +82,11 @@ func (c *Cluster) TableName() string { return tableNamePrefix + "cluster" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (c *Cluster) ShortTableName() string { + return "cls" +} + // PrimaryKey primary key for datastore func (c *Cluster) PrimaryKey() string { return c.Name diff --git a/pkg/apiserver/model/env.go b/pkg/apiserver/model/env.go index 787da02bb..ad3f159a8 100644 --- a/pkg/apiserver/model/env.go +++ b/pkg/apiserver/model/env.go @@ -17,7 +17,7 @@ limitations under the License. package model func init() { - RegistModel(&Env{}) + RegisterModel(&Env{}) } // Env models the data of env in database @@ -42,6 +42,11 @@ func (p *Env) TableName() string { return tableNamePrefix + "env" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (p *Env) ShortTableName() string { + return "ev" +} + // PrimaryKey return custom primary key func (p *Env) PrimaryKey() string { return p.Name diff --git a/pkg/apiserver/model/envbinding.go b/pkg/apiserver/model/envbinding.go index defee4da9..9f6a3109d 100644 --- a/pkg/apiserver/model/envbinding.go +++ b/pkg/apiserver/model/envbinding.go @@ -19,13 +19,14 @@ package model import "fmt" func init() { - RegistModel(&EnvBinding{}) + RegisterModel(&EnvBinding{}) } // EnvBinding application env binding type EnvBinding struct { BaseModel AppPrimaryKey string `json:"appPrimaryKey"` + AppDeployName string `json:"appDeployName"` Name string `json:"name"` ComponentsPatch []ComponentPatch `json:"componentsPatchs"` } @@ -50,6 +51,11 @@ func (e *EnvBinding) TableName() string { return tableNamePrefix + "envbinding" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (e *EnvBinding) ShortTableName() string { + return "evb" +} + // PrimaryKey return custom primary key func (e *EnvBinding) PrimaryKey() string { return fmt.Sprintf("%s-%s", e.AppPrimaryKey, e.Name) diff --git a/pkg/apiserver/model/model.go b/pkg/apiserver/model/model.go index feafcb7fb..57fe45c50 100644 --- a/pkg/apiserver/model/model.go +++ b/pkg/apiserver/model/model.go @@ -30,28 +30,37 @@ import ( var tableNamePrefix = "vela_" -var registedModels = map[string]Interface{} +var registeredModels = map[string]Interface{} // Interface model interface type Interface interface { TableName() string + ShortTableName() string } -// RegistModel regist model -func RegistModel(models ...Interface) { +// RegisterModel register model +func RegisterModel(models ...Interface) { for _, model := range models { - if _, exist := registedModels[model.TableName()]; exist { + if _, exist := registeredModels[model.TableName()]; exist { panic(fmt.Errorf("model table name %s conflict", model.TableName())) } - registedModels[model.TableName()] = model + registeredModels[model.TableName()] = model } } +// GetRegisterModels will return the register models +func GetRegisterModels() map[string]Interface { + return registeredModels +} + // JSONStruct json struct, same with runtime.RawExtension type JSONStruct map[string]interface{} -// NewJSONStruct new jsonstruct from runtime.RawExtension +// NewJSONStruct new json struct from runtime.RawExtension func NewJSONStruct(raw *runtime.RawExtension) (*JSONStruct, error) { + if raw == nil || raw.Raw == nil { + return nil, nil + } var data JSONStruct err := json.Unmarshal(raw.Raw, &data) if err != nil { @@ -60,7 +69,7 @@ func NewJSONStruct(raw *runtime.RawExtension) (*JSONStruct, error) { return &data, nil } -// NewJSONStructByString new jsonstruct from string +// NewJSONStructByString new json struct from string func NewJSONStructByString(source string) (*JSONStruct, error) { if source == "" { return nil, nil @@ -73,7 +82,7 @@ func NewJSONStructByString(source string) (*JSONStruct, error) { return &data, nil } -// NewJSONStructByStruct new jsonstruct from strcut object +// NewJSONStructByStruct new json struct from struct object func NewJSONStructByStruct(object interface{}) (*JSONStruct, error) { if object == nil { return nil, nil diff --git a/pkg/apiserver/model/project.go b/pkg/apiserver/model/project.go index 1b9580b32..a9f94bba0 100644 --- a/pkg/apiserver/model/project.go +++ b/pkg/apiserver/model/project.go @@ -17,10 +17,10 @@ limitations under the License. package model func init() { - RegistModel(&Project{}) + RegisterModel(&Project{}) } -// Project project model +// Project basic model type Project struct { BaseModel Name string `json:"name"` @@ -33,6 +33,11 @@ func (p *Project) TableName() string { return tableNamePrefix + "project" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (p *Project) ShortTableName() string { + return "pj" +} + // PrimaryKey return custom primary key func (p *Project) PrimaryKey() string { return p.Name diff --git a/pkg/apiserver/model/system_info.go b/pkg/apiserver/model/system_info.go index 639af53f9..36880fa77 100644 --- a/pkg/apiserver/model/system_info.go +++ b/pkg/apiserver/model/system_info.go @@ -17,7 +17,7 @@ limitations under the License. package model func init() { - RegistModel(&SystemInfo{}) + RegisterModel(&SystemInfo{}) } // LoginType is the type of login @@ -43,6 +43,11 @@ func (u *SystemInfo) TableName() string { return tableNamePrefix + "system_info" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (u *SystemInfo) ShortTableName() string { + return "sysi" +} + // PrimaryKey return custom primary key func (u *SystemInfo) PrimaryKey() string { return u.InstallID diff --git a/pkg/apiserver/model/target.go b/pkg/apiserver/model/target.go index 35f60218b..e78fa0c86 100644 --- a/pkg/apiserver/model/target.go +++ b/pkg/apiserver/model/target.go @@ -17,7 +17,7 @@ limitations under the License. package model func init() { - RegistModel(&Target{}) + RegisterModel(&Target{}) } // Target defines the delivery target information for the application @@ -36,6 +36,11 @@ func (d *Target) TableName() string { return tableNamePrefix + "target" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (d *Target) ShortTableName() string { + return "tg" +} + // PrimaryKey return custom primary key func (d *Target) PrimaryKey() string { return d.Name diff --git a/pkg/apiserver/model/user.go b/pkg/apiserver/model/user.go index 6b0fa99e5..6de905002 100644 --- a/pkg/apiserver/model/user.go +++ b/pkg/apiserver/model/user.go @@ -19,7 +19,7 @@ package model import "strings" func init() { - RegistModel(&User{}) + RegisterModel(&User{}) } // User is the model of user @@ -37,6 +37,11 @@ func (u *User) TableName() string { return tableNamePrefix + "user" } +// ShortTableName return custom table name +func (u *User) ShortTableName() string { + return "usr" +} + // PrimaryKey return custom primary key func (u *User) PrimaryKey() string { return verifyUserValue(u.Name) diff --git a/pkg/apiserver/model/whole.go b/pkg/apiserver/model/whole.go new file mode 100644 index 000000000..03044673f --- /dev/null +++ b/pkg/apiserver/model/whole.go @@ -0,0 +1,73 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package model + +const ( + // AutoGenDesc describes the metadata in datastore that's automatically generated + AutoGenDesc = "Automatically converted from KubeVela Application in Kubernetes." + + // AutoGenProj describes the automatically created project + AutoGenProj = "Automatically generated by sync mechanism." + + // AutoGenEnvNamePrefix describes the common prefix for auto-generated env + AutoGenEnvNamePrefix = "synced-" + // AutoGenComp describes the creator of component that is auto-generated + AutoGenComp = "synced-comp" + // AutoGenPolicy describes the creator of policy that is auto-generated + AutoGenPolicy = "synced-policy" + // AutoGenRefPolicy describes the creator of policy that is auto-generated, this differs from AutoGenPolicy as the policy is referenced ones + AutoGenRefPolicy = "synced-ref-policy" + // AutoGenWorkflowNamePrefix describes the common prefix for auto-generated workflow + AutoGenWorkflowNamePrefix = "synced-" + // AutoGenTargetNamePrefix describes the common prefix for auto-generated target + AutoGenTargetNamePrefix = "synced-" + + // LabelSyncGeneration describes the generation synced from + LabelSyncGeneration = "ux.oam.dev/synced-generation" + // LabelSyncNamespace describes the namespace synced from + LabelSyncNamespace = "ux.oam.dev/from-namespace" +) + +// DataStoreApp is a memory struct that describes the model of an application in datastore +type DataStoreApp struct { + AppMeta *Application + Env *Env + Eb *EnvBinding + Comps []*ApplicationComponent + Policies []*ApplicationPolicy + Workflow *Workflow + Targets []*Target +} + +const ( + + // DefaultInitName is default object name for initialization + DefaultInitName = "default" + + // DefaultAddonProject is default addon projects + DefaultAddonProject = "addons" + + // DefaultInitNamespace is default namespace name for initialization + DefaultInitNamespace = "default" + + // DefaultTargetDescription describes default target created + DefaultTargetDescription = "Default target is created by velaux system automatically." + // DefaultEnvDescription describes default env created + DefaultEnvDescription = "Default environment is created by velaux system automatically." + // DefaultProjectDescription describes the default project created + DefaultProjectDescription = "Default project is created by velaux system automatically." +) diff --git a/pkg/apiserver/model/workflow.go b/pkg/apiserver/model/workflow.go index 24f0f79a6..204923109 100644 --- a/pkg/apiserver/model/workflow.go +++ b/pkg/apiserver/model/workflow.go @@ -25,8 +25,8 @@ import ( ) func init() { - RegistModel(&Workflow{}) - RegistModel(&WorkflowRecord{}) + RegisterModel(&Workflow{}) + RegisterModel(&WorkflowRecord{}) } // Workflow application delivery database model @@ -61,6 +61,11 @@ func (w *Workflow) TableName() string { return tableNamePrefix + "workflow" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (w *Workflow) ShortTableName() string { + return "wf" +} + // PrimaryKey return custom primary key func (w *Workflow) PrimaryKey() string { return fmt.Sprintf("%s-%s", w.AppPrimaryKey, w.Name) @@ -118,6 +123,11 @@ func (w *WorkflowRecord) TableName() string { return tableNamePrefix + "workflow_record" } +// ShortTableName is the compressed version of table name for kubeapi storage and others +func (w *WorkflowRecord) ShortTableName() string { + return "wfr" +} + // PrimaryKey return custom primary key func (w *WorkflowRecord) PrimaryKey() string { return w.Name diff --git a/pkg/apiserver/rest/rest_server.go b/pkg/apiserver/rest/rest_server.go index c6e0ac36f..86c6016b3 100644 --- a/pkg/apiserver/rest/rest_server.go +++ b/pkg/apiserver/rest/rest_server.go @@ -39,6 +39,7 @@ import ( "github.com/oam-dev/kubevela/pkg/apiserver/rest/usecase" "github.com/oam-dev/kubevela/pkg/apiserver/rest/utils" "github.com/oam-dev/kubevela/pkg/apiserver/rest/webservice" + velasync "github.com/oam-dev/kubevela/pkg/apiserver/sync" utils2 "github.com/oam-dev/kubevela/pkg/utils" ) @@ -116,7 +117,6 @@ func (s *restServer) Run(ctx context.Context) error { go func() { leaderelection.RunOrDie(ctx, *l) }() - return s.startHTTP(ctx) } @@ -138,10 +138,13 @@ func (s *restServer) setupLeaderElection() (*leaderelection.LeaderElectionConfig RetryPeriod: time.Second * 2, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { - s.runLeader(ctx, s.cfg.LeaderConfig.Duration) + go velasync.Start(ctx, s.dataStore, restCfg) + s.runWorkflowRecordSync(ctx, s.cfg.LeaderConfig.Duration) }, OnStoppedLeading: func() { klog.Infof("leader lost: %s", s.cfg.LeaderConfig.ID) + // Currently, the started goroutine will all closed by the context, so there seems no need to call os.Exit here. + // But it can be safe to stop the process as leader lost. os.Exit(0) }, OnNewLeader: func(identity string) { @@ -155,7 +158,8 @@ func (s *restServer) setupLeaderElection() (*leaderelection.LeaderElectionConfig }, nil } -func (s restServer) runLeader(ctx context.Context, duration time.Duration) { +func (s restServer) runWorkflowRecordSync(ctx context.Context, duration time.Duration) { + klog.Infof("start to syncing workflow record") w := usecase.NewWorkflowUsecase(s.dataStore, usecase.NewEnvUsecase(s.dataStore)) t := time.NewTicker(duration) diff --git a/pkg/apiserver/rest/usecase/addon.go b/pkg/apiserver/rest/usecase/addon.go index a7be5d52c..34805bd39 100644 --- a/pkg/apiserver/rest/usecase/addon.go +++ b/pkg/apiserver/rest/usecase/addon.go @@ -26,14 +26,12 @@ import ( "sync" "time" - "k8s.io/client-go/discovery" - - k8stypes "k8s.io/apimachinery/pkg/types" - v1 "k8s.io/api/core/v1" errors2 "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" k8syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/discovery" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/pkg/apiserver/rest/usecase/application.go b/pkg/apiserver/rest/usecase/application.go index 9bbabae65..a14105464 100644 --- a/pkg/apiserver/rest/usecase/application.go +++ b/pkg/apiserver/rest/usecase/application.go @@ -26,11 +26,10 @@ import ( "strings" "time" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" @@ -49,6 +48,7 @@ import ( apisv1 "github.com/oam-dev/kubevela/pkg/apiserver/rest/apis/v1" "github.com/oam-dev/kubevela/pkg/apiserver/rest/utils" "github.com/oam-dev/kubevela/pkg/apiserver/rest/utils/bcode" + "github.com/oam-dev/kubevela/pkg/apiserver/sync" "github.com/oam-dev/kubevela/pkg/oam" "github.com/oam-dev/kubevela/pkg/oam/discoverymapper" utils2 "github.com/oam-dev/kubevela/pkg/utils" @@ -279,7 +279,7 @@ func (c *applicationUsecaseImpl) GetApplicationStatus(ctx context.Context, appmo if err != nil { return nil, err } - err = c.kubeClient.Get(ctx, types.NamespacedName{Namespace: env.Namespace, Name: appmodel.Name}, &app) + err = c.kubeClient.Get(ctx, types.NamespacedName{Namespace: env.Namespace, Name: appmodel.GetAppNameForSynced()}, &app) if err != nil { if apierrors.IsNotFound(err) { return nil, nil @@ -294,9 +294,10 @@ func (c *applicationUsecaseImpl) GetApplicationStatus(ctx context.Context, appmo // GetApplicationCR get application CR in cluster func (c *applicationUsecaseImpl) GetApplicationCR(ctx context.Context, appModel *model.Application) (*v1beta1.ApplicationList, error) { + var apps v1beta1.ApplicationList selector := labels.NewSelector() - re, err := labels.NewRequirement(oam.AnnotationAppName, selection.Equals, []string{appModel.Name}) + re, err := labels.NewRequirement(oam.AnnotationAppName, selection.Equals, []string{appModel.GetAppNameForSynced()}) if err != nil { return nil, err } @@ -998,7 +999,7 @@ func (c *applicationUsecaseImpl) GetApplicationComponent(ctx context.Context, ap err := c.ds.Get(ctx, &component) if err != nil { if errors.Is(err, datastore.ErrRecordNotExist) { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } return nil, err } @@ -1083,7 +1084,7 @@ func (c *applicationUsecaseImpl) createComponent(ctx context.Context, app *model if err := c.ds.Add(ctx, &componentModel); err != nil { if errors.Is(err, datastore.ErrRecordExist) { - return nil, bcode.ErrApplicationComponetExist + return nil, bcode.ErrApplicationComponentExist } log.Logger.Warnf("add component for app %s failure %s", utils2.Sanitize(app.PrimaryKey()), err.Error()) return nil, err @@ -1149,11 +1150,11 @@ func convertComponentModelToBase(componentModel *model.ApplicationComponent) *ap func (c *applicationUsecaseImpl) DeleteComponent(ctx context.Context, app *model.Application, component *model.ApplicationComponent) error { if component.Main { - return bcode.ErrApplicationComponetNotAllowDelete + return bcode.ErrApplicationComponentNotAllowDelete } if err := c.ds.Delete(ctx, component); err != nil { if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrApplicationComponetNotExist + return bcode.ErrApplicationComponentNotExist } log.Logger.Warnf("delete app component %s failure %s", app.PrimaryKey(), err.Error()) return err @@ -1528,34 +1529,6 @@ func genWebhookToken() string { return string(b) } -func convertToModelComponent(appPrimaryKey string, component common.ApplicationComponent) (model.ApplicationComponent, error) { - bc := model.ApplicationComponent{ - AppPrimaryKey: appPrimaryKey, - Name: component.Name, - Type: component.Type, - ExternalRevision: component.ExternalRevision, - DependsOn: component.DependsOn, - Inputs: component.Inputs, - Outputs: component.Outputs, - Scopes: component.Scopes, - } - if component.Properties != nil { - properties, err := model.NewJSONStruct(component.Properties) - if err != nil { - return bc, err - } - bc.Properties = properties - } - for _, trait := range component.Traits { - properties, err := model.NewJSONStruct(trait.Properties) - if err != nil { - return bc, err - } - bc.Traits = append(bc.Traits, model.ApplicationTrait{CreateTime: time.Now(), UpdateTime: time.Now(), Properties: properties, Type: trait.Type, Alias: trait.Type, Description: "auto gen"}) - } - return bc, nil -} - func (c *applicationUsecaseImpl) getAppFromLatestRevision(ctx context.Context, appName string, envName string, version string) (*v1beta1.Application, error) { ar := &model.ApplicationRevision{AppPrimaryKey: appName} @@ -1590,7 +1563,7 @@ func (c *applicationUsecaseImpl) resetApp(ctx context.Context, targetApp *v1beta originComps, err := c.ds.List(ctx, &model.ApplicationComponent{AppPrimaryKey: appPrimaryKey}, &datastore.ListOptions{}) if err != nil { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } var originCompNames []string @@ -1605,7 +1578,7 @@ func (c *applicationUsecaseImpl) resetApp(ctx context.Context, targetApp *v1beta targetCompNames = append(targetCompNames, comp.Name) } - readyToUpdate, readyToDelete, readyToAdd := compareSlices(originCompNames, targetCompNames) + readyToUpdate, readyToDelete, readyToAdd := utils2.ThreeWaySliceCompare(originCompNames, targetCompNames) // delete new app's components for _, compName := range readyToDelete { @@ -1624,7 +1597,7 @@ func (c *applicationUsecaseImpl) resetApp(ctx context.Context, targetApp *v1beta for _, comp := range targetComps { // add or update new app's components from old app if utils.StringsContain(readyToAdd, comp.Name) || utils.StringsContain(readyToUpdate, comp.Name) { - compModel, err := convertToModelComponent(appPrimaryKey, comp) + compModel, err := sync.ConvertFromCRComponent(appPrimaryKey, comp) if err != nil { return &apisv1.AppResetResponse{}, bcode.ErrInvalidProperties } diff --git a/pkg/apiserver/rest/usecase/envbinding.go b/pkg/apiserver/rest/usecase/envbinding.go index 43fb20524..949069334 100644 --- a/pkg/apiserver/rest/usecase/envbinding.go +++ b/pkg/apiserver/rest/usecase/envbinding.go @@ -114,7 +114,7 @@ func listFullEnvBinding(ctx context.Context, ds datastore.DataStore, option envL log.Logger.Errorf("envbinding invalid %s", err.Error()) continue } - list = append(list, convertEnvbindingModelToBase(eb, env, targets)) + list = append(list, convertEnvBindingModelToBase(eb, env, targets)) } return list, nil } @@ -315,7 +315,7 @@ func (e *envBindingUsecaseImpl) DetailEnvBinding(ctx context.Context, app *model return nil, err } return &apisv1.DetailEnvBindingResponse{ - EnvBindingBase: *convertEnvbindingModelToBase(envBinding, env, targets), + EnvBindingBase: *convertEnvBindingModelToBase(envBinding, env, targets), }, nil } @@ -346,11 +346,12 @@ func convertCreateReqToEnvBindingModel(app *model.Application, req apisv1.Create envBinding := model.EnvBinding{ AppPrimaryKey: app.Name, Name: req.Name, + AppDeployName: app.GetAppNameForSynced(), } return envBinding } -func convertEnvbindingModelToBase(envBinding *model.EnvBinding, env *model.Env, targets []*model.Target) *apisv1.EnvBindingBase { +func convertEnvBindingModelToBase(envBinding *model.EnvBinding, env *model.Env, targets []*model.Target) *apisv1.EnvBindingBase { var dtMap = make(map[string]*model.Target, len(targets)) for _, dte := range targets { dtMap[dte.Name] = dte @@ -379,7 +380,7 @@ func convertEnvbindingModelToBase(envBinding *model.EnvBinding, env *model.Env, Targets: envBindingTargets, CreateTime: envBinding.CreateTime, UpdateTime: envBinding.UpdateTime, - AppDeployName: envBinding.AppPrimaryKey, + AppDeployName: envBinding.AppDeployName, AppDeployNamespace: env.Namespace, } return ebb @@ -389,6 +390,7 @@ func convertToEnvBindingModel(app *model.Application, envBind apisv1.EnvBinding) re := model.EnvBinding{ AppPrimaryKey: app.Name, Name: envBind.Name, + AppDeployName: app.GetAppNameForSynced(), } return &re } @@ -397,31 +399,6 @@ func convertWorkflowName(envName string) string { return fmt.Sprintf("workflow-%s", envName) } -func compareSlices(a []string, b []string) ([]string, []string, []string) { - m := make(map[string]uint8) - for _, k := range a { - m[k] |= 1 << 0 - } - for _, k := range b { - m[k] |= 1 << 1 - } - - var inAAndB, inAButNotB, inBButNotA []string - for k, v := range m { - a := v&(1<<0) != 0 - b := v&(1<<1) != 0 - switch { - case a && b: - inAAndB = append(inAAndB, k) - case a && !b: - inAButNotB = append(inAButNotB, k) - case !a && b: - inBButNotA = append(inBButNotA, k) - } - } - return inAAndB, inAButNotB, inBButNotA -} - func isEnvStepType(stepType string) bool { return stepType == Deploy2Env || stepType == DeployCloudResource } diff --git a/pkg/apiserver/rest/usecase/project.go b/pkg/apiserver/rest/usecase/project.go index f033e4fcc..be8390db7 100644 --- a/pkg/apiserver/rest/usecase/project.go +++ b/pkg/apiserver/rest/usecase/project.go @@ -31,21 +31,6 @@ import ( "github.com/oam-dev/kubevela/pkg/multicluster" ) -const ( - - // DefaultInitName is default object name for initialization - DefaultInitName = "default" - // DefaultInitNamespace is default namespace name for initialization - DefaultInitNamespace = "default" - - // DefaultTargetDescription describes default target created - DefaultTargetDescription = "Default target is created by velaux system automatically." - // DefaultEnvDescription describes default env created - DefaultEnvDescription = "Default environment is created by velaux system automatically." - // DefaultProjectDescription describes the default project created - DefaultProjectDescription = "Default project is created by velaux system automatically." -) - // ProjectUsecase project manage usecase. type ProjectUsecase interface { GetProject(ctx context.Context, projectName string) (*model.Project, error) @@ -65,7 +50,7 @@ func NewProjectUsecase(ds datastore.DataStore) ProjectUsecase { log.Logger.Fatalf("get k8sClient failure: %s", err.Error()) } p := &projectUsecaseImpl{ds: ds, k8sClient: k8sClient} - p.initDefaultProjectEnvTarget(DefaultInitNamespace) + p.initDefaultProjectEnvTarget(model.DefaultInitNamespace) return p } @@ -84,15 +69,15 @@ func (p *projectUsecaseImpl) initDefaultProjectEnvTarget(defaultNamespace string } log.Logger.Info("no default project found, adding a default project with default env and target") - if err := createTargetNamespace(ctx, p.k8sClient, multicluster.ClusterLocalName, defaultNamespace, DefaultInitName); err != nil { + if err := createTargetNamespace(ctx, p.k8sClient, multicluster.ClusterLocalName, defaultNamespace, model.DefaultInitName); err != nil { log.Logger.Errorf("initialize default target namespace failed %v", err) return } // initialize default target first err = createTarget(ctx, p.ds, &model.Target{ - Name: DefaultInitName, + Name: model.DefaultInitName, Alias: "Default", - Description: DefaultTargetDescription, + Description: model.DefaultTargetDescription, Cluster: &model.ClusterTarget{ ClusterName: multicluster.ClusterLocalName, Namespace: defaultNamespace, @@ -106,12 +91,12 @@ func (p *projectUsecaseImpl) initDefaultProjectEnvTarget(defaultNamespace string // initialize default target first err = createEnv(ctx, p.k8sClient, p.ds, &model.Env{ - Name: DefaultInitName, + Name: model.DefaultInitName, Alias: "Default", - Description: DefaultEnvDescription, - Project: DefaultInitName, + Description: model.DefaultEnvDescription, + Project: model.DefaultInitName, Namespace: defaultNamespace, - Targets: []string{DefaultInitName}, + Targets: []string{model.DefaultInitName}, }) // for idempotence, ignore default env already exist error if err != nil && errors.Is(err, bcode.ErrEnvAlreadyExists) { @@ -120,14 +105,15 @@ func (p *projectUsecaseImpl) initDefaultProjectEnvTarget(defaultNamespace string } _, err = p.CreateProject(ctx, apisv1.CreateProjectRequest{ - Name: DefaultInitName, + Name: model.DefaultInitName, Alias: "Default", - Description: DefaultProjectDescription, + Description: model.DefaultProjectDescription, }) if err != nil { log.Logger.Errorf("initialize project failed %v", err) return } + } // GetProject get project diff --git a/pkg/apiserver/rest/usecase/project_test.go b/pkg/apiserver/rest/usecase/project_test.go index 1e75ab891..c5a39f24b 100644 --- a/pkg/apiserver/rest/usecase/project_test.go +++ b/pkg/apiserver/rest/usecase/project_test.go @@ -81,43 +81,43 @@ var _ = Describe("Test project usecase functions", func() { return k8sClient.Get(context.TODO(), types.NamespacedName{Name: defaultNamespace}, &namespace) }, time.Second*3, time.Microsecond*300).Should(BeNil()) - Expect(cmp.Diff(namespace.Labels[oam.LabelNamespaceOfEnvName], DefaultInitName)).Should(BeEmpty()) - Expect(cmp.Diff(namespace.Labels[oam.LabelNamespaceOfTargetName], DefaultInitName)).Should(BeEmpty()) + Expect(cmp.Diff(namespace.Labels[oam.LabelNamespaceOfEnvName], model.DefaultInitName)).Should(BeEmpty()) + Expect(cmp.Diff(namespace.Labels[oam.LabelNamespaceOfTargetName], model.DefaultInitName)).Should(BeEmpty()) Expect(cmp.Diff(namespace.Labels[oam.LabelControlPlaneNamespaceUsage], oam.VelaNamespaceUsageEnv)).Should(BeEmpty()) Expect(cmp.Diff(namespace.Labels[oam.LabelRuntimeNamespaceUsage], oam.VelaNamespaceUsageTarget)).Should(BeEmpty()) By("check project created") - dp, err := projectUsecase.GetProject(context.TODO(), DefaultInitName) + dp, err := projectUsecase.GetProject(context.TODO(), model.DefaultInitName) Expect(err).Should(BeNil()) Expect(dp.Alias).Should(BeEquivalentTo("Default")) - Expect(dp.Description).Should(BeEquivalentTo(DefaultProjectDescription)) + Expect(dp.Description).Should(BeEquivalentTo(model.DefaultProjectDescription)) By("check env created") - env, err := envImpl.GetEnv(context.TODO(), DefaultInitName) + env, err := envImpl.GetEnv(context.TODO(), model.DefaultInitName) Expect(err).Should(BeNil()) Expect(env.Alias).Should(BeEquivalentTo("Default")) - Expect(env.Description).Should(BeEquivalentTo(DefaultEnvDescription)) - Expect(env.Project).Should(BeEquivalentTo(DefaultInitName)) - Expect(env.Targets).Should(BeEquivalentTo([]string{DefaultInitName})) + Expect(env.Description).Should(BeEquivalentTo(model.DefaultEnvDescription)) + Expect(env.Project).Should(BeEquivalentTo(model.DefaultInitName)) + Expect(env.Targets).Should(BeEquivalentTo([]string{model.DefaultInitName})) Expect(env.Namespace).Should(BeEquivalentTo(defaultNamespace)) By("check target created") - tg, err := targetImpl.GetTarget(context.TODO(), DefaultInitName) + tg, err := targetImpl.GetTarget(context.TODO(), model.DefaultInitName) Expect(err).Should(BeNil()) Expect(tg.Alias).Should(BeEquivalentTo("Default")) - Expect(tg.Description).Should(BeEquivalentTo(DefaultTargetDescription)) + Expect(tg.Description).Should(BeEquivalentTo(model.DefaultTargetDescription)) Expect(tg.Cluster).Should(BeEquivalentTo(&model.ClusterTarget{ ClusterName: multicluster.ClusterLocalName, Namespace: defaultNamespace, })) - Expect(env.Targets).Should(BeEquivalentTo([]string{DefaultInitName})) + Expect(env.Targets).Should(BeEquivalentTo([]string{model.DefaultInitName})) Expect(env.Namespace).Should(BeEquivalentTo(defaultNamespace)) - err = targetImpl.DeleteTarget(context.TODO(), DefaultInitName) + err = targetImpl.DeleteTarget(context.TODO(), model.DefaultInitName) Expect(err).Should(BeNil()) - err = envImpl.DeleteEnv(context.TODO(), DefaultInitName) + err = envImpl.DeleteEnv(context.TODO(), model.DefaultInitName) Expect(err).Should(BeNil()) }) diff --git a/pkg/apiserver/rest/usecase/webhook.go b/pkg/apiserver/rest/usecase/webhook.go index 324bbcdf2..e7f9189e7 100644 --- a/pkg/apiserver/rest/usecase/webhook.go +++ b/pkg/apiserver/rest/usecase/webhook.go @@ -197,7 +197,7 @@ func (c *customHandlerImpl) handle(ctx context.Context, webhookTrigger *model.Ap } if err := c.w.ds.Get(ctx, component); err != nil { if errors.Is(err, datastore.ErrRecordNotExist) { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } return nil, err } @@ -227,7 +227,7 @@ func (c *acrHandlerImpl) handle(ctx context.Context, webhookTrigger *model.Appli return nil, err } if len(comps) == 0 { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } // use the first component as the target component @@ -287,7 +287,7 @@ func (c dockerHubHandlerImpl) handle(ctx context.Context, trigger *model.Applica return nil, err } if len(comps) == 0 { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } // use the first component as the target component @@ -395,7 +395,7 @@ func (c *harborHandlerImpl) handle(ctx context.Context, webhookTrigger *model.Ap return nil, err } if len(comps) == 0 { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } // use the first component as the target component @@ -461,7 +461,7 @@ func (j *jfrogHandlerImpl) handle(ctx context.Context, webhookTrigger *model.App return nil, err } if len(comps) == 0 { - return nil, bcode.ErrApplicationComponetNotExist + return nil, bcode.ErrApplicationComponentNotExist } // use the first component as the target component diff --git a/pkg/apiserver/rest/usecase/workflow_model.go b/pkg/apiserver/rest/usecase/workflow_model.go index b42b07448..0951e2bd6 100644 --- a/pkg/apiserver/rest/usecase/workflow_model.go +++ b/pkg/apiserver/rest/usecase/workflow_model.go @@ -58,7 +58,7 @@ func UpdateEnvWorkflow(ctx context.Context, kubeClient client.Client, ds datasto } var filteredSteps []apisv1.WorkflowStep - _, readyToDeleteSteps, readyToAddSteps := compareSlices(workflowStepNames, envStepNames) + _, readyToDeleteSteps, readyToAddSteps := utils2.ThreeWaySliceCompare(workflowStepNames, envStepNames) for _, step := range workflow.Steps { if isEnvStepType(step.Type) && utils.StringsContain(readyToDeleteSteps, step.Name) { diff --git a/pkg/apiserver/rest/utils/bcode/001_application.go b/pkg/apiserver/rest/utils/bcode/001_application.go index 30d57328f..8b1962c4b 100644 --- a/pkg/apiserver/rest/utils/bcode/001_application.go +++ b/pkg/apiserver/rest/utils/bcode/001_application.go @@ -37,11 +37,11 @@ var ErrDeployApplyFail = NewBcode(500, 10005, "application deploy apply failure" // ErrNoComponent no component var ErrNoComponent = NewBcode(200, 10006, "application not have components, can not deploy") -// ErrApplicationComponetExist application component is exist -var ErrApplicationComponetExist = NewBcode(400, 10007, "application component is exist") +// ErrApplicationComponentExist application component is exist +var ErrApplicationComponentExist = NewBcode(400, 10007, "application component is exist") -// ErrApplicationComponetNotExist application component is not exist -var ErrApplicationComponetNotExist = NewBcode(404, 10008, "application component is not exist") +// ErrApplicationComponentNotExist application component is not exist +var ErrApplicationComponentNotExist = NewBcode(404, 10008, "application component is not exist") // ErrApplicationPolicyExist application policy is exist var ErrApplicationPolicyExist = NewBcode(400, 10009, "application policy is exist") @@ -76,7 +76,7 @@ var ErrApplicationRevisionNotExist = NewBcode(404, 10018, "application revision // ErrApplicationRefusedDelete means the application cannot be deleted because it has been deployed var ErrApplicationRefusedDelete = NewBcode(400, 10019, "The application cannot be deleted because it has been deployed") -// ErrApplicationEnvRefusedDelete means he application env cannot be deleted because it has been deployed +// ErrApplicationEnvRefusedDelete means the application env cannot be deleted because it has been deployed var ErrApplicationEnvRefusedDelete = NewBcode(400, 10020, "The application envbinding cannot be deleted because it has been deployed") // ErrInvalidWebhookToken means the webhook token is invalid @@ -91,5 +91,5 @@ var ErrInvalidWebhookPayloadBody = NewBcode(400, 10023, "Invalid webhook payload // ErrApplicationTriggerNotExist means application trigger is not exist var ErrApplicationTriggerNotExist = NewBcode(404, 10024, "application trigger is not exist") -// ErrApplicationComponetNotAllowDelete means the component is main in one application, and it must be deleted before delete app. -var ErrApplicationComponetNotAllowDelete = NewBcode(400, 10025, "main component in application can not be deleted") +// ErrApplicationComponentNotAllowDelete means the component is main in one application, and it must be deleted before delete app. +var ErrApplicationComponentNotAllowDelete = NewBcode(400, 10025, "main component in application can not be deleted") diff --git a/pkg/apiserver/sync/cache.go b/pkg/apiserver/sync/cache.go new file mode 100644 index 000000000..115605029 --- /dev/null +++ b/pkg/apiserver/sync/cache.go @@ -0,0 +1,102 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "errors" + "strconv" + "strings" + + "github.com/sirupsen/logrus" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore" + "github.com/oam-dev/kubevela/pkg/apiserver/model" +) + +type cached struct { + generation int64 + targets int64 +} + +// InitCache will initialize the cache +func (c *CR2UX) InitCache(ctx context.Context) error { + appsRaw, err := c.ds.List(ctx, &model.Application{}, &datastore.ListOptions{}) + if err != nil { + if errors.Is(err, datastore.ErrRecordNotExist) { + return nil + } + return err + } + for _, appR := range appsRaw { + app, ok := appR.(*model.Application) + if !ok { + continue + } + gen, ok := app.Labels[model.LabelSyncGeneration] + if !ok || gen == "" { + continue + } + namespace := app.Labels[model.LabelSyncNamespace] + var key = formatAppComposedName(app.Name, namespace) + if strings.HasSuffix(app.Name, namespace) { + key = app.Name + } + generation, _ := strconv.ParseInt(gen, 10, 64) + + // we should check targets if we synced from app status + c.updateCache(key, generation, 0) + } + return nil +} + +func (c *CR2UX) shouldSync(ctx context.Context, targetApp *v1beta1.Application, del bool) bool { + key := formatAppComposedName(targetApp.Name, targetApp.Namespace) + cachedData, ok := c.cache.Load(key) + if ok { + cd := cachedData.(*cached) + + // TODO(wonderflow): we should check targets if we sync that, it can avoid missing the status changed for targets updated in multi-cluster deploy, e.g. resumed suspend case. + + if cd.generation == targetApp.Generation && !del { + logrus.Infof("app %s/%s with generation(%v) hasn't updated, ignore the sync event..", targetApp.Name, targetApp.Namespace, targetApp.Generation) + return false + } + if del { + c.cache.Delete(key) + } + } + + sot := CheckSoTFromCR(targetApp) + + // This is a double check to make sure the app not be converted and un-deployed + sot = CheckSoTFromAppMeta(ctx, c.ds, targetApp.Name, targetApp.Namespace, sot) + + switch sot { + case FromUX, FromInner: + // we don't sync if the application is not created from CR + return false + default: + } + return true +} + +func (c *CR2UX) updateCache(key string, generation, targets int64) { + // update cache + c.cache.Store(key, &cached{generation: generation, targets: targets}) +} diff --git a/pkg/apiserver/sync/convert.go b/pkg/apiserver/sync/convert.go new file mode 100644 index 000000000..98c7bda18 --- /dev/null +++ b/pkg/apiserver/sync/convert.go @@ -0,0 +1,230 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "strconv" + "strings" + "time" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/common" + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1" + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/apiserver/model" + "github.com/oam-dev/kubevela/pkg/multicluster" + "github.com/oam-dev/kubevela/pkg/oam" + "github.com/oam-dev/kubevela/pkg/workflow/step" +) + +// ConvertFromCRComponent concerts Application CR Component object into velaux data store component +func ConvertFromCRComponent(appPrimaryKey string, component common.ApplicationComponent) (model.ApplicationComponent, error) { + bc := model.ApplicationComponent{ + AppPrimaryKey: appPrimaryKey, + Name: component.Name, + Type: component.Type, + ExternalRevision: component.ExternalRevision, + DependsOn: component.DependsOn, + Inputs: component.Inputs, + Outputs: component.Outputs, + Scopes: component.Scopes, + Creator: model.AutoGenComp, + } + if component.Properties != nil { + properties, err := model.NewJSONStruct(component.Properties) + if err != nil { + return bc, err + } + bc.Properties = properties + } + for _, trait := range component.Traits { + properties, err := model.NewJSONStruct(trait.Properties) + if err != nil { + return bc, err + } + bc.Traits = append(bc.Traits, model.ApplicationTrait{CreateTime: time.Now(), UpdateTime: time.Now(), Properties: properties, Type: trait.Type, Alias: trait.Type, Description: "auto gen"}) + } + return bc, nil +} + +// ConvertFromCRPolicy converts Application CR Policy object into velaux data store policy +func ConvertFromCRPolicy(appPrimaryKey string, policyCR v1beta1.AppPolicy, creator string) (model.ApplicationPolicy, error) { + plc := model.ApplicationPolicy{ + AppPrimaryKey: appPrimaryKey, + Name: policyCR.Name, + Type: policyCR.Type, + Creator: creator, + } + if policyCR.Properties != nil { + properties, err := model.NewJSONStruct(policyCR.Properties) + if err != nil { + return plc, err + } + plc.Properties = properties + } + return plc, nil +} + +// ConvertFromCRWorkflow converts Application CR Workflow section into velaux data store workflow +func ConvertFromCRWorkflow(ctx context.Context, cli client.Client, appPrimaryKey string, app *v1beta1.Application) (model.Workflow, []v1beta1.WorkflowStep, error) { + dataWf := model.Workflow{ + AppPrimaryKey: appPrimaryKey, + // every namespace has a synced env + EnvName: model.AutoGenEnvNamePrefix + app.Namespace, + // every application has a synced workflow + Name: model.AutoGenWorkflowNamePrefix + app.Name, + Alias: "Synced", + } + if app.Spec.Workflow == nil { + return dataWf, nil, nil + } + var steps []v1beta1.WorkflowStep + if app.Spec.Workflow.Ref != "" { + dataWf.Name = app.Spec.Workflow.Ref + wf := &v1alpha1.Workflow{} + if err := cli.Get(ctx, types.NamespacedName{Namespace: app.GetNamespace(), Name: app.Spec.Workflow.Ref}, wf); err != nil { + return dataWf, nil, err + } + steps = wf.Steps + } else { + steps = app.Spec.Workflow.Steps + } + for _, s := range steps { + if s.Properties == nil { + continue + } + properties, err := model.NewJSONStruct(s.Properties) + if err != nil { + return dataWf, nil, err + } + dataWf.Steps = append(dataWf.Steps, model.WorkflowStep{ + Name: s.Name, + Type: s.Type, + Inputs: s.Inputs, + Outputs: s.Outputs, + DependsOn: s.DependsOn, + Properties: properties, + }) + } + return dataWf, steps, nil +} + +// ConvertFromCRTargets converts deployed Cluster/Namespace from Application CR Status into velaux data store +func ConvertFromCRTargets(targetApp *v1beta1.Application) []*model.Target { + var targets []*model.Target + nc := make(map[string]struct{}) + for _, v := range targetApp.Status.AppliedResources { + var cluster = v.Cluster + if cluster == "" { + cluster = multicluster.ClusterLocalName + } + name := model.AutoGenTargetNamePrefix + cluster + "-" + v.Namespace + if _, ok := nc[name]; ok { + continue + } + nc[name] = struct{}{} + targets = append(targets, &model.Target{ + Name: name, + Cluster: &model.ClusterTarget{ + ClusterName: cluster, + Namespace: v.Namespace, + }, + }) + } + return targets +} + +// ConvertApp2DatastoreApp will convert Application CR to datastore application related resources +func (c *CR2UX) ConvertApp2DatastoreApp(ctx context.Context, targetApp *v1beta1.Application) (*model.DataStoreApp, error) { + cli := c.cli + + appName := c.getAppMetaName(ctx, targetApp.Name, targetApp.Namespace) + + project := model.DefaultInitName + if _, ok := targetApp.Labels[oam.LabelAddonName]; ok && strings.HasPrefix(targetApp.Name, "addon-") { + project = model.DefaultAddonProject + } + appMeta := &model.Application{ + Name: appName, + Description: model.AutoGenDesc, + Alias: targetApp.Name, + Project: project, + Labels: map[string]string{ + model.LabelSyncNamespace: targetApp.Namespace, + model.LabelSyncGeneration: strconv.FormatInt(targetApp.Generation, 10), + }, + } + + // 1. convert app meta and env + dsApp := &model.DataStoreApp{ + AppMeta: appMeta, + Env: &model.Env{ + Name: model.AutoGenEnvNamePrefix + targetApp.Namespace, + Namespace: targetApp.Namespace, + Description: model.AutoGenDesc, + Project: project, + Alias: "Synced", + }, + Eb: &model.EnvBinding{ + AppPrimaryKey: appMeta.PrimaryKey(), + Name: model.AutoGenEnvNamePrefix + targetApp.Namespace, + AppDeployName: appMeta.GetAppNameForSynced(), + }, + } + + // 2. convert component and trait + for _, cmp := range targetApp.Spec.Components { + compModel, err := ConvertFromCRComponent(appMeta.PrimaryKey(), cmp) + if err != nil { + return nil, err + } + dsApp.Comps = append(dsApp.Comps, &compModel) + } + + // 3. convert workflow + wf, steps, err := ConvertFromCRWorkflow(ctx, cli, appMeta.PrimaryKey(), targetApp) + if err != nil { + return nil, err + } + dsApp.Workflow = &wf + + // 4. convert policy, some policies are references in workflow step, we need to sync all the outside policy to make that work + outsidePLC, err := step.LoadExternalPoliciesForWorkflow(ctx, cli, targetApp.Namespace, steps, targetApp.Spec.Policies) + if err != nil { + return nil, err + } + for _, plc := range outsidePLC { + plcModel, err := ConvertFromCRPolicy(appMeta.PrimaryKey(), plc, model.AutoGenRefPolicy) + if err != nil { + return nil, err + } + dsApp.Policies = append(dsApp.Policies, &plcModel) + } + + // TODO(wonderflow): we don't sync targets as it can't be judged well in velaux env + // if we want to sync, we can extract targets from status, like below: + /* + dsApp.Targets = ConvertFromCRTargets(targetApp) + for _, t := range dsApp.Targets { + dsApp.Env.Targets = append(dsApp.Env.Targets, t.Name) + } + */ + return dsApp, nil +} diff --git a/pkg/apiserver/sync/cr2ux.go b/pkg/apiserver/sync/cr2ux.go new file mode 100644 index 000000000..d50be1244 --- /dev/null +++ b/pkg/apiserver/sync/cr2ux.go @@ -0,0 +1,198 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "sync" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore" + "github.com/oam-dev/kubevela/pkg/apiserver/log" + "github.com/oam-dev/kubevela/pkg/apiserver/model" + "github.com/oam-dev/kubevela/pkg/oam" +) + +const ( + // FromCR means the data source of truth is from k8s CR + FromCR = "from-CR" + // FromUX means the data source of truth is from velaux data store + FromUX = "from-UX" + // FromInner means the data source of truth is from KubeVela inner usage, such as addon or configuration that don't want to be synced + FromInner = "from-inner" + + // SoT means the source of Truth from + SoT = "SourceOfTruth" +) + +// CheckSoTFromCR will check the source of truth of the application +func CheckSoTFromCR(targetApp *v1beta1.Application) string { + + if _, innerUse := targetApp.Annotations[oam.AnnotationSOTFromInner]; innerUse { + return FromInner + } + if _, appName := targetApp.Annotations[oam.AnnotationAppName]; appName { + return FromUX + } + return FromCR +} + +// CheckSoTFromAppMeta will check the source of truth marked in datastore +func CheckSoTFromAppMeta(ctx context.Context, ds datastore.DataStore, appName, namespace string, sotFromCR string) string { + + app := &model.Application{Name: formatAppComposedName(appName, namespace)} + err := ds.Get(ctx, app) + if err != nil { + app = &model.Application{Name: appName} + err = ds.Get(ctx, app) + if err != nil { + return sotFromCR + } + } + if app.Labels == nil || app.Labels[SoT] == "" { + return sotFromCR + } + return app.Labels[SoT] +} + +// CR2UX provides the Add/Update/Delete method +type CR2UX struct { + ds datastore.DataStore + cli client.Client + cache sync.Map +} + +func formatAppComposedName(name, namespace string) string { + return name + "-" + namespace +} + +// we need to prevent the case that one app is deleted ant it's name is pure appName, then other app with namespace suffix will be mixed +func (c *CR2UX) getAppMetaName(ctx context.Context, name, namespace string) string { + alreadyCreated := &model.Application{Name: formatAppComposedName(name, namespace)} + err := c.ds.Get(ctx, alreadyCreated) + if err == nil { + return formatAppComposedName(name, namespace) + } + + // check if it's created the first in database + existApp := &model.Application{Name: name} + err = c.ds.Get(ctx, existApp) + if err == nil { + en := existApp.Labels[model.LabelSyncNamespace] + if en != namespace { + return formatAppComposedName(name, namespace) + } + } + return name +} + +// AddOrUpdate will sync application CR to storage of VelaUX automatically +func (c *CR2UX) AddOrUpdate(ctx context.Context, targetApp *v1beta1.Application) error { + ds := c.ds + + if !c.shouldSync(ctx, targetApp, false) { + return nil + } + + dsApp, err := c.ConvertApp2DatastoreApp(ctx, targetApp) + if err != nil { + log.Logger.Errorf("Convert App to data store err %v", err) + return err + } + + if err = StoreProject(ctx, dsApp.AppMeta.Project, ds); err != nil { + log.Logger.Errorf("get or create project for sync process err %v", err) + return err + } + + if err = StoreEnv(ctx, dsApp, ds); err != nil { + log.Logger.Errorf("Store Env Metadata to data store err %v", err) + return err + } + if err = StoreEnvBinding(ctx, dsApp.Eb, ds); err != nil { + log.Logger.Errorf("Store EnvBinding Metadata to data store err %v", err) + return err + } + if err = StoreComponents(ctx, dsApp.AppMeta.Name, dsApp.Comps, ds); err != nil { + log.Logger.Errorf("Store Components Metadata to data store err %v", err) + return err + } + if err = StorePolicy(ctx, dsApp.AppMeta.Name, dsApp.Policies, ds); err != nil { + log.Logger.Errorf("Store Policy Metadata to data store err %v", err) + return err + } + if err = StoreWorkflow(ctx, dsApp, ds); err != nil { + log.Logger.Errorf("Store Workflow Metadata to data store err %v", err) + return err + } + /* + if err = StoreTargets(ctx, dsApp, ds); err != nil { + log.Logger.Errorf("Store targets to data store err %v", err) + return err + } + + */ + + if err = StoreAppMeta(ctx, dsApp, ds); err != nil { + log.Logger.Errorf("Store App Metadata to data store err %v", err) + return err + } + + // update cache + c.updateCache(dsApp.AppMeta.PrimaryKey(), targetApp.Generation, int64(len(dsApp.Targets))) + return nil +} + +// DeleteApp will delete the application as the CR was deleted +func (c *CR2UX) DeleteApp(ctx context.Context, targetApp *v1beta1.Application) error { + ds := c.ds + + if !c.shouldSync(ctx, targetApp, true) { + return nil + } + appName := c.getAppMetaName(ctx, targetApp.Name, targetApp.Namespace) + + _ = ds.Delete(ctx, &model.Application{Name: appName}) + + cmps, err := ds.List(ctx, &model.ApplicationComponent{AppPrimaryKey: appName}, &datastore.ListOptions{}) + if err != nil { + return err + } + for _, entity := range cmps { + comp := entity.(*model.ApplicationComponent) + if comp.Creator == model.AutoGenComp { + _ = ds.Delete(ctx, comp) + } + } + + plcs, err := ds.List(ctx, &model.ApplicationPolicy{AppPrimaryKey: appName}, &datastore.ListOptions{}) + if err != nil { + return err + } + for _, entity := range plcs { + comp := entity.(*model.ApplicationPolicy) + if comp.Creator == model.AutoGenPolicy { + _ = ds.Delete(ctx, comp) + } + } + + _ = ds.Delete(ctx, &model.Workflow{Name: model.AutoGenWorkflowNamePrefix + appName}) + + return nil +} diff --git a/pkg/apiserver/sync/store.go b/pkg/apiserver/sync/store.go new file mode 100644 index 000000000..99972824f --- /dev/null +++ b/pkg/apiserver/sync/store.go @@ -0,0 +1,230 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "errors" + + "github.com/oam-dev/kubevela/pkg/apiserver/datastore" + "github.com/oam-dev/kubevela/pkg/apiserver/log" + "github.com/oam-dev/kubevela/pkg/apiserver/model" + "github.com/oam-dev/kubevela/pkg/utils" +) + +// StoreProject will create project for synced application +func StoreProject(ctx context.Context, name string, ds datastore.DataStore) error { + err := ds.Get(ctx, &model.Project{Name: name}) + if err == nil { + // it means the record already exists, don't need to add anything + return nil + } + if !errors.Is(err, datastore.ErrRecordNotExist) { + // other database error, return it + return err + } + proj := &model.Project{ + Name: name, + Description: model.AutoGenProj, + } + return ds.Add(ctx, proj) +} + +// StoreAppMeta will sync application metadata from CR to datastore +func StoreAppMeta(ctx context.Context, app *model.DataStoreApp, ds datastore.DataStore) error { + err := ds.Get(ctx, &model.Application{Name: app.AppMeta.Name}) + if err == nil { + // it means the record already exists + return ds.Put(ctx, app.AppMeta) + } + if !errors.Is(err, datastore.ErrRecordNotExist) { + // other database error, return it + return err + } + return ds.Add(ctx, app.AppMeta) +} + +// StoreEnv will sync application namespace from CR to datastore env, one namespace belongs to one env +func StoreEnv(ctx context.Context, app *model.DataStoreApp, ds datastore.DataStore) error { + curEnv := &model.Env{Name: app.Env.Name} + err := ds.Get(ctx, curEnv) + if err == nil { + // it means the record already exists, compare the targets + if utils.EqualSlice(curEnv.Targets, app.Env.Targets) { + return nil + } + return ds.Put(ctx, app.Env) + } + if !errors.Is(err, datastore.ErrRecordNotExist) { + // other database error, return it + return err + } + return ds.Add(ctx, app.Env) +} + +// StoreEnvBinding will add envbinding for application CR one application one envbinding +func StoreEnvBinding(ctx context.Context, eb *model.EnvBinding, ds datastore.DataStore) error { + err := ds.Get(ctx, eb) + if err == nil { + // it means the record already exists, don't need to add anything + return nil + } + if !errors.Is(err, datastore.ErrRecordNotExist) { + // other database error, return it + return err + } + return ds.Add(ctx, eb) +} + +// StoreComponents will sync application components from CR to datastore +func StoreComponents(ctx context.Context, appPrimaryKey string, expComps []*model.ApplicationComponent, ds datastore.DataStore) error { + + // list the existing components in datastore + originComps, err := ds.List(ctx, &model.ApplicationComponent{AppPrimaryKey: appPrimaryKey}, &datastore.ListOptions{}) + if err != nil { + return err + } + var originCompNames []string + for _, entity := range originComps { + comp := entity.(*model.ApplicationComponent) + originCompNames = append(originCompNames, comp.Name) + } + + var targetCompNames []string + for _, comp := range expComps { + targetCompNames = append(targetCompNames, comp.Name) + } + + _, readyToDelete, readyToAdd := utils.ThreeWaySliceCompare(originCompNames, targetCompNames) + + // delete the components that not belongs to the new app + for _, entity := range originComps { + comp := entity.(*model.ApplicationComponent) + // we only compare for components that automatically generated by sync process. + if comp.Creator != model.AutoGenComp { + continue + } + if !utils.StringsContain(readyToDelete, comp.Name) { + continue + } + if err := ds.Delete(ctx, comp); err != nil { + if errors.Is(err, datastore.ErrRecordNotExist) { + continue + } + log.Logger.Warnf("delete comp %s for app %s failure %s", comp.Name, appPrimaryKey, err.Error()) + } + } + + // add or update new app's components for datastore + for _, comp := range expComps { + if utils.StringsContain(readyToAdd, comp.Name) { + err = ds.Add(ctx, comp) + } else { + err = ds.Put(ctx, comp) + } + if err != nil { + log.Logger.Warnf("convert comp %s for app %s into datastore failure %s", comp.Name, utils.Sanitize(appPrimaryKey), err.Error()) + return err + } + } + return nil +} + +// StorePolicy will add/update/delete policies, we don't delete ref policy +func StorePolicy(ctx context.Context, appPrimaryKey string, expPolicies []*model.ApplicationPolicy, ds datastore.DataStore) error { + // list the existing policies for this app in datastore + originPolicies, err := ds.List(ctx, &model.ApplicationPolicy{AppPrimaryKey: appPrimaryKey}, &datastore.ListOptions{}) + if err != nil { + return err + } + var originPolicyNames []string + for _, entity := range originPolicies { + plc := entity.(*model.ApplicationPolicy) + originPolicyNames = append(originPolicyNames, plc.Name) + } + + var targetPLCNames []string + for _, plc := range expPolicies { + targetPLCNames = append(targetPLCNames, plc.Name) + } + + _, readyToDelete, readyToAdd := utils.ThreeWaySliceCompare(originPolicyNames, targetPLCNames) + + // delete the components that not belongs to the new app + for _, entity := range originPolicies { + plc := entity.(*model.ApplicationPolicy) + // we only compare for policies that automatically generated by sync process, and the policy should not be ref ones. + if plc.Creator != model.AutoGenPolicy { + continue + } + if !utils.StringsContain(readyToDelete, plc.Name) { + continue + } + if err := ds.Delete(ctx, plc); err != nil { + if errors.Is(err, datastore.ErrRecordNotExist) { + continue + } + log.Logger.Warnf("delete policy %s for app %s failure %s", plc.Name, appPrimaryKey, err.Error()) + } + } + + // add or update new app's policies for datastore + for _, plc := range expPolicies { + if utils.StringsContain(readyToAdd, plc.Name) { + err = ds.Add(ctx, plc) + } else { + err = ds.Put(ctx, plc) + } + if err != nil { + log.Logger.Warnf("convert policy %s for app %s into datastore failure %s", plc.Name, utils.Sanitize(appPrimaryKey), err.Error()) + return err + } + } + return nil +} + +// StoreWorkflow will sync workflow application CR to datastore, it updates the only one workflow from the application with specified name +func StoreWorkflow(ctx context.Context, dsApp *model.DataStoreApp, ds datastore.DataStore) error { + err := ds.Get(ctx, &model.Workflow{AppPrimaryKey: dsApp.AppMeta.Name, Name: dsApp.Workflow.Name}) + if err == nil { + // it means the record already exists, update it + return ds.Put(ctx, dsApp.Workflow) + } + if !errors.Is(err, datastore.ErrRecordNotExist) { + // other database error, return it + return err + } + return ds.Add(ctx, dsApp.Workflow) +} + +// StoreTargets will sync targets from application CR to datastore +func StoreTargets(ctx context.Context, dsApp *model.DataStoreApp, ds datastore.DataStore) error { + for _, t := range dsApp.Targets { + err := ds.Get(ctx, t) + if err == nil { + continue + } + if !errors.Is(err, datastore.ErrRecordNotExist) { + // other database error, return it + return err + } + if err = ds.Add(ctx, t); err != nil { + return err + } + } + return nil +} diff --git a/pkg/apiserver/sync/suit_test.go b/pkg/apiserver/sync/suit_test.go new file mode 100644 index 000000000..34f59eaa9 --- /dev/null +++ b/pkg/apiserver/sync/suit_test.go @@ -0,0 +1,100 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + + "github.com/oam-dev/kubevela/pkg/apiserver/clients" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore/kubeapi" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore/mongodb" + "github.com/oam-dev/kubevela/pkg/utils/common" +) + +var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment + +func TestSync(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Sync Suite Test") +} + +var _ = BeforeSuite(func(done Done) { + rand.Seed(time.Now().UnixNano()) + By("bootstrapping Sync test environment") + + testEnv = &envtest.Environment{ + ControlPlaneStartTimeout: time.Minute * 3, + ControlPlaneStopTimeout: time.Minute, + UseExistingCluster: pointer.BoolPtr(false), + CRDDirectoryPaths: []string{"../../../charts/vela-core/crds"}, + } + + By("start kube test env") + var err error + cfg, err = testEnv.Start() + Expect(err).ShouldNot(HaveOccurred()) + Expect(cfg).ToNot(BeNil()) + + By("new kube client") + cfg.Timeout = time.Minute * 2 + k8sClient, err = client.New(cfg, client.Options{Scheme: common.Scheme}) + Expect(err).Should(BeNil()) + Expect(k8sClient).ToNot(BeNil()) + clients.SetKubeClient(k8sClient) + By("new kube client success") + clients.SetKubeClient(k8sClient) + Expect(err).Should(BeNil()) + close(done) +}, 240) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).ToNot(HaveOccurred()) +}) + +func NewDatastore(cfg datastore.Config) (ds datastore.DataStore, err error) { + switch cfg.Type { + case "mongodb": + ds, err = mongodb.New(context.Background(), cfg) + if err != nil { + return nil, fmt.Errorf("create mongodb datastore instance failure %w", err) + } + case "kubeapi": + ds, err = kubeapi.New(context.Background(), cfg) + if err != nil { + return nil, fmt.Errorf("create mongodb datastore instance failure %w", err) + } + default: + return nil, fmt.Errorf("not support datastore type %s", cfg.Type) + } + return ds, nil +} diff --git a/pkg/apiserver/sync/testdata/test-app1.yaml b/pkg/apiserver/sync/testdata/test-app1.yaml new file mode 100644 index 000000000..7a8c74d86 --- /dev/null +++ b/pkg/apiserver/sync/testdata/test-app1.yaml @@ -0,0 +1,37 @@ +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: example +spec: + components: + - name: nginx + type: webservice + properties: + image: nginx + traits: + - type: gateway + properties: + domain: testsvc.example.com + http: + "/": 8000 + - name: nginx2 + type: webservice + properties: + image: nginx2 + policies: + - name: topology-beijing-demo + type: topology + properties: + clusterLabelSelector: + region: beijing + namespace: demo + - name: topology-local + type: topology + properties: + targets: ["local/demo", "local/ackone-demo"] + workflow: + steps: + - type: deploy + name: deploy-local + properties: + policies: ["topology-local", "topology-beijing-demo"] \ No newline at end of file diff --git a/pkg/apiserver/sync/testdata/test-app2.yaml b/pkg/apiserver/sync/testdata/test-app2.yaml new file mode 100644 index 000000000..6f1164c5a --- /dev/null +++ b/pkg/apiserver/sync/testdata/test-app2.yaml @@ -0,0 +1,10 @@ +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: example +spec: + components: + - name: blog + type: webservice + properties: + image: wordpress diff --git a/pkg/apiserver/sync/testdata/test-app3.yaml b/pkg/apiserver/sync/testdata/test-app3.yaml new file mode 100644 index 000000000..a3a80ab4b --- /dev/null +++ b/pkg/apiserver/sync/testdata/test-app3.yaml @@ -0,0 +1,33 @@ +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: example +spec: + components: + - name: blog + type: webservice + properties: + image: wordpress + traits: + - type: gateway + properties: + domain: testsvc.example.com + http: + "/": 8000 + - name: nginx2 + type: webservice + properties: + image: nginx + policies: + - name: topology-beijing-demo + type: topology + properties: + clusterLabelSelector: + region: beijing + namespace: demo + workflow: + steps: + - type: deploy + name: deploy-local + properties: + policies: ["topology-beijing-demo"] \ No newline at end of file diff --git a/pkg/apiserver/sync/worker.go b/pkg/apiserver/sync/worker.go new file mode 100644 index 000000000..a0d1c54f7 --- /dev/null +++ b/pkg/apiserver/sync/worker.go @@ -0,0 +1,103 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "encoding/json" + "sync" + + "github.com/fatih/color" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/apiserver/clients" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore" +) + +// Start prepares watchers and run their controllers, then waits for process termination signals +func Start(ctx context.Context, ds datastore.DataStore, cfg *rest.Config) { + k8sClient, err := clients.GetKubeClient() + if err != nil { + logrus.Fatal(err) + } + + dynamicClient, err := dynamic.NewForConfig(cfg) + if err != nil { + logrus.Fatal(err) + } + + f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, v1.NamespaceAll, nil) + + startAppSyncing(ctx, f, ds, k8sClient) +} + +func startAppSyncing(ctx context.Context, factory dynamicinformer.DynamicSharedInformerFactory, ds datastore.DataStore, cli client.Client) { + var err error + informer := factory.ForResource(v1beta1.SchemeGroupVersion.WithResource("applications")).Informer() + getApp := func(obj interface{}) *v1beta1.Application { + app := &v1beta1.Application{} + bs, _ := json.Marshal(obj) + _ = json.Unmarshal(bs, app) + return app + } + cu := &CR2UX{ + ds: ds, + cli: cli, + cache: sync.Map{}, + } + if err = cu.InitCache(ctx); err != nil { + klog.Fatal("sync app init err", err) + } + + handlers := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + app := getApp(obj) + klog.Infof("watched add app event, namespace: %s, name: %s", app.Namespace, app.Name) + err = cu.AddOrUpdate(ctx, app) + if err != nil { + logrus.Errorf("Application %-30s Create Sync to db err %v", color.WhiteString(app.Namespace+"/"+app.Name), err) + } + }, + UpdateFunc: func(oldObj, obj interface{}) { + app := getApp(obj) + klog.Infof("watched update app event, namespace: %s, name: %s", app.Namespace, app.Name) + err = cu.AddOrUpdate(ctx, app) + if err != nil { + klog.Errorf("Application %-30s Update Sync to db err %v", color.WhiteString(app.Namespace+"/"+app.Name), err) + } + }, + DeleteFunc: func(obj interface{}) { + app := getApp(obj) + klog.Infof("watched delete app event, namespace: %s, name: %s", app.Namespace, app.Name) + err = cu.DeleteApp(ctx, app) + if err != nil { + klog.Errorf("Application %-30s Deleted Sync to db err %v", color.WhiteString(app.Namespace+"/"+app.Name), err) + } + }, + } + informer.AddEventHandler(handlers) + klog.Info("app syncing started") + informer.Run(ctx.Done()) +} diff --git a/pkg/apiserver/sync/worker_test.go b/pkg/apiserver/sync/worker_test.go new file mode 100644 index 000000000..f13911ee6 --- /dev/null +++ b/pkg/apiserver/sync/worker_test.go @@ -0,0 +1,127 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/apiserver/datastore" + "github.com/oam-dev/kubevela/pkg/apiserver/model" + "github.com/oam-dev/kubevela/pkg/oam/util" + common2 "github.com/oam-dev/kubevela/pkg/utils/common" +) + +var _ = Describe("Test Worker CR sync to datastore", func() { + BeforeEach(func() { + }) + + It("Test app sync test-app1", func() { + + By("Preparing database") + dbNamespace := "sync-db-ns1-test" + appNS1 := "sync-worker-test-ns1" + appNS2 := "sync-worker-test-ns2" + ds, err := NewDatastore(datastore.Config{Type: "kubeapi", Database: "sync-test-db1"}) + Expect(ds).ToNot(BeNil()) + Expect(err).Should(BeNil()) + var ns = corev1.Namespace{} + ns.Name = dbNamespace + err = k8sClient.Create(context.TODO(), &ns) + Expect(err).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{})) + ns.Name = appNS1 + ns.ResourceVersion = "" + err = k8sClient.Create(context.TODO(), &ns) + Expect(err).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{})) + ns.Name = appNS2 + ns.ResourceVersion = "" + err = k8sClient.Create(context.TODO(), &ns) + Expect(err).Should(SatisfyAny(BeNil(), &util.AlreadyExistMatcher{})) + + By("Start syncing") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go Start(ctx, ds, cfg) + + By("create test app1 and check the syncing results") + app1 := &v1beta1.Application{} + Expect(common2.ReadYamlToObject("testdata/test-app1.yaml", app1)).Should(BeNil()) + app1.Namespace = appNS1 + Expect(k8sClient.Create(context.TODO(), app1)).Should(BeNil()) + + Eventually(func() error { + appm := model.Application{Name: app1.Name} + return ds.Get(ctx, &appm) + }, 10*time.Second, 100*time.Millisecond).Should(BeNil()) + + comp1 := model.ApplicationComponent{AppPrimaryKey: app1.Name, Name: "nginx"} + Expect(ds.Get(ctx, &comp1)).Should(BeNil()) + Expect(comp1.Properties).Should(BeEquivalentTo(&model.JSONStruct{"image": "nginx"})) + + comp2 := model.ApplicationComponent{AppPrimaryKey: app1.Name, Name: "nginx2"} + Expect(ds.Get(ctx, &comp2)).Should(BeNil()) + Expect(comp2.Properties).Should(BeEquivalentTo(&model.JSONStruct{"image": "nginx2"})) + + appPlc1 := model.ApplicationPolicy{AppPrimaryKey: app1.Name, Name: "topology-beijing-demo"} + Expect(ds.Get(ctx, &appPlc1)).Should(BeNil()) + Expect(appPlc1.Properties).Should(BeEquivalentTo(&model.JSONStruct{"namespace": "demo", "clusterLabelSelector": map[string]interface{}{"region": "beijing"}})) + + appPlc2 := model.ApplicationPolicy{AppPrimaryKey: app1.Name, Name: "topology-local"} + Expect(ds.Get(ctx, &appPlc2)).Should(BeNil()) + Expect(appPlc2.Properties).Should(BeEquivalentTo(&model.JSONStruct{"targets": []interface{}{"local/demo", "local/ackone-demo"}})) + + appwf1 := model.Workflow{AppPrimaryKey: app1.Name, Name: model.AutoGenWorkflowNamePrefix + app1.Name} + Expect(ds.Get(ctx, &appwf1)).Should(BeNil()) + + By("create test app2 and check the syncing results") + app2 := &v1beta1.Application{} + Expect(common2.ReadYamlToObject("testdata/test-app2.yaml", app2)).Should(BeNil()) + app2.Namespace = appNS2 + Expect(k8sClient.Create(context.TODO(), app2)).Should(BeNil()) + + Eventually(func() error { + appm := model.Application{Name: formatAppComposedName(app2.Name, app2.Namespace)} + return ds.Get(ctx, &appm) + }, 10*time.Second, 100*time.Millisecond).Should(BeNil()) + + By("delete test app1 and check the syncing results") + Expect(k8sClient.Delete(context.TODO(), app1)).Should(BeNil()) + Eventually(func() error { + appm := model.Application{Name: app1.Name} + return ds.Get(ctx, &appm) + }, 10*time.Second, 100*time.Millisecond).Should(BeEquivalentTo(datastore.ErrRecordNotExist)) + + By("update test app2 and check the syncing results") + newapp2 := &v1beta1.Application{} + Expect(common2.ReadYamlToObject("testdata/test-app3.yaml", newapp2)).Should(BeNil()) + app2.Spec = newapp2.Spec + Expect(k8sClient.Update(context.TODO(), app2)).Should(BeNil()) + + Eventually(func() error { + appm := model.ApplicationComponent{AppPrimaryKey: formatAppComposedName(app2.Name, app2.Namespace), Name: "nginx2"} + return ds.Get(ctx, &appm) + }, 10*time.Second, 100*time.Millisecond).Should(BeNil()) + + }) + +}) diff --git a/pkg/oam/labels.go b/pkg/oam/labels.go index 3867cff9f..8dba9f204 100644 --- a/pkg/oam/labels.go +++ b/pkg/oam/labels.go @@ -172,6 +172,7 @@ const ( AnnotationWorkflowName = "app.oam.dev/workflowName" // AnnotationAppName specifies the name for application in db. + // Note: the annotation is only created by velaUX, please don't use it in other Source of Truth. AnnotationAppName = "app.oam.dev/appName" // AnnotationAppAlias specifies the alias for application in db. @@ -189,4 +190,7 @@ const ( // AnnotationServiceAccountName indicates the name of the ServiceAccount to use to apply Components and run Workflow. // ServiceAccount will be used in the local cluster only. AnnotationServiceAccountName = "app.oam.dev/service-account-name" + + // AnnotationSOTFromInner indicates the application source of truth is from inner and should not be synced + AnnotationSOTFromInner = "sot.oam.dev/from-inner" ) diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go index 2886f387a..77678c54e 100644 --- a/pkg/utils/strings.go +++ b/pkg/utils/strings.go @@ -16,6 +16,11 @@ limitations under the License. package utils +import ( + "reflect" + "sort" +) + // StringsContain strings contain func StringsContain(items []string, source string) bool { for _, item := range items { @@ -25,3 +30,36 @@ func StringsContain(items []string, source string) bool { } return false } + +// ThreeWaySliceCompare will compare two string slice, with the three return values: [both in A and B], [only in A], [only in B] +func ThreeWaySliceCompare(a []string, b []string) ([]string, []string, []string) { + m := make(map[string]struct{}) + for _, k := range b { + m[k] = struct{}{} + } + + var AB, AO, BO []string + for _, k := range a { + _, ok := m[k] + if !ok { + AO = append(AO, k) + continue + } + AB = append(AB, k) + delete(m, k) + } + for k := range m { + BO = append(BO, k) + } + sort.Strings(AB) + sort.Strings(AO) + sort.Strings(BO) + return AB, AO, BO +} + +// EqualSlice checks if two slice are equal +func EqualSlice(a, b []string) bool { + sort.Strings(a) + sort.Strings(b) + return reflect.DeepEqual(a, b) +} diff --git a/pkg/utils/strings_test.go b/pkg/utils/strings_test.go new file mode 100644 index 000000000..7ad35faf8 --- /dev/null +++ b/pkg/utils/strings_test.go @@ -0,0 +1,79 @@ +/* + Copyright 2022 The KubeVela Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package utils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompareSlice(t *testing.T) { + caseA := []string{"c", "b", "a"} + caseB := []string{"c", "b", "a"} + gotab, gotao, gotbo := ThreeWaySliceCompare(caseA, caseB) + assert.Equal(t, gotab, []string{"a", "b", "c"}) + assert.Equal(t, gotao, []string(nil)) + assert.Equal(t, gotbo, []string(nil)) + + caseA = []string{"c", "b"} + caseB = []string{"c", "a"} + gotab, gotao, gotbo = ThreeWaySliceCompare(caseA, caseB) + assert.Equal(t, gotab, []string{"c"}) + assert.Equal(t, gotao, []string{"b"}) + assert.Equal(t, gotbo, []string{"a"}) + + caseA = []string{"c", "b"} + caseB = nil + gotab, gotao, gotbo = ThreeWaySliceCompare(caseA, caseB) + assert.Equal(t, gotab, []string(nil)) + assert.Equal(t, gotao, []string{"b", "c"}) + assert.Equal(t, gotbo, []string(nil)) + + caseA = nil + caseB = []string{"c", "b"} + gotab, gotao, gotbo = ThreeWaySliceCompare(caseA, caseB) + assert.Equal(t, gotab, []string(nil)) + assert.Equal(t, gotao, []string(nil)) + assert.Equal(t, gotbo, []string{"b", "c"}) +} + +func TestEqual(t *testing.T) { + caseA := []string{"c", "b", "a"} + caseB := []string{"c", "b", "a"} + assert.Equal(t, EqualSlice(caseA, caseB), true) + + caseA = []string{"c", "a", "b"} + caseB = []string{"c", "b", "a"} + assert.Equal(t, EqualSlice(caseA, caseB), true) + + caseA = []string{"c", "a", "b"} + caseB = []string{"b", "a", "c"} + assert.Equal(t, EqualSlice(caseA, caseB), true) + + caseA = []string{"c", "a", "b"} + caseB = []string{"b", "a", "c", "d"} + assert.Equal(t, EqualSlice(caseA, caseB), false) + + caseA = []string{} + caseB = []string{} + assert.Equal(t, EqualSlice(caseA, caseB), true) + + caseA = []string{} + caseB = []string{"b", "a", "c"} + assert.Equal(t, EqualSlice(caseA, caseB), false) +} diff --git a/pkg/velaql/providers/query/collector.go b/pkg/velaql/providers/query/collector.go index 72eef4a2f..6d76b757f 100644 --- a/pkg/velaql/providers/query/collector.go +++ b/pkg/velaql/providers/query/collector.go @@ -144,6 +144,9 @@ func (c *AppCollector) FindResourceFromResourceTrackerSpec(app *v1beta1.Applicat } return nil, err } + if objRef.Cluster == "" { + objRef.Cluster = multicluster.ClusterLocalName + } resources = append(resources, Resource{ Cluster: objRef.Cluster, Revision: obj.GetLabels()[oam.LabelAppRevision],