Skip to content

Commit

Permalink
Merge branch 'main' into talzacc/respcommandinfo_refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored May 8, 2024
2 parents 8938232 + 5cce676 commit ffb286b
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 189 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ internal bool WaitForConfigTransition()
foreach (var s in sessions)
{
var entryEpoch = s.LocalCurrentEpoch;
if (entryEpoch != 0 && entryEpoch >= currentEpoch)
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
}
break;
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/CheckpointEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public CheckpointEntry()
}

public long GetMinAofCoveredAddress()
=> Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress);
=> Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);

/// <summary>
/// Indicate addition of new reader by trying to increment reader counter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void Dispose()
public async Task<bool> SendCheckpoint()
{
errorMsg = default;
var retryCount = 0;
var storeCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main);
var objectStoreCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object);
var current = clusterProvider.clusterManager.CurrentConfig;
Expand Down Expand Up @@ -96,6 +97,8 @@ public async Task<bool> SendCheckpoint()
{
localEntry.RemoveReader();
_ = Thread.Yield();
if (retryCount++ > 10)
throw new GarnetException("Attaching replica maximum retry count reached!");
goto retry;
}
}
Expand All @@ -110,6 +113,8 @@ public async Task<bool> SendCheckpoint()
{
localEntry.RemoveReader();
_ = Thread.Yield();
if (retryCount++ > 10)
throw new GarnetException("Attaching replica maximum retry count reached!");
goto retry;
}
}
Expand Down Expand Up @@ -187,7 +192,7 @@ public async Task<bool> SendCheckpoint()
var beginAddress = RecoveredReplicationOffset;
if (!recoverFromRemote)
{
//If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
// If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
if (replicaAofBeginAddress > ReplicationManager.kFirstValidAofAddress && replicaAofBeginAddress > RecoveredReplicationOffset)
{
logger?.LogInformation(
Expand Down
19 changes: 14 additions & 5 deletions libs/server/Resp/ArrayCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -668,15 +668,25 @@ private bool NetworkSELECT(byte* ptr)

readHead = (int)(ptr - recvBufferPtr);

if (string.Equals(result, "0"))
if (storeWrapper.serverOptions.EnableCluster)
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
// Cluster mode does not allow DBID
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_CLUSTER_MODE, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteError("ERR invalid database index."u8, ref dcurr, dend))
SendAndReset();

if (string.Equals(result, "0"))
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_INVALID_INDEX, ref dcurr, dend))
SendAndReset();
}
}
return true;
}
Expand Down Expand Up @@ -728,7 +738,6 @@ private bool NetworkKEYS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
return true;
}


private bool NetworkSCAN<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Expand Down
3 changes: 3 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public static ReadOnlySpan<byte> GetConfig(ReadOnlySpan<byte> key)
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UKNOWN_SUBCOMMAND => "ERR Unknown subcommand. Try LATENCY HELP."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INDEX_OUT_RANGE => "ERR index out of range"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8;

/// <summary>
/// Response string templates
/// </summary>
Expand Down
19 changes: 10 additions & 9 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recov
{
storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken);
if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken);
lastSaveTime = DateTimeOffset.UtcNow;
if (storeVersion > 0 || objectStoreVersion > 0)
lastSaveTime = DateTimeOffset.UtcNow;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -269,8 +270,8 @@ public long ReplayAOF(long untilAddress = -1)
long replicationOffset = 0;
try
{
//When replaying AOF we do not want to write record again to AOF.
//So initialize local AofProcessor with recordToAof: false.
// When replaying AOF we do not want to write record again to AOF.
// So initialize local AofProcessor with recordToAof: false.
var aofProcessor = new AofProcessor(this, recordToAof: false, logger);
aofProcessor.Recover(untilAddress);
aofProcessor.Dispose();
Expand Down Expand Up @@ -578,22 +579,22 @@ void CompleteCheckpoint()
/// <summary>
/// Take a checkpoint if no checkpoint was taken after the provided time offset
/// </summary>
/// <param name="afterTime"></param>
/// <param name="entryTime"></param>
/// <returns></returns>
public async Task TakeOnDemandCheckpoint(DateTimeOffset afterTime)
public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
{
//Take lock to ensure not other task will be taking a checkpoint
// Take lock to ensure no other task will be taking a checkpoint
while (!StartCheckpoint())
await Task.Yield();

//If an external task has taken a checkpoint after the provided afterTime return
if (this.lastSaveTime > afterTime)
// If an external task has taken a checkpoint beyond the provided entryTime return
if (this.lastSaveTime > entryTime)
{
CompleteCheckpoint();
return;
}

//If no newer checkpoint was taken compared to the provided afterTime take a checkpoint
// Necessary to take a checkpoint because the latest checkpoint is before entryTime
await CheckpointTask(StoreType.All, logger: logger);
}

Expand Down
Loading

0 comments on commit ffb286b

Please sign in to comment.