Skip to content

Commit

Permalink
Fix Array EXISTS with cluster mode (#215)
Browse files Browse the repository at this point in the history
* cleanup and docs update

* fix array EXISTS and add unit test

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
vazois and TalZaccai authored Apr 1, 2024
1 parent 0c9c287 commit 96f5cc6
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 49 deletions.
12 changes: 6 additions & 6 deletions libs/cluster/Session/ClusterSlotCheck.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ private void Redirect(ushort slot, ClusterConfig config)

private void WriteClusterSlotVerificationMessage(ClusterConfig config, ClusterSlotVerificationResult vres, ref byte* dcurr, ref byte* dend)
{
ReadOnlySpan<byte> resp = default;
SlotVerifiedState state = vres.state;
ushort slot = vres.slot;
ReadOnlySpan<byte> resp;
var state = vres.state;
var slot = vres.slot;
string address;
int port;
switch (state)
Expand Down Expand Up @@ -97,7 +97,7 @@ public bool NetworkSingleKeySlotVerify(byte[] key, bool readOnly, byte SessionAs
/// <returns>True if redirect, False if can serve</returns>
public bool NetworkSingleKeySlotVerify(ArgSlice keySlice, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend)
{
//If cluster is not enabled or a transaction is running skip slot check
// If cluster is not enabled or a transaction is running skip slot check
if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false;

var config = clusterProvider.clusterManager.CurrentConfig;
Expand Down Expand Up @@ -126,7 +126,7 @@ public bool NetworkSingleKeySlotVerify(ArgSlice keySlice, bool readOnly, byte Se
public bool NetworkArraySlotVerify(int keyCount, ref byte* ptr, byte* endPtr, bool interleavedKeys, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend, out bool retVal)
{
retVal = false;
//If cluster is not enabled or a transaction is running skip slot check
// If cluster is not enabled or a transaction is running skip slot check
if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false;

var config = clusterProvider.clusterManager.CurrentConfig;
Expand All @@ -153,7 +153,7 @@ public bool NetworkArraySlotVerify(int keyCount, ref byte* ptr, byte* endPtr, bo
/// <returns></returns>
public bool NetworkKeyArraySlotVerify(ref ArgSlice[] keys, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend, int count = -1)
{
//If cluster is not enabled or a transaction is running skip slot check
// If cluster is not enabled or a transaction is running skip slot check
if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false;

var config = clusterProvider.clusterManager.CurrentConfig;
Expand Down
74 changes: 35 additions & 39 deletions libs/cluster/Session/ClusterSlotVerify.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ ClusterSlotVerificationResult SingleKeySlotVerify(ArgSlice keySlice, bool readOn
ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgSlice keySlice, byte SessionAsking, int slot = -1)
{
var _slot = slot == -1 ? ArgSliceUtils.HashSlot(keySlice) : (ushort)slot;
bool IsLocal = config.IsLocal(_slot);
SlotState state = config.GetState(_slot);
var IsLocal = config.IsLocal(_slot);
var state = config.GetState(_slot);

// If local, then slot in not stable state
if (IsLocal)
Expand All @@ -42,7 +42,7 @@ ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgS
if (state == SlotState.STABLE)
return new(SlotVerifiedState.OK, _slot);

//if key migrating and it exists serve read request
// If key migrating and it exists serve read request
if (state == SlotState.MIGRATING)
if (CheckIfKeyExists(keySlice))
return new(SlotVerifiedState.OK, _slot);
Expand All @@ -53,19 +53,19 @@ ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgS
}
else
{
//if stable state and not local redirect to PRIMARY node
// If stable state and not local redirect to PRIMARY node
if (state == SlotState.STABLE)
return new(SlotVerifiedState.MOVED, _slot);
else if (state == SlotState.IMPORTING)
{
//if importing state respond to query only if preceded by asking
// If importing state respond to query only if preceded by asking
if (SessionAsking > 0)
return new(SlotVerifiedState.OK, _slot);
// if importing state and not asking redirect to source node
// If importing state and not asking redirect to source node
else
return new(SlotVerifiedState.MOVED, _slot);
}
//if offline respond with clusterdown
// If offline respond with clusterdown
else
return new(SlotVerifiedState.CLUSTERDOWN, _slot);
}
Expand All @@ -74,10 +74,10 @@ ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgS
ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config, ArgSlice keySlice, byte SessionAsking, int slot = -1)
{
var _slot = slot == -1 ? ArgSliceUtils.HashSlot(keySlice) : (ushort)slot;
bool IsLocal = config.IsLocal(_slot, readCommand: readWriteSession);
SlotState state = config.GetState(_slot);
var IsLocal = config.IsLocal(_slot, readCommand: readWriteSession);
var state = config.GetState(_slot);

//Redirect r/w requests towards primary
// Redirect r/w requests towards primary
if (config.GetLocalNodeRole() == NodeRole.REPLICA)
return new(SlotVerifiedState.MOVED, _slot);

Expand All @@ -86,30 +86,30 @@ ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config,
if (IsLocal)
{
if (state == SlotState.MIGRATING)
//if key migrating and it exists cannot server write request
// If key migrating and it exists cannot server write request
if (CheckIfKeyExists(keySlice))
return new(SlotVerifiedState.MIGRATING, _slot);
//if key migrating can redirect with ask to target node
// If key migrating can redirect with ask to target node
else
return new(SlotVerifiedState.ASK, _slot);
else
return new(SlotVerifiedState.CLUSTERDOWN, _slot);
}
else
{
//if stable state and not local redirect to PRIMARY node
// If stable state and not local redirect to PRIMARY node
if (state == SlotState.STABLE)
return new(SlotVerifiedState.MOVED, _slot);
else if (state == SlotState.IMPORTING)
{
//if importing state respond to query only if preceeded by asking
// If importing state respond to query only if preceeded by asking
if (SessionAsking > 0)
return new(SlotVerifiedState.OK, _slot);
// if importing state and not asking redirect to source node
// If importing state and not asking redirect to source node
else
return new(SlotVerifiedState.MOVED, _slot);
}
//if offline respond with clusterdown
// If offline respond with clusterdown
else
return new(SlotVerifiedState.CLUSTERDOWN, _slot);
}
Expand All @@ -118,32 +118,32 @@ ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config,
ClusterSlotVerificationResult ArrayCrosslotVerify(int keyCount, ref byte* ptr, byte* endPtr, bool interleavedKeys, out bool retVal, out byte* keyPtr, out int ksize)
{
retVal = false;
bool crossSlot = false;
var crossSlot = false;
keyPtr = null;
ksize = 0;

byte* valPtr = null;
int vsize = 0;
var vsize = 0;

if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, endPtr))
return new(SlotVerifiedState.OK, 0);

//skip value if key values are interleaved
// Skip value if key values are interleaved
if (interleavedKeys)
if (!RespReadUtils.ReadPtrWithLengthHeader(ref valPtr, ref vsize, ref ptr, endPtr))
return new(SlotVerifiedState.OK, 0);

var slot = NumUtils.HashSlot(keyPtr, ksize);

for (int c = 1; c < keyCount; c++)
for (var c = 1; c < keyCount; c++)
{
keyPtr = null;
ksize = 0;

if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, endPtr))
return new(SlotVerifiedState.OK, slot);

//skip value if key values are interleaved
// Skip value if key values are interleaved
if (interleavedKeys)
if (!RespReadUtils.ReadPtrWithLengthHeader(ref valPtr, ref vsize, ref ptr, endPtr))
return new(SlotVerifiedState.OK, 0);
Expand All @@ -153,10 +153,7 @@ ClusterSlotVerificationResult ArrayCrosslotVerify(int keyCount, ref byte* ptr, b
}

retVal = true;
if (crossSlot)
return new(SlotVerifiedState.CROSSLOT, slot);
else
return new(SlotVerifiedState.OK, slot);
return crossSlot ? new(SlotVerifiedState.CROSSLOT, slot) : new(SlotVerifiedState.OK, slot);
}

ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, int keyCount, ref byte* ptr, byte* endPtr, bool readOnly, bool interleavedKeys, byte SessionAsking, out bool retVal)
Expand All @@ -167,10 +164,11 @@ ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, int keyCo
if (vres.state == SlotVerifiedState.CROSSLOT)
return vres;
else
if (readOnly)
return SingleKeyReadSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot);
else
return SingleKeyReadWriteSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot);
{
return readOnly
? SingleKeyReadSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot)
: SingleKeyReadWriteSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot);
}
}

