From d7ba03a966b48af796579749c7db669033b265a4 Mon Sep 17 00:00:00 2001
From: Ronak Nathani <7279934+ronaknnathani@users.noreply.github.com>
Date: Fri, 23 Feb 2024 00:26:06 -0500
Subject: [PATCH] Improves leader election so that we don't lose events (#153)

Currently, when a replica loses its leadership, a new leader isn't
elected until leaseDuration seconds have passed; here, that is 15s. The
maximum time until a new leader takes over is leaseDuration (15s) +
retryPeriod (2s) = 17s.

This commit updates the shutdown process so that when the leader
replica receives a shutdown signal, it sleeps for leaseDuration
seconds. This lets the outgoing leader continue to export events until
a new leader is elected, since a new leader is elected only once the
lease stops being renewed and leaseDuration expires.

In addition, leader election now uses the Leases resource lock instead
of the combined ConfigMaps-and-Leases lock. The ClusterRole is also
updated to allow writing to Lease objects.

For use cases where no event loss is tolerable, users should set
maxEventAgeSeconds to a value greater than 1.
---
 deploy/00-roles.yaml       |  3 ++
 go.mod                     |  1 +
 go.sum                     |  1 +
 main.go                    | 72 ++++++++++++++++++++++++--------------
 pkg/kube/leaderelection.go |  9 +++--
 pkg/kube/watcher.go        | 10 ++++--
 pkg/setup/setup.go         |  2 +-
 pkg/setup/setup_test.go    | 20 +++++------
 8 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/deploy/00-roles.yaml b/deploy/00-roles.yaml
index a405e320..a577ad82 100644
--- a/deploy/00-roles.yaml
+++ b/deploy/00-roles.yaml
@@ -25,3 +25,6 @@ rules:
 - apiGroups: ["*"]
   resources: ["*"]
   verbs: ["get", "watch", "list"]
+- apiGroups: ["coordination.k8s.io"]
+  resources: ["leases"]
+  verbs: ["*"]
diff --git a/go.mod b/go.mod
index 6fcdb3af..f723f9ed 100644
--- a/go.mod
+++ b/go.mod
@@ -41,6 +41,7 @@ require (
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
 	github.com/eapache/queue v1.1.0 // indirect
 	github.com/emicklei/go-restful/v3 v3.10.1 // indirect
+	github.com/evanphx/json-patch v4.12.0+incompatible // indirect
 	github.com/fatih/color v1.15.0 // indirect
 	github.com/go-kit/log v0.2.1 // indirect
 	github.com/go-logfmt/logfmt v0.5.1 // indirect
diff --git a/go.sum b/go.sum
index 32d6af3d..6781c182 100644
--- a/go.sum
+++ b/go.sum
@@ -67,6 +67,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
 github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
+github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
 github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
 github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
 github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
diff --git a/main.go b/main.go
index e40924ab..b8b2ccb5 100644
--- a/main.go
+++ b/main.go
@@ -34,7 +34,7 @@ func main() {

 	configBytes = []byte(os.ExpandEnv(string(configBytes)))

-	cfg, err := setup.ParseConfigFromBites(configBytes)
+	cfg, err := setup.ParseConfigFromBytes(configBytes)
 	if err != nil {
 		log.Fatal().Msg(err.Error())
 	}
@@ -64,6 +64,8 @@ func main() {

 	cfg.SetDefaults()

+	log.Info().Msgf("Starting with config: %#v", cfg)
+
 	if err := cfg.Validate(); err != nil {
 		log.Fatal().Err(err).Msg("config validation failed")
validation failed") } @@ -91,45 +93,61 @@ func main() { w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize) - ctx, cancel := context.WithCancel(context.Background()) - leaderLost := make(chan bool) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + if cfg.LeaderElection.Enabled { + var wasLeader bool + log.Info().Msg("leader election enabled") + + onStoppedLeading := func(ctx context.Context) { + select { + case <-ctx.Done(): + log.Info().Msg("Context was cancelled, stopping leader election loop") + default: + log.Info().Msg("Lost the leader lease, stopping leader election loop") + } + } + l, err := kube.NewLeaderElector(cfg.LeaderElection.LeaderElectionID, kubecfg, + // this method gets called when this instance becomes the leader func(_ context.Context) { - log.Info().Msg("leader election got") + wasLeader = true + log.Info().Msg("leader election won") w.Start() }, + // this method gets called when the leader election loop is closed + // either due to context cancellation or due to losing the leader lease func() { - log.Error().Msg("leader election lost") - leaderLost <- true + onStoppedLeading(ctx) + }, + func(identity string) { + log.Info().Msg("new leader observed: " + identity) }, ) if err != nil { log.Fatal().Err(err).Msg("create leaderelector failed") } - go l.Run(ctx) + + // Run returns if either the context is canceled or client stopped holding the leader lease + l.Run(ctx) + + // We get here either because we lost the leader lease or the context was canceled. + // In either case we want to stop the event watcher and exit. + // However, if we were the leader, we wait leaseDuration seconds before stopping + // so that we don't lose events until the next leader is elected. The new leader + // will only be elected after leaseDuration seconds. + if wasLeader { + log.Info().Msgf("waiting leaseDuration seconds before stopping: %s", kube.GetLeaseDuration()) + time.Sleep(kube.GetLeaseDuration()) + } } else { + log.Info().Msg("leader election disabled") w.Start() + <-ctx.Done() } - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - - gracefulExit := func() { - defer close(c) - defer close(leaderLost) - cancel() - w.Stop() - engine.Stop() - log.Info().Msg("Exiting") - } - - select { - case sig := <-c: - log.Info().Str("signal", sig.String()).Msg("Received signal to exit") - gracefulExit() - case <-leaderLost: - log.Warn().Msg("Leader election lost") - gracefulExit() - } + log.Info().Msg("Received signal to exit. 
Stopping.") + w.Stop() + engine.Stop() } diff --git a/pkg/kube/leaderelection.go b/pkg/kube/leaderelection.go index d7f9193e..15cbf7bb 100644 --- a/pkg/kube/leaderelection.go +++ b/pkg/kube/leaderelection.go @@ -28,6 +28,10 @@ const ( defaultRetryPeriod = 2 * time.Second ) +func GetLeaseDuration() time.Duration { + return defaultLeaseDuration +} + // NewResourceLock creates a new config map resource lock for use in a leader // election loop func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock.Interface, error) { @@ -53,7 +57,7 @@ func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock return nil, err } - return resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, + return resourcelock.New(resourcelock.LeasesResourceLock, leaderElectionNamespace, leaderElectionID, client.CoreV1(), @@ -82,7 +86,7 @@ func getInClusterNamespace() (string, error) { } // NewLeaderElector return a leader elector object using client-go -func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func()) (*leaderelection.LeaderElector, error) { +func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func(), newLeaderFunc func(string)) (*leaderelection.LeaderElector, error) { resourceLock, err := newResourceLock(config, leaderElectionID) if err != nil { return &leaderelection.LeaderElector{}, err @@ -96,6 +100,7 @@ func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc fu Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: startFunc, OnStoppedLeading: stopFunc, + OnNewLeader: newLeaderFunc, }, }) return l, err diff --git a/pkg/kube/watcher.go b/pkg/kube/watcher.go index ca3e17a7..c72c4838 100644 --- a/pkg/kube/watcher.go +++ b/pkg/kube/watcher.go @@ -1,6 +1,7 @@ package kube import ( + "sync" "time" "github.com/resmoio/kubernetes-event-exporter/pkg/metrics" @@ -19,6 +20,7 @@ var startUpTime = time.Now() type EventHandler func(event *EnhancedEvent) type EventWatcher struct { + wg sync.WaitGroup informer cache.SharedInformer stopper chan struct{} objectMetadataCache ObjectMetadataProvider @@ -135,12 +137,16 @@ func (e *EventWatcher) OnDelete(obj interface{}) { } func (e *EventWatcher) Start() { - go e.informer.Run(e.stopper) + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.informer.Run(e.stopper) + }() } func (e *EventWatcher) Stop() { - e.stopper <- struct{}{} close(e.stopper) + e.wg.Wait() } func (e *EventWatcher) setStartUpTime(time time.Time) { diff --git a/pkg/setup/setup.go b/pkg/setup/setup.go index 99ff5dee..f3439388 100644 --- a/pkg/setup/setup.go +++ b/pkg/setup/setup.go @@ -8,7 +8,7 @@ import ( "github.com/resmoio/kubernetes-event-exporter/pkg/exporter" ) -func ParseConfigFromBites(configBytes []byte) (exporter.Config, error) { +func ParseConfigFromBytes(configBytes []byte) (exporter.Config, error) { var config exporter.Config err := yaml.Unmarshal(configBytes, &config) if err != nil { diff --git a/pkg/setup/setup_test.go b/pkg/setup/setup_test.go index 9421e0fc..089edec8 100644 --- a/pkg/setup/setup_test.go +++ b/pkg/setup/setup_test.go @@ -7,14 +7,14 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) { +func Test_ParseConfigFromBytes_ExampleConfigIsCorrect(t *testing.T) { configBytes, err := os.ReadFile("../../config.example.yaml") if err != nil { assert.NoError(t, err, "cannot read config file: "+err.Error()) return } - config, err := 
+	config, err := ParseConfigFromBytes(configBytes)
 	assert.NoError(t, err)

 	assert.NotEmpty(t, config.LogLevel)
@@ -26,26 +26,26 @@ func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
 	assert.Equal(t, 10, len(config.Receivers))
 }

-func Test_ParseConfigFromBites_NoErrors(t *testing.T) {
+func Test_ParseConfigFromBytes_NoErrors(t *testing.T) {
 	configBytes := []byte(`
 logLevel: info
 logFormat: json
 `)

-	config, err := ParseConfigFromBites(configBytes)
+	config, err := ParseConfigFromBytes(configBytes)

 	assert.NoError(t, err)
 	assert.Equal(t, "info", config.LogLevel)
 	assert.Equal(t, "json", config.LogFormat)
 }

-func Test_ParseConfigFromBites_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
+func Test_ParseConfigFromBytes_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
 	configBytes := []byte(`
 logLevel: {{info}}
 logFormat: json
 `)

-	config, err := ParseConfigFromBites(configBytes)
+	config, err := ParseConfigFromBytes(configBytes)

 	expectedErrorLine := "> 2 | logLevel: {{info}}"
 	expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
@@ -56,26 +56,26 @@ logFormat: json
 	assert.Equal(t, "", config.LogFormat)
 }

-func Test_ParseConfigFromBites_OkWhenCurlyBracesEscaped(t *testing.T) {
+func Test_ParseConfigFromBytes_OkWhenCurlyBracesEscaped(t *testing.T) {
 	configBytes := []byte(`
 logLevel: "{{info}}"
 logFormat: json
 `)

-	config, err := ParseConfigFromBites(configBytes)
+	config, err := ParseConfigFromBytes(configBytes)

 	assert.Nil(t, err)
 	assert.Equal(t, "{{info}}", config.LogLevel)
 	assert.Equal(t, "json", config.LogFormat)
 }

-func Test_ParseConfigFromBites_ErrorErrorNotWithCurlyBraces(t *testing.T) {
+func Test_ParseConfigFromBytes_ErrorErrorNotWithCurlyBraces(t *testing.T) {
 	configBytes := []byte(`
 logLevelNotYAMLErrorError
 logFormat: json
 `)

-	config, err := ParseConfigFromBites(configBytes)
+	config, err := ParseConfigFromBytes(configBytes)

 	expectedErrorLine := "> 2 | logLevelNotYAMLErrorError"
 	expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
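
For reference, a minimal, self-contained sketch of how a caller adopts the new
NewLeaderElector signature with its third OnNewLeader callback, mirroring the
wiring in main.go above. The lock ID and the bare log.Println calls are
illustrative assumptions, not code from this repository; an in-cluster
rest.Config is assumed because newResourceLock resolves its namespace from the
service account mount:

    package main

    import (
    	"context"
    	"log"

    	"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
    	"k8s.io/client-go/rest"
    )

    func main() {
    	// Assumes the process runs inside a cluster with RBAC that allows
    	// writing coordination.k8s.io/leases (see deploy/00-roles.yaml).
    	kubecfg, err := rest.InClusterConfig()
    	if err != nil {
    		log.Fatal(err)
    	}

    	l, err := kube.NewLeaderElector("event-exporter-lock", kubecfg, // hypothetical lock ID
    		// OnStartedLeading: begin leader-only work
    		func(_ context.Context) { log.Println("started leading") },
    		// OnStoppedLeading: the election loop ended (context canceled or lease lost)
    		func() { log.Println("stopped leading") },
    		// OnNewLeader: a leader change was observed, including this instance winning
    		func(identity string) { log.Println("new leader:", identity) },
    	)
    	if err != nil {
    		log.Fatal(err)
    	}

    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	// Run blocks until ctx is canceled or the leader lease is lost.
    	l.Run(ctx)
    }

Because OnStoppedLeading fires on both voluntary shutdown and genuine lease
loss, a caller that needs to distinguish the two can check its own context, as
the onStoppedLeading helper in main.go does.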