mirror of
https://github.com/kubevela/kubevela.git
synced 2026-02-14 18:10:21 +00:00
Feat: reuse informer for app metrics (#4336)
Signed-off-by: Somefive <yd219913@alibaba-inc.com>
This commit is contained in:
@@ -36,6 +36,7 @@ import (
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/healthz"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||
"github.com/oam-dev/kubevela/apis/types"
|
||||
"github.com/oam-dev/kubevela/pkg/auth"
|
||||
ctrlClient "github.com/oam-dev/kubevela/pkg/client"
|
||||
@@ -324,10 +325,11 @@ func main() {
|
||||
klog.InfoS("Use storage driver", "storageDriver", os.Getenv(system.StorageDriverEnv))
|
||||
|
||||
klog.Info("Start the vela application monitor")
|
||||
if err := watcher.StartApplicationMetricsWatcher(restConfig); err != nil {
|
||||
klog.ErrorS(err, "Unable to start application metrics watcher")
|
||||
os.Exit(1)
|
||||
informer, err := mgr.GetCache().GetInformer(context.Background(), &v1beta1.Application{})
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Unable to get informer for application")
|
||||
}
|
||||
watcher.StartApplicationMetricsWatcher(informer)
|
||||
|
||||
klog.Info("Start the vela controller manager")
|
||||
|
||||
|
||||
@@ -23,11 +23,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||
"github.com/oam-dev/kubevela/pkg/monitor/metrics"
|
||||
@@ -39,44 +36,10 @@ type applicationMetricsWatcher struct {
|
||||
stepPhaseCounter map[string]int
|
||||
phaseDirty map[string]struct{}
|
||||
stepPhaseDirty map[string]struct{}
|
||||
informer cache.SharedIndexInformer
|
||||
informer ctrlcache.Informer
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func newApplicationMetricsWatcher(cfg *rest.Config) (*applicationMetricsWatcher, error) {
|
||||
dc, err := dynamic.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 0, metav1.NamespaceAll, nil)
|
||||
informer := f.ForResource(v1beta1.SchemeGroupVersion.WithResource("applications")).Informer()
|
||||
watcher := &applicationMetricsWatcher{
|
||||
phaseCounter: map[string]int{},
|
||||
stepPhaseCounter: map[string]int{},
|
||||
phaseDirty: map[string]struct{}{},
|
||||
stepPhaseDirty: map[string]struct{}{},
|
||||
informer: informer,
|
||||
stopCh: make(chan struct{}, 1),
|
||||
}
|
||||
watcher.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
app := watcher.getApp(obj)
|
||||
watcher.inc(app, 1)
|
||||
},
|
||||
UpdateFunc: func(oldObj, obj interface{}) {
|
||||
oldApp := watcher.getApp(oldObj)
|
||||
app := watcher.getApp(obj)
|
||||
watcher.inc(oldApp, -1)
|
||||
watcher.inc(app, 1)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
app := watcher.getApp(obj)
|
||||
watcher.inc(app, -1)
|
||||
},
|
||||
})
|
||||
return watcher, nil
|
||||
}
|
||||
|
||||
func (watcher *applicationMetricsWatcher) getApp(obj interface{}) *v1beta1.Application {
|
||||
app := &v1beta1.Application{}
|
||||
bs, _ := json.Marshal(obj)
|
||||
@@ -132,15 +95,33 @@ func (watcher *applicationMetricsWatcher) run() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
go watcher.informer.Run(watcher.stopCh)
|
||||
}
|
||||
|
||||
// StartApplicationMetricsWatcher start metrics watcher for reporting application stats
|
||||
func StartApplicationMetricsWatcher(cfg *rest.Config) error {
|
||||
watcher, err := newApplicationMetricsWatcher(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
func StartApplicationMetricsWatcher(informer ctrlcache.Informer) {
|
||||
watcher := &applicationMetricsWatcher{
|
||||
phaseCounter: map[string]int{},
|
||||
stepPhaseCounter: map[string]int{},
|
||||
phaseDirty: map[string]struct{}{},
|
||||
stepPhaseDirty: map[string]struct{}{},
|
||||
informer: informer,
|
||||
stopCh: make(chan struct{}, 1),
|
||||
}
|
||||
watcher.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
app := watcher.getApp(obj)
|
||||
watcher.inc(app, 1)
|
||||
},
|
||||
UpdateFunc: func(oldObj, obj interface{}) {
|
||||
oldApp := watcher.getApp(oldObj)
|
||||
app := watcher.getApp(obj)
|
||||
watcher.inc(oldApp, -1)
|
||||
watcher.inc(app, 1)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
app := watcher.getApp(obj)
|
||||
watcher.inc(app, -1)
|
||||
},
|
||||
})
|
||||
watcher.run()
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user