Skip to content

Commit

Permalink
*: Have keeper notify when shutting down
Browse files Browse the repository at this point in the history
Avoided race condition in sending dying gasp

Renamed variable

Fixed linting issue

test: re-enable gofmt tests
  • Loading branch information
warpcomdev committed Nov 8, 2021
1 parent 3bb7499 commit c2950b2
Showing 1 changed file with 51 additions and 26 deletions.
77 changes: 51 additions & 26 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (p *PostgresKeeper) usePgrewind(db *cluster.DB) bool {
return p.pgSUUsername != "" && p.pgSUPassword != "" && db.Spec.UsePgrewind
}

func (p *PostgresKeeper) updateKeeperInfo() error {
func (p *PostgresKeeper) updateKeeperInfo(shuttingDown bool) error {
p.localStateMutex.Lock()
keeperUID := p.keeperLocalState.UID
clusterUID := p.keeperLocalState.ClusterUID
Expand All @@ -601,6 +601,13 @@ func (p *PostgresKeeper) updateKeeperInfo() error {
log.Warnf("failed to get postgres binary version: %v", err)
}

lastPGState := p.getLastPGState()
if shuttingDown {
// Tell the sentinels we are shutting this down.
// We don't need the mutex since p.getLastPGState()
// does a deep copy.
lastPGState.Healthy = false
}
keeperInfo := &cluster.KeeperInfo{
InfoUID: common.UID(),
UID: keeperUID,
Expand All @@ -610,7 +617,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error {
Maj: maj,
Min: min,
},
PostgresState: p.getLastPGState(),
PostgresState: lastPGState,

CanBeMaster: p.canBeMaster,
CanBeSynchronousReplica: p.canBeSynchronousReplica,
Expand Down Expand Up @@ -801,9 +808,9 @@ func (p *PostgresKeeper) getLastPGState() *cluster.PostgresState {
}

func (p *PostgresKeeper) Start(ctx context.Context) {
endSMCh := make(chan struct{})
endPgStatecheckerCh := make(chan struct{})
endUpdateKeeperInfo := make(chan struct{})
endPostgresKeeperSM := make(chan struct{}, 1)
endUpdatePGState := make(chan struct{}, 1)
endUpdateKeeperInfo := make(chan struct{}, 1)

var err error
var cd *cluster.ClusterData
Expand All @@ -828,9 +835,29 @@ func (p *PostgresKeeper) Start(ctx context.Context) {

_ = p.pgm.StopIfStarted(true)

smTimerCh := time.NewTimer(0).C
updatePGStateTimerCh := time.NewTimer(0).C
updateKeeperInfoTimerCh := time.NewTimer(0).C
postgresKeeperSMTimer := time.NewTimer(0)
updatePGStateTimer := time.NewTimer(0)
updateKeeperInfoTimer := time.NewTimer(0)

// Make sure there is a single goroutine sending KeeperInfo updates.
// We want to force an order in calls to updateKeeperInfo, so the
// KeeperInfo sent when the context is cancelled is guaranteed to be
// the last one.
doUpdateKeeperInfo := make(chan struct{}, 1)
go func() {
defer close(endUpdateKeeperInfo)
for range doUpdateKeeperInfo {
if err = p.updateKeeperInfo(false); err != nil {
log.Errorw("failed to update keeper info", zap.Error(err))
}
endUpdateKeeperInfo <- struct{}{}
}
// Once the channel is closed, send a dying gasp
if err = p.updateKeeperInfo(true); err != nil {
log.Errorw("failed to update keeper info", zap.Error(err))
}
}()

for {
// The sleepInterval can be updated during normal execution. Ensure we regularly
// refresh the metric to account for those changes.
Expand All @@ -839,42 +866,40 @@ func (p *PostgresKeeper) Start(ctx context.Context) {
select {
case <-ctx.Done():
log.Debugw("stopping stolon keeper")
close(doUpdateKeeperInfo) // This will notify sentinels we are shutting down
for range endUpdateKeeperInfo { // Wait until the dying gasp is sent
}
if err = p.pgm.StopIfStarted(true); err != nil {
log.Errorw("failed to stop pg instance", zap.Error(err))
}
p.end <- nil
return

case <-smTimerCh:
case <-postgresKeeperSMTimer.C:
go func() {
p.postgresKeeperSM(ctx)
endSMCh <- struct{}{}
endPostgresKeeperSM <- struct{}{}
}()

case <-endSMCh:
smTimerCh = time.NewTimer(p.sleepInterval).C
case <-endPostgresKeeperSM:
postgresKeeperSMTimer.Reset(p.sleepInterval)

case <-updatePGStateTimerCh:
// updateKeeperInfo two times faster than the sleep interval
case <-updatePGStateTimer.C:
// updatePGState two times faster than the sleep interval
go func() {
p.updatePGState(ctx)
endPgStatecheckerCh <- struct{}{}
endUpdatePGState <- struct{}{}
}()

case <-endPgStatecheckerCh:
// updateKeeperInfo two times faster than the sleep interval
updatePGStateTimerCh = time.NewTimer(p.sleepInterval / 2).C
case <-endUpdatePGState:
// updatePGState two times faster than the sleep interval
updatePGStateTimer.Reset(p.sleepInterval / 2)

case <-updateKeeperInfoTimerCh:
go func() {
if err := p.updateKeeperInfo(); err != nil {
log.Errorw("failed to update keeper info", zap.Error(err))
}
endUpdateKeeperInfo <- struct{}{}
}()
case <-updateKeeperInfoTimer.C:
doUpdateKeeperInfo <- struct{}{}

case <-endUpdateKeeperInfo:
updateKeeperInfoTimerCh = time.NewTimer(p.sleepInterval).C
updateKeeperInfoTimer.Reset(p.sleepInterval)
}
}
}
Expand Down

0 comments on commit c2950b2

Please sign in to comment.