ClusterSlotVerificationResult ArrayCrossSlotVerify(ref ArgSlice[] keys, int count)
Expand All @@ -179,8 +177,8 @@ ClusterSlotVerificationResult ArrayCrossSlotVerify(ref ArgSlice[] keys, int coun
var _end = count < 0 ? keys.Length : count;

var slot = ArgSliceUtils.HashSlot(keys[_offset]);
bool crossSlot = false;
for (int i = _offset; i < _end; i++)
var crossSlot = false;
for (var i = _offset; i < _end; i++)
{
var _slot = ArgSliceUtils.HashSlot(keys[i]);

Expand All @@ -191,10 +189,9 @@ ClusterSlotVerificationResult ArrayCrossSlotVerify(ref ArgSlice[] keys, int coun
}
}

if (crossSlot)
return new(SlotVerifiedState.CROSSLOT, slot);
else
return new(SlotVerifiedState.OK, slot);
return crossSlot
? new(SlotVerifiedState.CROSSLOT, slot)
: new(SlotVerifiedState.OK, slot);
}

ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, ref ArgSlice[] keys, bool readOnly, byte SessionAsking, int count)
Expand All @@ -204,10 +201,9 @@ ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, ref ArgSl
return vres;
else
{
if (readOnly)
return SingleKeyReadSlotVerify(config, keys[0], SessionAsking, vres.slot);
else
return SingleKeyReadWriteSlotVerify(config, keys[0], SessionAsking, vres.slot);
return readOnly
? SingleKeyReadSlotVerify(config, keys[0], SessionAsking, vres.slot)
: SingleKeyReadWriteSlotVerify(config, keys[0], SessionAsking, vres.slot);
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions libs/server/Resp/KeyAdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ private bool NetworkEXISTS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
ptr += 12;
int exists = 0;

// TODO: change to count when parser PR merges
if (NetworkArraySlotVerify(count - 1, ptr, interleavedKeys: false, readOnly: true, out bool retVal))
return retVal;

for (int i = 0; i < count - 1; i++)
{
byte* keyPtr = null;
Expand All @@ -159,9 +163,6 @@ private bool NetworkEXISTS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, recvBufferPtr + bytesRead))
return false;

