mirror of
https://github.com/resmoio/kubernetes-event-exporter.git
synced 2026-02-14 14:39:50 +00:00
Add owner references to EnhancedEvent, consolidate calls to apiserver, make cache size configurable and add metrics for reads (#144)
* Adds ownerReferences to the exported events This commit uses the same approach as labels and annotations and adds ownerReferences to the EnhancedEvent struct. The flow is as follows: * use an LRU cache to store the ownerReferences with object UID as the key * if the object doesn't exist in cache, look up using dynamic client and store it in cache * if the object exists in cache, return the value from cache * Reduce the number of GetObject calls by using a single cache to store all labels, annotations and ownerReferences Currently, every time there's an event, the events exporter runs GetObject for metadata like labels and annotations independently. This results in the same object being looked up multiple times for different pieces of the metadata. These number of calls grow as we want to look up additional information about the object like ownerReferences. So, in this change, a struct called `ObjectMetadata` is created to capture all the pieces of information that need to be added to the EnhancedEvent. And every time there's an event, the object is fetched from the kube-apiserver if it's not in the cache already and all pieces of metadata require only 1 call. The metadata is cached so repeated events about the same object don't result in more calls. Additionally, UID + ResourceVersion is used the cacheKey so if the object changes, it's looked up again. One more change here is introduction of a `deleted` field in the `EnhancedEvent.InvolvedObject` to capture whether the object is deleted. This helps receivers identify whether a resource is deleted and create rules for it when needed. Tests are added for these updates and the mock functions are moved to the test files. * Make the cache size configurable * Add metrics for number of reads served from cache and kube-apiserver respectively * Add `deleted` field to the involvedObject in EnhancedEvent to identify whether the resource is being deleted
This commit is contained in:
4
main.go
4
main.go
@@ -62,6 +62,8 @@ func main() {
|
||||
log.Fatal().Str("log_format", cfg.LogFormat).Msg("Unknown log format")
|
||||
}
|
||||
|
||||
cfg.SetDefaults()
|
||||
|
||||
if err := cfg.Validate(); err != nil {
|
||||
log.Fatal().Err(err).Msg("config validation failed")
|
||||
}
|
||||
@@ -87,7 +89,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup)
|
||||
w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
leaderLost := make(chan bool)
|
||||
|
||||
@@ -2,12 +2,18 @@ package exporter
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
|
||||
"github.com/rs/zerolog/log"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultCacheSize = 1024
|
||||
)
|
||||
|
||||
// Config allows configuration
|
||||
@@ -28,6 +34,24 @@ type Config struct {
|
||||
KubeBurst int `yaml:"kubeBurst,omitempty"`
|
||||
MetricsNamePrefix string `yaml:"metricsNamePrefix,omitempty"`
|
||||
OmitLookup bool `yaml:"omitLookup,omitempty"`
|
||||
CacheSize int `yaml:"cacheSize,omitempty"`
|
||||
}
|
||||
|
||||
func (c *Config) SetDefaults() {
|
||||
if c.CacheSize == 0 {
|
||||
c.CacheSize = DefaultCacheSize
|
||||
log.Debug().Msg("setting config.cacheSize=1024 (default)")
|
||||
}
|
||||
|
||||
if c.KubeBurst == 0 {
|
||||
c.KubeBurst = rest.DefaultBurst
|
||||
log.Debug().Msg(fmt.Sprintf("setting config.kubeBurst=%d (default)", rest.DefaultBurst))
|
||||
}
|
||||
|
||||
if c.KubeQPS == 0 {
|
||||
c.KubeQPS = rest.DefaultQPS
|
||||
log.Debug().Msg(fmt.Sprintf("setting config.kubeQPS=%.2f (default)", rest.DefaultQPS))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) Validate() error {
|
||||
@@ -54,7 +78,7 @@ func (c *Config) validateDefaults() error {
|
||||
func (c *Config) validateMaxEventAgeSeconds() error {
|
||||
if c.ThrottlePeriod == 0 && c.MaxEventAgeSeconds == 0 {
|
||||
c.MaxEventAgeSeconds = 5
|
||||
log.Info().Msg("set config.maxEventAgeSeconds=5 (default)")
|
||||
log.Info().Msg("setting config.maxEventAgeSeconds=5 (default)")
|
||||
} else if c.ThrottlePeriod != 0 && c.MaxEventAgeSeconds != 0 {
|
||||
log.Error().Msg("cannot set both throttlePeriod (depricated) and MaxEventAgeSeconds")
|
||||
return errors.New("validateMaxEventAgeSeconds failed")
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"github.com/goccy/go-yaml"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func readConfig(t *testing.T, yml string) Config {
|
||||
@@ -148,3 +150,11 @@ func TestValidate_MetricsNamePrefix_WhenInvalid(t *testing.T) {
|
||||
assert.Contains(t, output.String(), "config.metricsNamePrefix should match the regex: ^[a-zA-Z][a-zA-Z0-9_:]*_$")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDefaults(t *testing.T) {
|
||||
config := Config{}
|
||||
config.SetDefaults()
|
||||
require.Equal(t, DefaultCacheSize, config.CacheSize)
|
||||
require.Equal(t, rest.DefaultQPS, config.KubeQPS)
|
||||
require.Equal(t, rest.DefaultBurst, config.KubeBurst)
|
||||
}
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
package kube
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
type AnnotationCache struct {
|
||||
dynClient dynamic.Interface
|
||||
clientset *kubernetes.Clientset
|
||||
|
||||
cache *lru.ARCCache
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func NewAnnotationCache(kubeconfig *rest.Config) *AnnotationCache {
|
||||
cache, err := lru.NewARC(1024)
|
||||
if err != nil {
|
||||
panic("cannot init cache: " + err.Error())
|
||||
}
|
||||
return &AnnotationCache{
|
||||
dynClient: dynamic.NewForConfigOrDie(kubeconfig),
|
||||
clientset: kubernetes.NewForConfigOrDie(kubeconfig),
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *AnnotationCache) GetAnnotationsWithCache(reference *v1.ObjectReference) (map[string]string, error) {
|
||||
uid := reference.UID
|
||||
|
||||
if val, ok := a.cache.Get(uid); ok {
|
||||
return val.(map[string]string), nil
|
||||
}
|
||||
|
||||
obj, err := GetObject(reference, a.clientset, a.dynClient)
|
||||
if err == nil {
|
||||
annotations := obj.GetAnnotations()
|
||||
for key := range annotations {
|
||||
if strings.Contains(key, "kubernetes.io/") || strings.Contains(key, "k8s.io/") {
|
||||
delete(annotations, key)
|
||||
}
|
||||
}
|
||||
a.cache.Add(uid, annotations)
|
||||
return annotations, nil
|
||||
}
|
||||
|
||||
if errors.IsNotFound(err) {
|
||||
var empty map[string]string
|
||||
a.cache.Add(uid, empty)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
|
||||
}
|
||||
|
||||
func NewMockAnnotationCache() *AnnotationCache {
|
||||
cache, _ := lru.NewARC(1024)
|
||||
uid := types.UID("test")
|
||||
cache.Add(uid, map[string]string{"test": "test"})
|
||||
return &AnnotationCache{
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
type EnhancedEvent struct {
|
||||
@@ -40,8 +41,10 @@ func dedotMap(in map[string]string) map[string]string {
|
||||
|
||||
type EnhancedObjectReference struct {
|
||||
corev1.ObjectReference `json:",inline"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
OwnerReferences []metav1.OwnerReference `json:"ownerReferences,omitempty"`
|
||||
Deleted bool `json:"deleted"`
|
||||
}
|
||||
|
||||
// ToJSON does not return an error because we are %99 confident it is JSON serializable.
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
package kube
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
type LabelCache struct {
|
||||
dynClient dynamic.Interface
|
||||
clientset *kubernetes.Clientset
|
||||
|
||||
cache *lru.ARCCache
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func NewLabelCache(kubeconfig *rest.Config) *LabelCache {
|
||||
cache, err := lru.NewARC(1024)
|
||||
if err != nil {
|
||||
panic("cannot init cache: " + err.Error())
|
||||
}
|
||||
return &LabelCache{
|
||||
dynClient: dynamic.NewForConfigOrDie(kubeconfig),
|
||||
clientset: kubernetes.NewForConfigOrDie(kubeconfig),
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *LabelCache) GetLabelsWithCache(reference *v1.ObjectReference) (map[string]string, error) {
|
||||
uid := reference.UID
|
||||
|
||||
if val, ok := l.cache.Get(uid); ok {
|
||||
return val.(map[string]string), nil
|
||||
}
|
||||
|
||||
obj, err := GetObject(reference, l.clientset, l.dynClient)
|
||||
if err == nil {
|
||||
labels := obj.GetLabels()
|
||||
l.cache.Add(uid, labels)
|
||||
return labels, nil
|
||||
}
|
||||
|
||||
if errors.IsNotFound(err) {
|
||||
// There can be events without the involved objects existing, they seem to be not garbage collected?
|
||||
// Marking it nil so that we can return faster
|
||||
var empty map[string]string
|
||||
l.cache.Add(uid, empty)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// An non-ignorable error occurred
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func NewMockLabelCache() *LabelCache {
|
||||
cache, _ := lru.NewARC(1024)
|
||||
uid := types.UID("test")
|
||||
cache.Add(uid, map[string]string{"test": "test"})
|
||||
return &LabelCache{
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
@@ -2,17 +2,57 @@ package kube
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/restmapper"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func GetObject(reference *v1.ObjectReference, clientset *kubernetes.Clientset, dynClient dynamic.Interface) (*unstructured.Unstructured, error) {
|
||||
type ObjectMetadataProvider interface {
|
||||
GetObjectMetadata(reference *v1.ObjectReference, clientset *kubernetes.Clientset, dynClient dynamic.Interface, metricsStore *metrics.Store) (ObjectMetadata, error)
|
||||
}
|
||||
|
||||
type ObjectMetadataCache struct {
|
||||
cache *lru.ARCCache
|
||||
}
|
||||
|
||||
var _ ObjectMetadataProvider = &ObjectMetadataCache{}
|
||||
|
||||
type ObjectMetadata struct {
|
||||
Annotations map[string]string
|
||||
Labels map[string]string
|
||||
OwnerReferences []metav1.OwnerReference
|
||||
Deleted bool
|
||||
}
|
||||
|
||||
func NewObjectMetadataProvider(size int) ObjectMetadataProvider {
|
||||
cache, err := lru.NewARC(size)
|
||||
if err != nil {
|
||||
panic("cannot init cache: " + err.Error())
|
||||
}
|
||||
|
||||
var o ObjectMetadataProvider = &ObjectMetadataCache{
|
||||
cache: cache,
|
||||
}
|
||||
|
||||
return o
|
||||
}
|
||||
|
||||
func (o *ObjectMetadataCache) GetObjectMetadata(reference *v1.ObjectReference, clientset *kubernetes.Clientset, dynClient dynamic.Interface, metricsStore *metrics.Store) (ObjectMetadata, error) {
|
||||
// ResourceVersion changes when the object is updated.
|
||||
// We use "UID/ResourceVersion" as cache key so that if the object is updated we get the new metadata.
|
||||
cacheKey := strings.Join([]string{string(reference.UID), reference.ResourceVersion}, "/")
|
||||
if val, ok := o.cache.Get(cacheKey); ok {
|
||||
metricsStore.KubeApiReadCacheHits.Inc()
|
||||
return val.(ObjectMetadata), nil
|
||||
}
|
||||
|
||||
var group, version string
|
||||
s := strings.Split(reference.APIVersion, "/")
|
||||
if len(s) == 1 {
|
||||
@@ -27,13 +67,13 @@ func GetObject(reference *v1.ObjectReference, clientset *kubernetes.Clientset, d
|
||||
|
||||
groupResources, err := restmapper.GetAPIGroupResources(clientset.Discovery())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return ObjectMetadata{}, err
|
||||
}
|
||||
|
||||
rm := restmapper.NewDiscoveryRESTMapper(groupResources)
|
||||
mapping, err := rm.RESTMapping(gk, version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return ObjectMetadata{}, err
|
||||
}
|
||||
|
||||
item, err := dynClient.
|
||||
@@ -41,9 +81,22 @@ func GetObject(reference *v1.ObjectReference, clientset *kubernetes.Clientset, d
|
||||
Namespace(reference.Namespace).
|
||||
Get(context.Background(), reference.Name, metav1.GetOptions{})
|
||||
|
||||
metricsStore.KubeApiReadRequests.Inc()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return ObjectMetadata{}, err
|
||||
}
|
||||
|
||||
return item, nil
|
||||
objectMetadata := ObjectMetadata{
|
||||
OwnerReferences: item.GetOwnerReferences(),
|
||||
Labels: item.GetLabels(),
|
||||
Annotations: item.GetAnnotations(),
|
||||
}
|
||||
|
||||
if item.GetDeletionTimestamp() != nil {
|
||||
objectMetadata.Deleted = true
|
||||
}
|
||||
|
||||
o.cache.Add(cacheKey, objectMetadata)
|
||||
return objectMetadata, nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
|
||||
"github.com/rs/zerolog/log"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -17,30 +19,32 @@ var startUpTime = time.Now()
|
||||
type EventHandler func(event *EnhancedEvent)
|
||||
|
||||
type EventWatcher struct {
|
||||
informer cache.SharedInformer
|
||||
stopper chan struct{}
|
||||
labelCache *LabelCache
|
||||
omitLookup bool
|
||||
annotationCache *AnnotationCache
|
||||
fn EventHandler
|
||||
maxEventAgeSeconds time.Duration
|
||||
metricsStore *metrics.Store
|
||||
informer cache.SharedInformer
|
||||
stopper chan struct{}
|
||||
objectMetadataCache ObjectMetadataProvider
|
||||
omitLookup bool
|
||||
fn EventHandler
|
||||
maxEventAgeSeconds time.Duration
|
||||
metricsStore *metrics.Store
|
||||
dynamicClient *dynamic.DynamicClient
|
||||
clientset *kubernetes.Clientset
|
||||
}
|
||||
|
||||
func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, metricsStore *metrics.Store, fn EventHandler, omitLookup bool) *EventWatcher {
|
||||
func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, metricsStore *metrics.Store, fn EventHandler, omitLookup bool, cacheSize int) *EventWatcher {
|
||||
clientset := kubernetes.NewForConfigOrDie(config)
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace))
|
||||
informer := factory.Core().V1().Events().Informer()
|
||||
|
||||
watcher := &EventWatcher{
|
||||
informer: informer,
|
||||
stopper: make(chan struct{}),
|
||||
labelCache: NewLabelCache(config),
|
||||
omitLookup: omitLookup,
|
||||
annotationCache: NewAnnotationCache(config),
|
||||
fn: fn,
|
||||
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
|
||||
metricsStore: metricsStore,
|
||||
informer: informer,
|
||||
stopper: make(chan struct{}),
|
||||
objectMetadataCache: NewObjectMetadataProvider(cacheSize),
|
||||
omitLookup: omitLookup,
|
||||
fn: fn,
|
||||
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
|
||||
metricsStore: metricsStore,
|
||||
dynamicClient: dynamic.NewForConfigOrDie(config),
|
||||
clientset: clientset,
|
||||
}
|
||||
|
||||
informer.AddEventHandler(watcher)
|
||||
@@ -105,29 +109,21 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
|
||||
if e.omitLookup {
|
||||
ev.InvolvedObject.ObjectReference = *event.InvolvedObject.DeepCopy()
|
||||
} else {
|
||||
labels, err := e.labelCache.GetLabelsWithCache(&event.InvolvedObject)
|
||||
objectMetadata, err := e.objectMetadataCache.GetObjectMetadata(&event.InvolvedObject, e.clientset, e.dynamicClient, e.metricsStore)
|
||||
if err != nil {
|
||||
if ev.InvolvedObject.Kind != "CustomResourceDefinition" {
|
||||
log.Error().Err(err).Msg("Cannot list labels of the object")
|
||||
if errors.IsNotFound(err) {
|
||||
ev.InvolvedObject.Deleted = true
|
||||
log.Error().Err(err).Msg("Object not found, likely deleted")
|
||||
} else {
|
||||
log.Debug().Err(err).Msg("Cannot list labels of the object (CRD)")
|
||||
log.Error().Err(err).Msg("Failed to get object metadata")
|
||||
}
|
||||
// Ignoring error, but log it anyways
|
||||
} else {
|
||||
ev.InvolvedObject.Labels = labels
|
||||
ev.InvolvedObject.ObjectReference = *event.InvolvedObject.DeepCopy()
|
||||
}
|
||||
|
||||
annotations, err := e.annotationCache.GetAnnotationsWithCache(&event.InvolvedObject)
|
||||
if err != nil {
|
||||
if ev.InvolvedObject.Kind != "CustomResourceDefinition" {
|
||||
log.Error().Err(err).Msg("Cannot list annotations of the object")
|
||||
} else {
|
||||
log.Debug().Err(err).Msg("Cannot list annotations of the object (CRD)")
|
||||
}
|
||||
} else {
|
||||
ev.InvolvedObject.Annotations = annotations
|
||||
ev.InvolvedObject.Labels = objectMetadata.Labels
|
||||
ev.InvolvedObject.Annotations = objectMetadata.Annotations
|
||||
ev.InvolvedObject.OwnerReferences = objectMetadata.OwnerReferences
|
||||
ev.InvolvedObject.ObjectReference = *event.InvolvedObject.DeepCopy()
|
||||
ev.InvolvedObject.Deleted = objectMetadata.Deleted
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,17 +143,6 @@ func (e *EventWatcher) Stop() {
|
||||
close(e.stopper)
|
||||
}
|
||||
|
||||
func NewMockEventWatcher(MaxEventAgeSeconds int64, metricsStore *metrics.Store) *EventWatcher {
|
||||
watcher := &EventWatcher{
|
||||
labelCache: NewMockLabelCache(),
|
||||
annotationCache: NewMockAnnotationCache(),
|
||||
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
|
||||
fn: func(event *EnhancedEvent) {},
|
||||
metricsStore: metricsStore,
|
||||
}
|
||||
return watcher
|
||||
}
|
||||
|
||||
func (e *EventWatcher) setStartUpTime(time time.Time) {
|
||||
startUpTime = time
|
||||
}
|
||||
|
||||
@@ -5,19 +5,79 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
type mockObjectMetadataProvider struct {
|
||||
cache *lru.ARCCache
|
||||
objDeleted bool
|
||||
}
|
||||
|
||||
func newMockObjectMetadataProvider() ObjectMetadataProvider {
|
||||
cache, err := lru.NewARC(1024)
|
||||
if err != nil {
|
||||
panic("cannot init cache: " + err.Error())
|
||||
}
|
||||
|
||||
cache.Add("test", ObjectMetadata{
|
||||
Annotations: map[string]string{"test": "test"},
|
||||
Labels: map[string]string{"test": "test"},
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "testAPI",
|
||||
Kind: "testKind",
|
||||
Name: "testOwner",
|
||||
UID: "testOwner",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
var o ObjectMetadataProvider = &mockObjectMetadataProvider{
|
||||
cache: cache,
|
||||
objDeleted: false,
|
||||
}
|
||||
|
||||
return o
|
||||
}
|
||||
|
||||
func (o *mockObjectMetadataProvider) GetObjectMetadata(reference *corev1.ObjectReference, clientset *kubernetes.Clientset, dynClient dynamic.Interface, metricsStore *metrics.Store) (ObjectMetadata, error) {
|
||||
if o.objDeleted {
|
||||
return ObjectMetadata{}, errors.NewNotFound(schema.GroupResource{}, "")
|
||||
}
|
||||
|
||||
val, _ := o.cache.Get("test")
|
||||
return val.(ObjectMetadata), nil
|
||||
}
|
||||
|
||||
var _ ObjectMetadataProvider = &mockObjectMetadataProvider{}
|
||||
|
||||
func newMockEventWatcher(MaxEventAgeSeconds int64, metricsStore *metrics.Store) *EventWatcher {
|
||||
watcher := &EventWatcher{
|
||||
objectMetadataCache: newMockObjectMetadataProvider(),
|
||||
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
|
||||
fn: func(event *EnhancedEvent) {},
|
||||
metricsStore: metricsStore,
|
||||
}
|
||||
return watcher
|
||||
}
|
||||
|
||||
func TestEventWatcher_EventAge_whenEventCreatedBeforeStartup(t *testing.T) {
|
||||
// should not discard events as old as 300s=5m
|
||||
var MaxEventAgeSeconds int64 = 300
|
||||
metricsStore := metrics.NewMetricsStore("test_")
|
||||
ew := NewMockEventWatcher(MaxEventAgeSeconds, metricsStore)
|
||||
ew := newMockEventWatcher(MaxEventAgeSeconds, metricsStore)
|
||||
output := &bytes.Buffer{}
|
||||
log.Logger = log.Logger.Output(output)
|
||||
|
||||
@@ -64,7 +124,7 @@ func TestEventWatcher_EventAge_whenEventCreatedAfterStartupAndBeforeMaxAge(t *te
|
||||
// should not discard events as old as 300s=5m
|
||||
var MaxEventAgeSeconds int64 = 300
|
||||
metricsStore := metrics.NewMetricsStore("test_")
|
||||
ew := NewMockEventWatcher(MaxEventAgeSeconds, metricsStore)
|
||||
ew := newMockEventWatcher(MaxEventAgeSeconds, metricsStore)
|
||||
output := &bytes.Buffer{}
|
||||
log.Logger = log.Logger.Output(output)
|
||||
|
||||
@@ -126,7 +186,7 @@ func TestEventWatcher_EventAge_whenEventCreatedAfterStartupAndAfterMaxAge(t *tes
|
||||
// should not discard events as old as 300s=5m
|
||||
var MaxEventAgeSeconds int64 = 300
|
||||
metricsStore := metrics.NewMetricsStore("test_")
|
||||
ew := NewMockEventWatcher(MaxEventAgeSeconds, metricsStore)
|
||||
ew := newMockEventWatcher(MaxEventAgeSeconds, metricsStore)
|
||||
output := &bytes.Buffer{}
|
||||
log.Logger = log.Logger.Output(output)
|
||||
|
||||
@@ -173,3 +233,71 @@ func TestEventWatcher_EventAge_whenEventCreatedAfterStartupAndAfterMaxAge(t *tes
|
||||
|
||||
metrics.DestroyMetricsStore(metricsStore)
|
||||
}
|
||||
|
||||
func TestOnEvent_WithObjectMetadata(t *testing.T) {
|
||||
metricsStore := metrics.NewMetricsStore("test_")
|
||||
defer metrics.DestroyMetricsStore(metricsStore)
|
||||
ew := newMockEventWatcher(300, metricsStore)
|
||||
|
||||
event := EnhancedEvent{}
|
||||
ew.fn = func(e *EnhancedEvent) {
|
||||
event = *e
|
||||
}
|
||||
|
||||
startup := time.Now().Add(-10 * time.Minute)
|
||||
ew.setStartUpTime(startup)
|
||||
event1 := corev1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "event1"},
|
||||
LastTimestamp: metav1.Time{Time: startup.Add(8 * time.Minute)},
|
||||
InvolvedObject: corev1.ObjectReference{
|
||||
UID: "test",
|
||||
Name: "test-1",
|
||||
},
|
||||
}
|
||||
ew.onEvent(&event1)
|
||||
|
||||
require.Equal(t, types.UID("test"), event.InvolvedObject.UID)
|
||||
require.Equal(t, "test-1", event.InvolvedObject.Name)
|
||||
require.Equal(t, map[string]string{"test": "test"}, event.InvolvedObject.Annotations)
|
||||
require.Equal(t, map[string]string{"test": "test"}, event.InvolvedObject.Labels)
|
||||
require.Equal(t, []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "testAPI",
|
||||
Kind: "testKind",
|
||||
Name: "testOwner",
|
||||
UID: "testOwner",
|
||||
},
|
||||
}, event.InvolvedObject.OwnerReferences)
|
||||
}
|
||||
|
||||
func TestOnEvent_DeletedObjects(t *testing.T) {
|
||||
metricsStore := metrics.NewMetricsStore("test_")
|
||||
defer metrics.DestroyMetricsStore(metricsStore)
|
||||
ew := newMockEventWatcher(300, metricsStore)
|
||||
ew.objectMetadataCache.(*mockObjectMetadataProvider).objDeleted = true
|
||||
|
||||
event := EnhancedEvent{}
|
||||
ew.fn = func(e *EnhancedEvent) {
|
||||
event = *e
|
||||
}
|
||||
|
||||
startup := time.Now().Add(-10 * time.Minute)
|
||||
ew.setStartUpTime(startup)
|
||||
event1 := corev1.Event{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "event1"},
|
||||
LastTimestamp: metav1.Time{Time: startup.Add(8 * time.Minute)},
|
||||
InvolvedObject: corev1.ObjectReference{
|
||||
UID: "test",
|
||||
Name: "test-1",
|
||||
},
|
||||
}
|
||||
|
||||
ew.onEvent(&event1)
|
||||
|
||||
require.Equal(t, types.UID("test"), event.InvolvedObject.UID)
|
||||
require.Equal(t, "test-1", event.InvolvedObject.Name)
|
||||
require.Equal(t, true, event.InvolvedObject.Deleted)
|
||||
require.Equal(t, map[string]string(nil), event.InvolvedObject.Annotations)
|
||||
require.Equal(t, map[string]string(nil), event.InvolvedObject.Labels)
|
||||
require.Equal(t, []metav1.OwnerReference(nil), event.InvolvedObject.OwnerReferences)
|
||||
}
|
||||
|
||||
@@ -15,11 +15,13 @@ import (
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
EventsProcessed prometheus.Counter
|
||||
EventsDiscarded prometheus.Counter
|
||||
WatchErrors prometheus.Counter
|
||||
SendErrors prometheus.Counter
|
||||
BuildInfo prometheus.GaugeFunc
|
||||
EventsProcessed prometheus.Counter
|
||||
EventsDiscarded prometheus.Counter
|
||||
WatchErrors prometheus.Counter
|
||||
SendErrors prometheus.Counter
|
||||
BuildInfo prometheus.GaugeFunc
|
||||
KubeApiReadCacheHits prometheus.Counter
|
||||
KubeApiReadRequests prometheus.Counter
|
||||
}
|
||||
|
||||
// promLogger implements promhttp.Logger
|
||||
@@ -119,6 +121,14 @@ func NewMetricsStore(name_prefix string) *Store {
|
||||
Name: name_prefix + "send_event_errors",
|
||||
Help: "The total number of send event errors",
|
||||
}),
|
||||
KubeApiReadCacheHits: promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: name_prefix + "kube_api_read_cache_hits",
|
||||
Help: "The total number of read requests served from cache when looking up object metadata",
|
||||
}),
|
||||
KubeApiReadRequests: promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: name_prefix + "kube_api_read_cache_misses",
|
||||
Help: "The total number of read requests served from kube-apiserver when looking up object metadata",
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,5 +138,7 @@ func DestroyMetricsStore(store *Store) {
|
||||
prometheus.Unregister(store.WatchErrors)
|
||||
prometheus.Unregister(store.SendErrors)
|
||||
prometheus.Unregister(store.BuildInfo)
|
||||
prometheus.Unregister(store.KubeApiReadCacheHits)
|
||||
prometheus.Unregister(store.KubeApiReadRequests)
|
||||
store = nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user