mirror of
https://github.com/resmoio/kubernetes-event-exporter.git
synced 2026-02-14 14:39:50 +00:00
Improves leader election so that we don't lose events (#153)
Currently, when a replica loses its leadership, a new leader isn't elected until leaseDuration has elapsed. Here, that is 15s, so the maximum time until we get a new leader is leaseDuration (15s) + retryPeriod (2s) = 17s. This commit updates the shutdown process so that if the leader replica is sent a shutdown signal, it sleeps for leaseDuration seconds before stopping. This allows the leader replica to continue exporting events until a new leader is elected; a new leader is elected only once the lease has gone unrenewed and leaseDuration has expired. In addition, leader election now uses the leases object instead of configMaps and leases. The clusterRole is also updated to allow writing to the leases object. For use cases where no event loss is tolerable, users should set maxEventAgeSeconds to a value greater than 1.
This commit is contained in:
@@ -25,3 +25,6 @@ rules:
|
||||
- apiGroups: ["*"]
|
||||
resources: ["*"]
|
||||
verbs: ["get", "watch", "list"]
|
||||
- apiGroups: ["coordination.k8s.io"]
|
||||
resources: ["leases"]
|
||||
verbs: ["*"]
|
||||
|
||||
1
go.mod
1
go.mod
@@ -41,6 +41,7 @@ require (
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
|
||||
github.com/eapache/queue v1.1.0 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
|
||||
github.com/fatih/color v1.15.0 // indirect
|
||||
github.com/go-kit/log v0.2.1 // indirect
|
||||
github.com/go-logfmt/logfmt v0.5.1 // indirect
|
||||
|
||||
1
go.sum
1
go.sum
@@ -67,6 +67,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
|
||||
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
|
||||
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
|
||||
|
||||
72
main.go
72
main.go
@@ -34,7 +34,7 @@ func main() {
|
||||
|
||||
configBytes = []byte(os.ExpandEnv(string(configBytes)))
|
||||
|
||||
cfg, err := setup.ParseConfigFromBites(configBytes)
|
||||
cfg, err := setup.ParseConfigFromBytes(configBytes)
|
||||
if err != nil {
|
||||
log.Fatal().Msg(err.Error())
|
||||
}
|
||||
@@ -64,6 +64,8 @@ func main() {
|
||||
|
||||
cfg.SetDefaults()
|
||||
|
||||
log.Info().Msgf("Starting with config: %#v", cfg)
|
||||
|
||||
if err := cfg.Validate(); err != nil {
|
||||
log.Fatal().Err(err).Msg("config validation failed")
|
||||
}
|
||||
@@ -91,45 +93,61 @@ func main() {
|
||||
|
||||
w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
leaderLost := make(chan bool)
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
if cfg.LeaderElection.Enabled {
|
||||
var wasLeader bool
|
||||
log.Info().Msg("leader election enabled")
|
||||
|
||||
onStoppedLeading := func(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info().Msg("Context was cancelled, stopping leader election loop")
|
||||
default:
|
||||
log.Info().Msg("Lost the leader lease, stopping leader election loop")
|
||||
}
|
||||
}
|
||||
|
||||
l, err := kube.NewLeaderElector(cfg.LeaderElection.LeaderElectionID, kubecfg,
|
||||
// this method gets called when this instance becomes the leader
|
||||
func(_ context.Context) {
|
||||
log.Info().Msg("leader election got")
|
||||
wasLeader = true
|
||||
log.Info().Msg("leader election won")
|
||||
w.Start()
|
||||
},
|
||||
// this method gets called when the leader election loop is closed
|
||||
// either due to context cancellation or due to losing the leader lease
|
||||
func() {
|
||||
log.Error().Msg("leader election lost")
|
||||
leaderLost <- true
|
||||
onStoppedLeading(ctx)
|
||||
},
|
||||
func(identity string) {
|
||||
log.Info().Msg("new leader observed: " + identity)
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("create leaderelector failed")
|
||||
}
|
||||
go l.Run(ctx)
|
||||
|
||||
// Run returns if either the context is canceled or client stopped holding the leader lease
|
||||
l.Run(ctx)
|
||||
|
||||
// We get here either because we lost the leader lease or the context was canceled.
|
||||
// In either case we want to stop the event watcher and exit.
|
||||
// However, if we were the leader, we wait leaseDuration seconds before stopping
|
||||
// so that we don't lose events until the next leader is elected. The new leader
|
||||
// will only be elected after leaseDuration seconds.
|
||||
if wasLeader {
|
||||
log.Info().Msgf("waiting leaseDuration seconds before stopping: %s", kube.GetLeaseDuration())
|
||||
time.Sleep(kube.GetLeaseDuration())
|
||||
}
|
||||
} else {
|
||||
log.Info().Msg("leader election disabled")
|
||||
w.Start()
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
gracefulExit := func() {
|
||||
defer close(c)
|
||||
defer close(leaderLost)
|
||||
cancel()
|
||||
w.Stop()
|
||||
engine.Stop()
|
||||
log.Info().Msg("Exiting")
|
||||
}
|
||||
|
||||
select {
|
||||
case sig := <-c:
|
||||
log.Info().Str("signal", sig.String()).Msg("Received signal to exit")
|
||||
gracefulExit()
|
||||
case <-leaderLost:
|
||||
log.Warn().Msg("Leader election lost")
|
||||
gracefulExit()
|
||||
}
|
||||
log.Info().Msg("Received signal to exit. Stopping.")
|
||||
w.Stop()
|
||||
engine.Stop()
|
||||
}
|
||||
|
||||
@@ -28,6 +28,10 @@ const (
|
||||
defaultRetryPeriod = 2 * time.Second
|
||||
)
|
||||
|
||||
func GetLeaseDuration() time.Duration {
|
||||
return defaultLeaseDuration
|
||||
}
|
||||
|
||||
// newResourceLock creates a new Lease resource lock for use in a leader
|
||||
// election loop
|
||||
func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock.Interface, error) {
|
||||
@@ -53,7 +57,7 @@ func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock,
|
||||
return resourcelock.New(resourcelock.LeasesResourceLock,
|
||||
leaderElectionNamespace,
|
||||
leaderElectionID,
|
||||
client.CoreV1(),
|
||||
@@ -82,7 +86,7 @@ func getInClusterNamespace() (string, error) {
|
||||
}
|
||||
|
||||
// NewLeaderElector returns a leader elector object using client-go
|
||||
func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func()) (*leaderelection.LeaderElector, error) {
|
||||
func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func(), newLeaderFunc func(string)) (*leaderelection.LeaderElector, error) {
|
||||
resourceLock, err := newResourceLock(config, leaderElectionID)
|
||||
if err != nil {
|
||||
return &leaderelection.LeaderElector{}, err
|
||||
@@ -96,6 +100,7 @@ func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc fu
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: startFunc,
|
||||
OnStoppedLeading: stopFunc,
|
||||
OnNewLeader: newLeaderFunc,
|
||||
},
|
||||
})
|
||||
return l, err
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package kube
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
|
||||
@@ -19,6 +20,7 @@ var startUpTime = time.Now()
|
||||
type EventHandler func(event *EnhancedEvent)
|
||||
|
||||
type EventWatcher struct {
|
||||
wg sync.WaitGroup
|
||||
informer cache.SharedInformer
|
||||
stopper chan struct{}
|
||||
objectMetadataCache ObjectMetadataProvider
|
||||
@@ -135,12 +137,16 @@ func (e *EventWatcher) OnDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
func (e *EventWatcher) Start() {
|
||||
go e.informer.Run(e.stopper)
|
||||
e.wg.Add(1)
|
||||
go func() {
|
||||
defer e.wg.Done()
|
||||
e.informer.Run(e.stopper)
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *EventWatcher) Stop() {
|
||||
e.stopper <- struct{}{}
|
||||
close(e.stopper)
|
||||
e.wg.Wait()
|
||||
}
|
||||
|
||||
func (e *EventWatcher) setStartUpTime(time time.Time) {
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"github.com/resmoio/kubernetes-event-exporter/pkg/exporter"
|
||||
)
|
||||
|
||||
func ParseConfigFromBites(configBytes []byte) (exporter.Config, error) {
|
||||
func ParseConfigFromBytes(configBytes []byte) (exporter.Config, error) {
|
||||
var config exporter.Config
|
||||
err := yaml.Unmarshal(configBytes, &config)
|
||||
if err != nil {
|
||||
|
||||
@@ -7,14 +7,14 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
|
||||
func Test_ParseConfigFromBytes_ExampleConfigIsCorrect(t *testing.T) {
|
||||
configBytes, err := os.ReadFile("../../config.example.yaml")
|
||||
if err != nil {
|
||||
assert.NoError(t, err, "cannot read config file: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
config, err := ParseConfigFromBites(configBytes)
|
||||
config, err := ParseConfigFromBytes(configBytes)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, config.LogLevel)
|
||||
@@ -26,26 +26,26 @@ func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
|
||||
assert.Equal(t, 10, len(config.Receivers))
|
||||
}
|
||||
|
||||
func Test_ParseConfigFromBites_NoErrors(t *testing.T) {
|
||||
func Test_ParseConfigFromBytes_NoErrors(t *testing.T) {
|
||||
configBytes := []byte(`
|
||||
logLevel: info
|
||||
logFormat: json
|
||||
`)
|
||||
|
||||
config, err := ParseConfigFromBites(configBytes)
|
||||
config, err := ParseConfigFromBytes(configBytes)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "info", config.LogLevel)
|
||||
assert.Equal(t, "json", config.LogFormat)
|
||||
}
|
||||
|
||||
func Test_ParseConfigFromBites_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
|
||||
func Test_ParseConfigFromBytes_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
|
||||
configBytes := []byte(`
|
||||
logLevel: {{info}}
|
||||
logFormat: json
|
||||
`)
|
||||
|
||||
config, err := ParseConfigFromBites(configBytes)
|
||||
config, err := ParseConfigFromBytes(configBytes)
|
||||
|
||||
expectedErrorLine := "> 2 | logLevel: {{info}}"
|
||||
expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
|
||||
@@ -56,26 +56,26 @@ logFormat: json
|
||||
assert.Equal(t, "", config.LogFormat)
|
||||
}
|
||||
|
||||
func Test_ParseConfigFromBites_OkWhenCurlyBracesEscaped(t *testing.T) {
|
||||
func Test_ParseConfigFromBytes_OkWhenCurlyBracesEscaped(t *testing.T) {
|
||||
configBytes := []byte(`
|
||||
logLevel: "{{info}}"
|
||||
logFormat: json
|
||||
`)
|
||||
|
||||
config, err := ParseConfigFromBites(configBytes)
|
||||
config, err := ParseConfigFromBytes(configBytes)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "{{info}}", config.LogLevel)
|
||||
assert.Equal(t, "json", config.LogFormat)
|
||||
}
|
||||
|
||||
func Test_ParseConfigFromBites_ErrorErrorNotWithCurlyBraces(t *testing.T) {
|
||||
func Test_ParseConfigFromBytes_ErrorErrorNotWithCurlyBraces(t *testing.T) {
|
||||
configBytes := []byte(`
|
||||
logLevelNotYAMLErrorError
|
||||
logFormat: json
|
||||
`)
|
||||
|
||||
config, err := ParseConfigFromBites(configBytes)
|
||||
config, err := ParseConfigFromBytes(configBytes)
|
||||
|
||||
expectedErrorLine := "> 2 | logLevelNotYAMLErrorError"
|
||||
expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
|
||||
|
||||
Reference in New Issue
Block a user