if (NetworkSingleKeySlotVerify(keyPtr, ksize, true))
return true;

ArgSlice key = new(keyPtr, ksize);
var status = storageApi.EXISTS(key);
if (status == GarnetStatus.OK)
Expand Down
31 changes: 31 additions & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,36 @@ private unsafe ObjectOutputHeader ProcessOutputWithHeader(SpanByteAndMemory outp
return header;
}

/// <summary>
/// This method is used to verify slot ownership for provided key.
/// On error this method writes to response buffer but does not drain recv buffer (caller is responsible for draining).
/// </summary>
/// <param name="key">Key bytes</param>
/// <param name="readOnly">Whether caller is going to perform a readonly or read/write operation.</param>
/// <returns>True when ownernship is verified, false otherwise</returns>
bool NetworkSingleKeySlotVerify(byte[] key, bool readOnly)
=> clusterSession != null && clusterSession.NetworkSingleKeySlotVerify(key, readOnly, SessionAsking, ref dcurr, ref dend);

/// <summary>
/// This method is used to verify slot ownership for provided key sequence.
/// On error this method writes to response buffer but does not drain recv buffer (caller is responsible for draining).
/// </summary>
/// <param name="keyPtr">Pointer to key bytes</param>
/// <param name="readOnly">Whether caller is going to perform a readonly or read/write operation</param>
/// <returns>True when ownernship is verified, false otherwise</returns>
bool NetworkSingleKeySlotVerify(byte* keyPtr, int ksize, bool readOnly)
=> clusterSession != null && clusterSession.NetworkSingleKeySlotVerify(new ArgSlice(keyPtr, ksize), readOnly, SessionAsking, ref dcurr, ref dend);

