From e2f95e065ab516c056ef7ded8dc833524ac8df22 Mon Sep 17 00:00:00 2001 From: Somefive Date: Tue, 7 Feb 2023 11:12:40 +0800 Subject: [PATCH] Feat: update sharding using pkg (#5430) Signed-off-by: Somefive --- .github/dependabot.yml | 2 +- cmd/core/app/options/options.go | 5 +- cmd/core/app/server.go | 6 +- go.mod | 8 +- go.sum | 16 +- .../v1alpha2/application/revision.go | 2 +- pkg/controller/sharding/cache.go | 39 ---- pkg/controller/sharding/flags.go | 29 --- pkg/controller/sharding/scheduler.go | 220 ------------------ pkg/controller/sharding/suite_test.go | 84 ------- pkg/controller/sharding/util.go | 68 ------ pkg/controller/sharding/util_test.go | 62 ----- pkg/controller/sharding/vars.go | 58 ----- pkg/resourcetracker/app.go | 2 +- pkg/utils/app/reschedule.go | 2 +- pkg/utils/app/reschedule_test.go | 2 +- .../v1alpha2/application/mutating_handler.go | 4 +- .../v1alpha2/application/validation.go | 2 +- references/cli/up.go | 2 +- 19 files changed, 27 insertions(+), 586 deletions(-) delete mode 100644 pkg/controller/sharding/cache.go delete mode 100644 pkg/controller/sharding/flags.go delete mode 100644 pkg/controller/sharding/scheduler.go delete mode 100644 pkg/controller/sharding/suite_test.go delete mode 100644 pkg/controller/sharding/util.go delete mode 100644 pkg/controller/sharding/util_test.go delete mode 100644 pkg/controller/sharding/vars.go diff --git a/.github/dependabot.yml b/.github/dependabot.yml index c0ae9d59b..26bcd5116 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,7 +12,7 @@ updates: commit-message: prefix: "Chore: " include: "scope" -- package-ecosystem: "github-actions" + - package-ecosystem: "github-actions" directory: "/" schedule: interval: "weekly" diff --git a/cmd/core/app/options/options.go b/cmd/core/app/options/options.go index 966222e83..58201e26e 100644 --- a/cmd/core/app/options/options.go +++ b/cmd/core/app/options/options.go @@ -22,6 +22,7 @@ import ( pkgclient "github.com/kubevela/pkg/controller/client" ctrlrec "github.com/kubevela/pkg/controller/reconciler" + "github.com/kubevela/pkg/controller/sharding" pkgmulticluster "github.com/kubevela/pkg/multicluster" utillog "github.com/kubevela/pkg/util/log" wfTypes "github.com/kubevela/workflow/pkg/types" @@ -30,11 +31,9 @@ import ( standardcontroller "github.com/oam-dev/kubevela/pkg/controller" commonconfig "github.com/oam-dev/kubevela/pkg/controller/common" - "github.com/oam-dev/kubevela/pkg/controller/sharding" + oamcontroller "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev" "github.com/oam-dev/kubevela/pkg/oam" "github.com/oam-dev/kubevela/pkg/resourcekeeper" - - oamcontroller "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev" ) // CoreOptions contains everything necessary to create and run vela-core diff --git a/cmd/core/app/server.go b/cmd/core/app/server.go index 53099f50a..6515c0e21 100644 --- a/cmd/core/app/server.go +++ b/cmd/core/app/server.go @@ -26,6 +26,8 @@ import ( "time" velaclient "github.com/kubevela/pkg/controller/client" + "github.com/kubevela/pkg/controller/sharding" + "github.com/kubevela/pkg/meta" "github.com/kubevela/workflow/pkg/cue/packages" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -46,7 +48,6 @@ import ( commonconfig "github.com/oam-dev/kubevela/pkg/controller/common" oamv1alpha2 "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1alpha2" "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1alpha2/application" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/controller/utils" "github.com/oam-dev/kubevela/pkg/features" "github.com/oam-dev/kubevela/pkg/monitor/watcher" @@ -88,6 +89,7 @@ the core control loops shipped with KubeVela`, if s.PprofAddr != "" { go pkgutil.EnablePprof(s.PprofAddr, nil) } + meta.Name = types.VelaCoreName klog.InfoS("KubeVela information", "version", version.VelaVersion, "revision", version.GitRevision) klog.InfoS("Disable capabilities", "name", s.DisableCaps) @@ -214,7 +216,7 @@ func prepareRunInShardingMode(ctx context.Context, mgr manager.Manager, s *optio if sharding.IsMaster() { klog.Infof("controller running in sharding mode, current shard is master") if !utilfeature.DefaultMutableFeatureGate.Enabled(features.DisableWebhookAutoSchedule) { - go sharding.DefaultApplicationScheduler.Get().Start(ctx) + go sharding.DefaultScheduler.Get().Start(ctx) } if err := prepareRun(ctx, mgr, s); err != nil { return err diff --git a/go.mod b/go.mod index 02e9204c3..4abb3e2f8 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 github.com/imdario/mergo v0.3.13 github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c - github.com/kubevela/pkg v0.0.0-20230118103503-4a6096e79c1c + github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8 github.com/kubevela/prism v1.7.0-alpha.1 github.com/kubevela/workflow v0.4.1 github.com/kyokomi/emoji v2.2.4+incompatible @@ -100,14 +100,14 @@ require ( k8s.io/apiextensions-apiserver v0.25.2 k8s.io/apimachinery v0.25.3 k8s.io/apiserver v0.25.3 - k8s.io/cli-runtime v0.25.2 + k8s.io/cli-runtime v0.25.3 k8s.io/client-go v0.25.3 k8s.io/component-base v0.25.3 k8s.io/helm v2.17.0+incompatible k8s.io/klog/v2 v2.70.1 k8s.io/kube-aggregator v0.25.3 - k8s.io/kubectl v0.25.2 - k8s.io/metrics v0.25.2 + k8s.io/kubectl v0.25.3 + k8s.io/metrics v0.25.3 k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed open-cluster-management.io/api v0.7.0 sigs.k8s.io/controller-runtime v0.12.3 diff --git a/go.sum b/go.sum index 70d60d143..0e6b1d271 100644 --- a/go.sum +++ b/go.sum @@ -1111,8 +1111,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubevela/pkg v0.0.0-20230118103503-4a6096e79c1c h1:XN6jFY58O5rnkQNoJ6k4kPOgxpe9ovsjDGsMgs9CekQ= -github.com/kubevela/pkg v0.0.0-20230118103503-4a6096e79c1c/go.mod h1:BeFBrXLEnoJZOgBOSrE/KzhIgpmJFRrK/kwVVKVAP1w= +github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8 h1:jWkEQVVovRqONGoJ+WHzDlsvJQEkmTMvcer40UbsEXw= +github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8/go.mod h1:zJTitvYbj1Vg4l4FvqjDRJEjufT6GRKs8m+fY3V9d3E= github.com/kubevela/prism v1.7.0-alpha.1 h1:oeZFn1Oy6gxSSFzMTfsWjLOCKaaooMVm1JGNK4j4Mlo= github.com/kubevela/prism v1.7.0-alpha.1/go.mod h1:AJSDfdA+RkRSnWx3xEcogbmOTpX+l7RSIwqVHxwUtaI= github.com/kubevela/workflow v0.4.1 h1:lYeWE9KgSSkb368u8G7cGfyzCz41Am8MdxgViRFJxXE= @@ -2660,8 +2660,8 @@ k8s.io/apiserver v0.22.2/go.mod h1:vrpMmbyjWrgdyOvZTSpsusQq5iigKNWv9o9KlDAbBHI= k8s.io/apiserver v0.22.6/go.mod h1:OlL1rGa2kKWGj2JEXnwBcul/BwC9Twe95gm4ohtiIIs= k8s.io/apiserver v0.25.3 h1:m7+xGuG5+KYAnEsqaFtDyWMkmMMEOFYlu+NlWv5qSBI= k8s.io/apiserver v0.25.3/go.mod h1:9bT47iM2fzRuhICJpM/RcQR9sqDDfZ7Yw60h0p3JW08= -k8s.io/cli-runtime v0.25.2 h1:XOx+SKRjBpYMLY/J292BHTkmyDffl/qOx3YSuFZkTuc= -k8s.io/cli-runtime v0.25.2/go.mod h1:OQx3+/0st6x5YpkkJQlEWLC73V0wHsOFMC1/roxV8Oc= +k8s.io/cli-runtime v0.25.3 h1:Zs7P7l7db/5J+KDePOVtDlArAa9pZXaDinGWGZl0aM8= +k8s.io/cli-runtime v0.25.3/go.mod h1:InHHsjkyW5hQsILJGpGjeruiDZT/R0OkROQgD6GzxO4= k8s.io/client-go v0.0.0-20190620085101-78d2af792bab/go.mod h1:E95RaSlHr79aHaX0aGSwcPNfygDiPKOVXdmivCIZT0k= k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90/go.mod h1:J69/JveO6XESwVgG53q3Uz5OSfgsv4uxpScmmyYOOlk= k8s.io/client-go v0.0.0-20191122220542-ed16ecbdf3a0/go.mod h1:tyxNgOmR/Xi39HrlQ/9LQgiHJgBvmY7gp95o5GpBA4o= @@ -2741,10 +2741,10 @@ k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c/go.mod h1:vHXdDvt9+2spS2R k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk= k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA= k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU= -k8s.io/kubectl v0.25.2 h1:2993lTeVimxKSWx/7z2PiJxUILygRa3tmC4QhFaeioA= -k8s.io/kubectl v0.25.2/go.mod h1:eoBGJtKUj7x38KXelz+dqVtbtbKwCqyKzJWmBHU0prg= -k8s.io/metrics v0.25.2 h1:105TuPaIFfr4EHzN56WwZJO7r1UesuDytNTzeMqGySo= -k8s.io/metrics v0.25.2/go.mod h1:4NDAauOuEJ+NWO2+hWkhFE4rWBx/plLWJOYU3vGl0sA= +k8s.io/kubectl v0.25.3 h1:HnWJziEtmsm4JaJiKT33kG0kadx68MXxUE8UEbXnN4U= +k8s.io/kubectl v0.25.3/go.mod h1:glU7PiVj/R6Ud4A9FJdTcJjyzOtCJyc0eO7Mrbh3jlI= +k8s.io/metrics v0.25.3 h1:fp5RuALkbwI3UbKITdNYu6sa3LF4JPANR/ofq3oe+Fg= +k8s.io/metrics v0.25.3/go.mod h1:5j5FKJb8RHsb3Q2PLsD/p1mLiA1fTrl+a62Les+KDhc= k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= diff --git a/pkg/controller/core.oam.dev/v1alpha2/application/revision.go b/pkg/controller/core.oam.dev/v1alpha2/application/revision.go index 0c7823fca..5b0336289 100644 --- a/pkg/controller/core.oam.dev/v1alpha2/application/revision.go +++ b/pkg/controller/core.oam.dev/v1alpha2/application/revision.go @@ -40,6 +40,7 @@ import ( "github.com/kubevela/pkg/util/compression" + "github.com/kubevela/pkg/controller/sharding" monitorContext "github.com/kubevela/pkg/monitor/context" workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1" @@ -52,7 +53,6 @@ import ( helmapi "github.com/oam-dev/kubevela/pkg/appfile/helm/flux2apis" "github.com/oam-dev/kubevela/pkg/auth" "github.com/oam-dev/kubevela/pkg/component" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/controller/utils" "github.com/oam-dev/kubevela/pkg/cue/process" "github.com/oam-dev/kubevela/pkg/features" diff --git a/pkg/controller/sharding/cache.go b/pkg/controller/sharding/cache.go deleted file mode 100644 index 9a05b51d2..000000000 --- a/pkg/controller/sharding/cache.go +++ /dev/null @@ -1,39 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding - -import ( - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// BuildCache add shard-id label selector for given typed object -func BuildCache(scheme *runtime.Scheme, shardingObjects ...client.Object) cache.NewCacheFunc { - opts := cache.Options{ - Scheme: scheme, - SelectorsByObject: map[client.Object]cache.ObjectSelector{}, - } - if EnableSharding { - ls := labels.SelectorFromSet(map[string]string{LabelKubeVelaScheduledShardID: ShardID}) - for _, obj := range shardingObjects { - opts.SelectorsByObject[obj] = cache.ObjectSelector{Label: ls} - } - } - return cache.BuilderWithOptions(opts) -} diff --git a/pkg/controller/sharding/flags.go b/pkg/controller/sharding/flags.go deleted file mode 100644 index f83c7195d..000000000 --- a/pkg/controller/sharding/flags.go +++ /dev/null @@ -1,29 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding - -import ( - "github.com/spf13/pflag" -) - -// AddFlags add sharding flags -func AddFlags(fs *pflag.FlagSet) { - fs.BoolVar(&EnableSharding, "enable-sharding", EnableSharding, "When sharding enabled, the controller will run as master (shard-id=master) or slave mode (shard-id is any non-empty string except master). Refer to https://github.com/kubevela/kubevela/blob/master/design/vela-core/sharding.md for details.") - fs.StringVar(&ShardID, "shard-id", ShardID, "The id for sharding.") - fs.StringSliceVar(&SchedulableShards, "schedulable-shards", SchedulableShards, "The shard ids that are available for scheduling. If empty, dynamic discovery will be used.") - fs.DurationVar(&DynamicDiscoverySchedulerResyncPeriod, "sharding-slave-discovery-resync-period", DynamicDiscoverySchedulerResyncPeriod, "The resync period for default dynamic discovery scheduler.") -} diff --git a/pkg/controller/sharding/scheduler.go b/pkg/controller/sharding/scheduler.go deleted file mode 100644 index 1809ceaad..000000000 --- a/pkg/controller/sharding/scheduler.go +++ /dev/null @@ -1,220 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding - -import ( - "context" - "math/rand" - "sort" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/kubevela/pkg/util/k8s" - "github.com/kubevela/pkg/util/maps" - velaruntime "github.com/kubevela/pkg/util/runtime" - "github.com/kubevela/pkg/util/singleton" - "github.com/kubevela/pkg/util/slices" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - "k8s.io/kubectl/pkg/util/podutils" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/oam-dev/kubevela/pkg/utils/util" -) - -// Scheduler schedule shard-id for object -type Scheduler interface { - Start(context.Context) - Schedule(client.Object) bool -} - -var _ Scheduler = (*staticScheduler)(nil) - -// NewStaticScheduler create a scheduler that do not make update but only use predefined shards for allocate -func NewStaticScheduler(shards []string) Scheduler { - return &staticScheduler{shards: shards} -} - -type staticScheduler struct { - shards []string -} - -// Start . -func (in *staticScheduler) Start(ctx context.Context) { - klog.Infof("staticScheduler started, shards: [%s]", strings.Join(in.shards, ", ")) -} - -// Schedule the target object to a random shard -func (in *staticScheduler) Schedule(o client.Object) bool { - if _, scheduled := GetScheduledShardID(o); !scheduled { - if len(in.shards) > 0 { - // nolint - sid := in.shards[rand.Intn(len(in.shards))] - klog.Infof("staticScheduler schedule %s %s/%s to shard[%s]", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName(), sid) - SetScheduledShardID(o, sid) - return true - } - klog.Infof("staticDiscoveryScheduler no schedulable shard found for %s %s/%s", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName()) - } - return false -} - -var _ Scheduler = (*dynamicDiscoveryScheduler)(nil) - -// NewDynamicDiscoveryScheduler create a scheduler that allow dynamic discovery for available shards -func NewDynamicDiscoveryScheduler(name string, resyncPeriod time.Duration) Scheduler { - return &dynamicDiscoveryScheduler{ - name: name, - resyncPeriod: resyncPeriod, - candidates: map[string]map[string]bool{}, - } -} - -type dynamicDiscoveryScheduler struct { - mu sync.RWMutex - - name string - resyncPeriod time.Duration - candidates map[string]map[string]bool - roundRobinIndex atomic.Uint32 - - store cache.Store - informer cache.Controller -} - -func (in *dynamicDiscoveryScheduler) _registerPod(obj interface{}) { - if pod, ok := obj.(*corev1.Pod); ok { - id := pod.GetLabels()[LabelKubeVelaShardID] - healthy := podutils.IsPodReady(pod) - klog.Infof("dynamicDiscoveryScheduler register pod %s/%s (id: %s) with health status: %t", pod.Namespace, pod.Name, id, healthy) - in.mu.Lock() - defer in.mu.Unlock() - if _, exist := in.candidates[id]; !exist { - in.candidates[id] = map[string]bool{} - } - in.candidates[id][pod.Name] = healthy - } -} - -func (in *dynamicDiscoveryScheduler) _unregisterPod(obj interface{}) { - if pod, ok := obj.(*corev1.Pod); ok { - id := pod.GetLabels()[LabelKubeVelaShardID] - klog.Infof("dynamicDiscoveryScheduler unregister pod %s/%s", pod.Namespace, pod.Name) - in.mu.Lock() - defer in.mu.Unlock() - if _, exist := in.candidates[id]; exist { - delete(in.candidates[id], pod.Name) - if len(in.candidates[id]) == 0 { - delete(in.candidates, id) - } - } - } -} - -// resync the available shards -func (in *dynamicDiscoveryScheduler) resync(stopCh <-chan struct{}) { - ticker := time.NewTicker(in.resyncPeriod) - defer ticker.Stop() - for { - select { - case <-stopCh: - return - case <-ticker.C: - in.mu.Lock() - in.candidates = map[string]map[string]bool{} - in.mu.Unlock() - for _, obj := range in.store.List() { - in._registerPod(obj) - } - available := in.availableShards() - klog.Infof("dynamicDiscoveryScheduler resync finished, available shards: [%s]", strings.Join(available, ", ")) - } - } -} - -// Start run scheduler to watch pods and automatic register -func (in *dynamicDiscoveryScheduler) Start(ctx context.Context) { - klog.Infof("dynamicDiscoveryScheduler staring, watching pods in %s", util.GetRuntimeNamespace()) - cli := singleton.StaticClient.Get().CoreV1().RESTClient() - lw := cache.NewFilteredListWatchFromClient(cli, "pods", util.GetRuntimeNamespace(), func(options *metav1.ListOptions) { - ls := labels.NewSelector() - ls = ls.Add(*velaruntime.Must(labels.NewRequirement(LabelKubeVelaShardID, selection.Exists, nil))) - ls = ls.Add(*velaruntime.Must(labels.NewRequirement("app.kubernetes.io/name", selection.Equals, []string{in.name}))) - options.LabelSelector = ls.String() - }) - in.store, in.informer = cache.NewInformer(lw, &corev1.Pod{}, in.resyncPeriod, cache.ResourceEventHandlerFuncs{ - AddFunc: in._registerPod, - UpdateFunc: func(oldObj, newObj interface{}) { - if k8s.GetLabel(oldObj.(runtime.Object), LabelKubeVelaShardID) != k8s.GetLabel(newObj.(runtime.Object), LabelKubeVelaShardID) { - in._unregisterPod(oldObj) - } - in._registerPod(newObj) - }, - DeleteFunc: in._unregisterPod, - }) - stopCh := ctx.Done() - if stopCh == nil { - stopCh = make(chan struct{}) - } - if in.resyncPeriod > 0 { - go in.resync(stopCh) - } - klog.Infof("dynamicDiscoveryScheduler started") - in.informer.Run(stopCh) -} - -func (in *dynamicDiscoveryScheduler) availableShards() []string { - in.mu.RLock() - defer in.mu.RUnlock() - var available []string - for id, pods := range in.candidates { - if slices.Any(maps.Values(pods), func(x bool) bool { return x }) { - available = append(available, id) - } - } - return available -} - -func (in *dynamicDiscoveryScheduler) schedule() (string, bool) { - available := in.availableShards() - if len(available) == 0 { - return "", false - } - sort.Strings(available) - idx := in.roundRobinIndex.Add(1) % uint32(len(available)) - return available[idx], true -} - -// Schedule get available shard-id for application -func (in *dynamicDiscoveryScheduler) Schedule(o client.Object) bool { - if _, scheduled := GetScheduledShardID(o); !scheduled { - if sid, ok := in.schedule(); ok { - klog.Infof("dynamicDiscoveryScheduler schedule %s %s/%s to shard[%s]", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName(), sid) - SetScheduledShardID(o, sid) - return true - } - klog.Infof("dynamicDiscoveryScheduler no schedulable shard found for %s %s/%s", o.GetObjectKind().GroupVersionKind().Kind, o.GetNamespace(), o.GetName()) - } - return false -} diff --git a/pkg/controller/sharding/suite_test.go b/pkg/controller/sharding/suite_test.go deleted file mode 100644 index d889e2fe3..000000000 --- a/pkg/controller/sharding/suite_test.go +++ /dev/null @@ -1,84 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding_test - -import ( - "context" - "testing" - "time" - - "github.com/kubevela/pkg/util/singleton" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/spf13/pflag" - corev1 "k8s.io/api/core/v1" - kerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" - "sigs.k8s.io/controller-runtime/pkg/cache" - - "github.com/kubevela/pkg/util/test/bootstrap" - - "github.com/oam-dev/kubevela/pkg/controller/sharding" -) - -func TestSharding(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Run client package test") -} - -var _ = bootstrap.InitKubeBuilderForTest() - -var _ = Describe("Test sharding", func() { - - It("Test scheduler", func() { - fs := pflag.NewFlagSet("-", pflag.ExitOnError) - sharding.AddFlags(fs) - Ω(fs.Parse([]string{"--enable-sharding", "--shard-id=s", "--schedulable-shards=s,t"})).To(Succeed()) - Ω(sharding.SchedulableShards).To(Equal([]string{"s", "t"})) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cfg, cli := singleton.KubeConfig.Get(), singleton.KubeClient.Get() - - By("Test static scheduler") - scheduler := sharding.NewStaticScheduler([]string{"s"}) - go scheduler.Start(ctx) - cm1 := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "scheduled", Namespace: metav1.NamespaceDefault}} - Ω(scheduler.Schedule(cm1)).To(BeTrue()) - Ω(cli.Create(ctx, cm1)).To(Succeed()) - scheduler = sharding.NewStaticScheduler([]string{""}) - go scheduler.Start(ctx) - cm2 := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "unscheduled", Namespace: metav1.NamespaceDefault}} - Ω(scheduler.Schedule(cm1)).To(BeFalse()) - Ω(cli.Create(ctx, cm2)).To(Succeed()) - - By("Test cache") - store, err := sharding.BuildCache(scheme.Scheme, &corev1.ConfigMap{})(cfg, cache.Options{}) - Ω(err).To(Succeed()) - go func() { _ = store.Start(ctx) }() - Eventually(func(g Gomega) { - cms := &corev1.ConfigMapList{} - g.Expect(store.List(ctx, cms)).To(Succeed()) - g.Expect(len(cms.Items)).To(Equal(1)) - g.Expect(cms.Items[0].Name).To(Equal("scheduled")) - g.Expect(kerrors.IsNotFound(store.Get(ctx, types.NamespacedName{Name: cm2.Name, Namespace: cm2.Namespace}, &corev1.ConfigMap{}))).To(BeTrue()) - }).WithTimeout(5 * time.Second).Should(Succeed()) - }) - -}) diff --git a/pkg/controller/sharding/util.go b/pkg/controller/sharding/util.go deleted file mode 100644 index cf6093e54..000000000 --- a/pkg/controller/sharding/util.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding - -import ( - "github.com/kubevela/pkg/util/k8s" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// GetScheduledShardID return the scheduled shard-id of the object and if it is scheduled -func GetScheduledShardID(o client.Object) (string, bool) { - ls := o.GetLabels() - if ls == nil { - return "", false - } - id, found := ls[LabelKubeVelaScheduledShardID] - return id, found && id != "" -} - -// SetScheduledShardID set shard-id to target object -func SetScheduledShardID(o client.Object, id string) { - _ = k8s.AddLabel(o, LabelKubeVelaScheduledShardID, id) -} - -// DelScheduledShardID delete shard-id from target object -func DelScheduledShardID(o client.Object) { - _ = k8s.DeleteLabel(o, LabelKubeVelaScheduledShardID) -} - -// PropagateScheduledShardIDLabel copy the shard-id from source obj to target -// obj, remove if not exist -func PropagateScheduledShardIDLabel(from client.Object, to client.Object) { - if EnableSharding { - sid, ok := GetScheduledShardID(from) - if ok { - SetScheduledShardID(to, sid) - } else { - DelScheduledShardID(to) - } - } -} - -// IsMaster check if current instance is master -func IsMaster() bool { - return ShardID == MasterShardID -} - -// GetShardIDSuffix return suffix for shard id if enabled -func GetShardIDSuffix() string { - if EnableSharding && !IsMaster() { - return "-" + ShardID - } - return "" -} diff --git a/pkg/controller/sharding/util_test.go b/pkg/controller/sharding/util_test.go deleted file mode 100644 index f99ec4842..000000000 --- a/pkg/controller/sharding/util_test.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - - "github.com/oam-dev/kubevela/pkg/controller/sharding" -) - -func TestScheduledShardID(t *testing.T) { - sharding.EnableSharding = true - defer func() { sharding.EnableSharding = false }() - - o := &corev1.ConfigMap{} - sharding.DelScheduledShardID(o) - _, scheduled := sharding.GetScheduledShardID(o) - require.False(t, scheduled) - - sharding.SetScheduledShardID(o, "s") - sid, scheduled := sharding.GetScheduledShardID(o) - require.True(t, scheduled) - require.Equal(t, "s", sid) - - p := &corev1.Secret{} - sharding.PropagateScheduledShardIDLabel(o, p) - sid, scheduled = sharding.GetScheduledShardID(p) - require.True(t, scheduled) - require.Equal(t, "s", sid) - - sharding.DelScheduledShardID(p) - _, scheduled = sharding.GetScheduledShardID(p) - require.False(t, scheduled) - - sharding.PropagateScheduledShardIDLabel(p, o) - _, scheduled = sharding.GetScheduledShardID(o) - require.False(t, scheduled) - - sharding.ShardID = "s" - require.Equal(t, "-s", sharding.GetShardIDSuffix()) - - sharding.ShardID = "master" - require.True(t, sharding.IsMaster()) - require.Equal(t, "", sharding.GetShardIDSuffix()) -} diff --git a/pkg/controller/sharding/vars.go b/pkg/controller/sharding/vars.go deleted file mode 100644 index 9fd78413a..000000000 --- a/pkg/controller/sharding/vars.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2023 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sharding - -import ( - "time" - - "github.com/kubevela/pkg/util/singleton" - velaslices "github.com/kubevela/pkg/util/slices" - "k8s.io/klog/v2" - - "github.com/oam-dev/kubevela/apis/types" -) - -const ( - // LabelKubeVelaShardID label key for identify the shard id for the controller - LabelKubeVelaShardID = "controller.core.oam.dev/shard-id" - // LabelKubeVelaScheduledShardID label key for identify the scheduled shard id for the resource - LabelKubeVelaScheduledShardID = "controller.core.oam.dev/scheduled-shard-id" - // MasterShardID the master shard id - MasterShardID = "master" -) - -var ( - // ShardID the id of the shard - ShardID = MasterShardID - // EnableSharding whether enable sharding - EnableSharding bool - // SchedulableShards the shards for schedule - SchedulableShards []string - // DynamicDiscoverySchedulerResyncPeriod resync period for default dynamic discovery scheduler - DynamicDiscoverySchedulerResyncPeriod = 5 * time.Minute -) - -// DefaultApplicationScheduler default application scheduler -var DefaultApplicationScheduler = singleton.NewSingleton[Scheduler](func() Scheduler { - SchedulableShards = velaslices.Filter(SchedulableShards, func(s string) bool { return len(s) > 0 }) - if len(SchedulableShards) > 0 { - klog.Infof("staticScheduler initialized") - return NewStaticScheduler(SchedulableShards) - } - klog.Infof("dynamicDiscoveryScheduler initialized") - return NewDynamicDiscoveryScheduler(types.VelaCoreName, DynamicDiscoverySchedulerResyncPeriod) -}) diff --git a/pkg/resourcetracker/app.go b/pkg/resourcetracker/app.go index 91b298608..7e3c58a9b 100644 --- a/pkg/resourcetracker/app.go +++ b/pkg/resourcetracker/app.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/crossplane/crossplane-runtime/pkg/meta" + "github.com/kubevela/pkg/controller/sharding" "github.com/kubevela/pkg/util/compression" "github.com/pkg/errors" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -32,7 +33,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/features" "github.com/oam-dev/kubevela/pkg/monitor/metrics" "github.com/oam-dev/kubevela/pkg/oam" diff --git a/pkg/utils/app/reschedule.go b/pkg/utils/app/reschedule.go index b162e0795..1b7cf6532 100644 --- a/pkg/utils/app/reschedule.go +++ b/pkg/utils/app/reschedule.go @@ -21,6 +21,7 @@ import ( "reflect" "strings" + "github.com/kubevela/pkg/controller/sharding" "github.com/kubevela/pkg/util/slices" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" @@ -28,7 +29,6 @@ import ( "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1alpha2/application" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/resourcetracker" ) diff --git a/pkg/utils/app/reschedule_test.go b/pkg/utils/app/reschedule_test.go index ee68bea0f..a08613dae 100644 --- a/pkg/utils/app/reschedule_test.go +++ b/pkg/utils/app/reschedule_test.go @@ -20,12 +20,12 @@ import ( "context" "testing" + "github.com/kubevela/pkg/controller/sharding" "github.com/stretchr/testify/require" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/oam" apputil "github.com/oam-dev/kubevela/pkg/utils/app" "github.com/oam-dev/kubevela/pkg/utils/common" diff --git a/pkg/webhook/core.oam.dev/v1alpha2/application/mutating_handler.go b/pkg/webhook/core.oam.dev/v1alpha2/application/mutating_handler.go index 6948fdbd9..bf900f2ca 100644 --- a/pkg/webhook/core.oam.dev/v1alpha2/application/mutating_handler.go +++ b/pkg/webhook/core.oam.dev/v1alpha2/application/mutating_handler.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" + "github.com/kubevela/pkg/controller/sharding" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -33,7 +34,6 @@ import ( "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" "github.com/oam-dev/kubevela/pkg/auth" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/features" "github.com/oam-dev/kubevela/pkg/oam" "github.com/oam-dev/kubevela/pkg/utils" @@ -93,7 +93,7 @@ func (h *MutatingHandler) handleSharding(ctx context.Context, req admission.Requ sharding.SetScheduledShardID(newApp, oid) return true, nil } - return sharding.DefaultApplicationScheduler.Get().Schedule(newApp), nil + return sharding.DefaultScheduler.Get().Schedule(newApp), nil } return false, nil } diff --git a/pkg/webhook/core.oam.dev/v1alpha2/application/validation.go b/pkg/webhook/core.oam.dev/v1alpha2/application/validation.go index a1e300d27..1084987ef 100644 --- a/pkg/webhook/core.oam.dev/v1alpha2/application/validation.go +++ b/pkg/webhook/core.oam.dev/v1alpha2/application/validation.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/kubevela/pkg/controller/sharding" "github.com/kubevela/prism/pkg/util/singleton" appsv1 "k8s.io/api/apps/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -30,7 +31,6 @@ import ( "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" "github.com/oam-dev/kubevela/pkg/appfile" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/features" "github.com/oam-dev/kubevela/pkg/oam" "github.com/oam-dev/kubevela/pkg/oam/util" diff --git a/references/cli/up.go b/references/cli/up.go index fce28a44f..c3f53a911 100644 --- a/references/cli/up.go +++ b/references/cli/up.go @@ -22,6 +22,7 @@ import ( "os" "time" + "github.com/kubevela/pkg/controller/sharding" "github.com/pkg/errors" "github.com/spf13/cobra" apitypes "k8s.io/apimachinery/pkg/types" @@ -35,7 +36,6 @@ import ( "github.com/oam-dev/kubevela/apis/types" velacmd "github.com/oam-dev/kubevela/pkg/cmd" cmdutil "github.com/oam-dev/kubevela/pkg/cmd/util" - "github.com/oam-dev/kubevela/pkg/controller/sharding" "github.com/oam-dev/kubevela/pkg/oam" pkgUtils "github.com/oam-dev/kubevela/pkg/utils" utilapp "github.com/oam-dev/kubevela/pkg/utils/app"