diff --git a/deploy/00-roles.yaml b/deploy/00-roles.yaml index a405e320..a577ad82 100644 --- a/deploy/00-roles.yaml +++ b/deploy/00-roles.yaml @@ -25,3 +25,6 @@ rules: - apiGroups: ["*"] resources: ["*"] verbs: ["get", "watch", "list"] +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["*"] diff --git a/go.mod b/go.mod index 6fcdb3af..f723f9ed 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.10.1 // indirect + github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/fatih/color v1.15.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect diff --git a/go.sum b/go.sum index 32d6af3d..6781c182 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= +github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= diff --git a/main.go b/main.go index e40924ab..b8b2ccb5 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,7 @@ func main() { configBytes = []byte(os.ExpandEnv(string(configBytes))) - cfg, err := setup.ParseConfigFromBites(configBytes) + cfg, err := setup.ParseConfigFromBytes(configBytes) if err != nil { log.Fatal().Msg(err.Error()) } @@ -64,6 +64,8 @@ func main() { cfg.SetDefaults() + log.Info().Msgf("Starting with config: %#v", cfg) + if err := cfg.Validate(); err != nil { log.Fatal().Err(err).Msg("config validation failed") } @@ -91,45 +93,61 @@ func main() { w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize) - ctx, cancel := context.WithCancel(context.Background()) - leaderLost := make(chan bool) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + if cfg.LeaderElection.Enabled { + var wasLeader bool + log.Info().Msg("leader election enabled") + + onStoppedLeading := func(ctx context.Context) { + select { + case <-ctx.Done(): + log.Info().Msg("Context was cancelled, stopping leader election loop") + default: + log.Info().Msg("Lost the leader lease, stopping leader election loop") + } + } + l, err := kube.NewLeaderElector(cfg.LeaderElection.LeaderElectionID, kubecfg, + // this method gets called when this instance becomes the leader func(_ context.Context) { - log.Info().Msg("leader election got") + wasLeader = true + log.Info().Msg("leader election won") w.Start() }, + // this method gets called when the leader election loop is closed + // either due to context cancellation or due to losing the leader lease func() { - log.Error().Msg("leader election lost") - leaderLost <- true + onStoppedLeading(ctx) + }, + func(identity string) { + log.Info().Msg("new leader observed: " + identity) }, ) if err != nil { log.Fatal().Err(err).Msg("create leaderelector failed") } - go l.Run(ctx) + + // Run returns if either the context is canceled or client stopped holding the leader lease + l.Run(ctx) + + // We get here either because we lost the leader lease or the context was canceled. + // In either case we want to stop the event watcher and exit. + // However, if we were the leader, we wait leaseDuration seconds before stopping + // so that we don't lose events until the next leader is elected. The new leader + // will only be elected after leaseDuration seconds. + if wasLeader { + log.Info().Msgf("waiting leaseDuration seconds before stopping: %s", kube.GetLeaseDuration()) + time.Sleep(kube.GetLeaseDuration()) + } } else { + log.Info().Msg("leader election disabled") w.Start() + <-ctx.Done() } - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - - gracefulExit := func() { - defer close(c) - defer close(leaderLost) - cancel() - w.Stop() - engine.Stop() - log.Info().Msg("Exiting") - } - - select { - case sig := <-c: - log.Info().Str("signal", sig.String()).Msg("Received signal to exit") - gracefulExit() - case <-leaderLost: - log.Warn().Msg("Leader election lost") - gracefulExit() - } + log.Info().Msg("Received signal to exit. Stopping.") + w.Stop() + engine.Stop() } diff --git a/pkg/kube/leaderelection.go b/pkg/kube/leaderelection.go index d7f9193e..15cbf7bb 100644 --- a/pkg/kube/leaderelection.go +++ b/pkg/kube/leaderelection.go @@ -28,6 +28,10 @@ const ( defaultRetryPeriod = 2 * time.Second ) +func GetLeaseDuration() time.Duration { + return defaultLeaseDuration +} + // NewResourceLock creates a new config map resource lock for use in a leader // election loop func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock.Interface, error) { @@ -53,7 +57,7 @@ func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock return nil, err } - return resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, + return resourcelock.New(resourcelock.LeasesResourceLock, leaderElectionNamespace, leaderElectionID, client.CoreV1(), @@ -82,7 +86,7 @@ func getInClusterNamespace() (string, error) { } // NewLeaderElector return a leader elector object using client-go -func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func()) (*leaderelection.LeaderElector, error) { +func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func(), newLeaderFunc func(string)) (*leaderelection.LeaderElector, error) { resourceLock, err := newResourceLock(config, leaderElectionID) if err != nil { return &leaderelection.LeaderElector{}, err @@ -96,6 +100,7 @@ func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc fu Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: startFunc, OnStoppedLeading: stopFunc, + OnNewLeader: newLeaderFunc, }, }) return l, err diff --git a/pkg/kube/watcher.go b/pkg/kube/watcher.go index ca3e17a7..c72c4838 100644 --- a/pkg/kube/watcher.go +++ b/pkg/kube/watcher.go @@ -1,6 +1,7 @@ package kube import ( + "sync" "time" "github.com/resmoio/kubernetes-event-exporter/pkg/metrics" @@ -19,6 +20,7 @@ var startUpTime = time.Now() type EventHandler func(event *EnhancedEvent) type EventWatcher struct { + wg sync.WaitGroup informer cache.SharedInformer stopper chan struct{} objectMetadataCache ObjectMetadataProvider @@ -135,12 +137,16 @@ func (e *EventWatcher) OnDelete(obj interface{}) { } func (e *EventWatcher) Start() { - go e.informer.Run(e.stopper) + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.informer.Run(e.stopper) + }() } func (e *EventWatcher) Stop() { - e.stopper <- struct{}{} close(e.stopper) + e.wg.Wait() } func (e *EventWatcher) setStartUpTime(time time.Time) { diff --git a/pkg/setup/setup.go b/pkg/setup/setup.go index 99ff5dee..f3439388 100644 --- a/pkg/setup/setup.go +++ b/pkg/setup/setup.go @@ -8,7 +8,7 @@ import ( "github.com/resmoio/kubernetes-event-exporter/pkg/exporter" ) -func ParseConfigFromBites(configBytes []byte) (exporter.Config, error) { +func ParseConfigFromBytes(configBytes []byte) (exporter.Config, error) { var config exporter.Config err := yaml.Unmarshal(configBytes, &config) if err != nil { diff --git a/pkg/setup/setup_test.go b/pkg/setup/setup_test.go index 9421e0fc..089edec8 100644 --- a/pkg/setup/setup_test.go +++ b/pkg/setup/setup_test.go @@ -7,14 +7,14 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) { +func Test_ParseConfigFromBytes_ExampleConfigIsCorrect(t *testing.T) { configBytes, err := os.ReadFile("../../config.example.yaml") if err != nil { assert.NoError(t, err, "cannot read config file: "+err.Error()) return } - config, err := ParseConfigFromBites(configBytes) + config, err := ParseConfigFromBytes(configBytes) assert.NoError(t, err) assert.NotEmpty(t, config.LogLevel) @@ -26,26 +26,26 @@ func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) { assert.Equal(t, 10, len(config.Receivers)) } -func Test_ParseConfigFromBites_NoErrors(t *testing.T) { +func Test_ParseConfigFromBytes_NoErrors(t *testing.T) { configBytes := []byte(` logLevel: info logFormat: json `) - config, err := ParseConfigFromBites(configBytes) + config, err := ParseConfigFromBytes(configBytes) assert.NoError(t, err) assert.Equal(t, "info", config.LogLevel) assert.Equal(t, "json", config.LogFormat) } -func Test_ParseConfigFromBites_ErrorWhenCurlyBracesNotEscaped(t *testing.T) { +func Test_ParseConfigFromBytes_ErrorWhenCurlyBracesNotEscaped(t *testing.T) { configBytes := []byte(` logLevel: {{info}} logFormat: json `) - config, err := ParseConfigFromBites(configBytes) + config, err := ParseConfigFromBytes(configBytes) expectedErrorLine := "> 2 | logLevel: {{info}}" expectedErrorSuggestion := "Need to wrap values with special characters in quotes" @@ -56,26 +56,26 @@ logFormat: json assert.Equal(t, "", config.LogFormat) } -func Test_ParseConfigFromBites_OkWhenCurlyBracesEscaped(t *testing.T) { +func Test_ParseConfigFromBytes_OkWhenCurlyBracesEscaped(t *testing.T) { configBytes := []byte(` logLevel: "{{info}}" logFormat: json `) - config, err := ParseConfigFromBites(configBytes) + config, err := ParseConfigFromBytes(configBytes) assert.Nil(t, err) assert.Equal(t, "{{info}}", config.LogLevel) assert.Equal(t, "json", config.LogFormat) } -func Test_ParseConfigFromBites_ErrorErrorNotWithCurlyBraces(t *testing.T) { +func Test_ParseConfigFromBytes_ErrorErrorNotWithCurlyBraces(t *testing.T) { configBytes := []byte(` logLevelNotYAMLErrorError logFormat: json `) - config, err := ParseConfigFromBites(configBytes) + config, err := ParseConfigFromBytes(configBytes) expectedErrorLine := "> 2 | logLevelNotYAMLErrorError" expectedErrorSuggestion := "Need to wrap values with special characters in quotes"