/// <summary>
/// This method is used to verify slot ownership for provided sequence of keys.
/// On error this method writes to response buffer and drains recv buffer.
/// </summary>
/// <param name="keyCount">Number of keys</param>
/// <param name="ptr">Starting poistion of RESP formatted key sequence</param>
/// <param name="interleavedKeys">Whether the sequence of keys are interleaved (e.g. MSET [key1] [value1] [key2] [value2]...) or non-interleaved (e.g. MGET [key1] [key2] [key3])</param>
/// <param name="readOnly">Whether caller is going to perform a readonly or read/write operation</param>
/// <param name="retVal">Used to indicate if parsing succeeded or failed due to lack of expected data</param>
/// <returns>True when ownernship is verified, false otherwise</returns>
bool NetworkArraySlotVerify(int keyCount, byte* ptr, bool interleavedKeys, bool readOnly, out bool retVal)
{
retVal = false;
Expand All @@ -863,6 +887,13 @@ bool NetworkArraySlotVerify(int keyCount, byte* ptr, bool interleavedKeys, bool
return false;
}

/// <summary>
/// This method is used to verify slot ownership for provided array of key argslices.
/// </summary>
/// <param name="keys">Array of key ArgSlice</param>
/// <param name="readOnly">Whether caller is going to perform a readonly or read/write operation</param>
/// <param name="count">Key count if different than keys array length</param>
/// <returns>True when ownernship is verified, false otherwise</returns>
bool NetworkKeyArraySlotVerify(ref ArgSlice[] keys, bool readOnly, int count = -1)
=> clusterSession != null && clusterSession.NetworkKeyArraySlotVerify(ref keys, readOnly, SessionAsking, ref dcurr, ref dend, count);
}
Expand Down
6 changes: 5 additions & 1 deletion test/Garnet.test.cluster/ClusterRedirectTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private static bool ReplaceValue(ref byte[] value, string cmdStr, int i, out str

//6. EXISTS
new ("EXISTS", ["SET <key#0> <s#0>"], "EXISTS <key#0>", ["DEL <key#0>"], "1", null, (TestFlags.READONLY | TestFlags.SINGLEKEY | TestFlags.KEY_EXISTS)),
new ("EXISTS", ["SET <key#0> <s#0>"], "EXISTS <key#0>", ["DEL <key#0>"], "1", null, (TestFlags.READONLY | TestFlags.SINGLEKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)),
new ("EXISTS", ["SET <key#0> <s#0>"], "EXISTS <key#0>", ["DEL <key#0>"], "1", null, (TestFlags.READONLY | TestFlags.SINGLEKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)),

//7. INCR
new ("INCR", ["SET <key#0> 100"], "INCR <key#0>", ["DEL <key#0>"], "101", null, (TestFlags.SINGLEKEY | TestFlags.READ_WRITE)),
Expand Down Expand Up @@ -443,6 +443,10 @@ private static bool ReplaceValue(ref byte[] value, string cmdStr, int i, out str
//2. MGET
new ("MGET", ["MSET <key#0> <s#0> <key#1> <s#1> <key#2> <s#2>"],"MGET <key#0> <key#1> <key#2>", ["DEL <key#0>", "DEL <key#1>", "DEL <key#2>"], null, ["<s#0>", "<s#1>", "<s#2>"], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS)),
new ("MGET", ["MSET <key#0> <s#0> <key#1> <s#1> <key#2> <s#2>"],"MGET <key#0> <key#1> <key#2>", ["DEL <key#0>", "DEL <key#1>", "DEL <key#2>"], null, ["<s#0>", "<s#1>", "<s#2>"], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)),

//3. EXISTS
new ("EXISTS", ["MSET <key#0> <s#0> <key#1> <s#1> <key#2> <s#2>"],"EXISTS <key#0> <key#1> <key#2>", ["DEL <key#0>", "DEL <key#1>", "DEL <key#2>"], "3", ["<s#0>", "<s#1>", "<s#2>"], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS)),
new ("EXISTS", ["MSET <key#0> <s#0> <key#1> <s#1> <key#2> <s#2>"],"EXISTS <key#0> <key#1> <key#2>", ["DEL <key#0>", "DEL <key#1>", "DEL <key#2>"], "3", ["<s#0>", "<s#1>", "<s#2>"], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)),
#endregion

#region hllCommands
Expand Down

0 comments on commit 96f5cc6

Please sign in to comment.