diff --git a/libs/cluster/Session/ClusterSlotCheck.cs b/libs/cluster/Session/ClusterSlotCheck.cs index 3c10ff8381..0a7fedbb5a 100644 --- a/libs/cluster/Session/ClusterSlotCheck.cs +++ b/libs/cluster/Session/ClusterSlotCheck.cs @@ -43,9 +43,9 @@ private void Redirect(ushort slot, ClusterConfig config) private void WriteClusterSlotVerificationMessage(ClusterConfig config, ClusterSlotVerificationResult vres, ref byte* dcurr, ref byte* dend) { - ReadOnlySpan resp = default; - SlotVerifiedState state = vres.state; - ushort slot = vres.slot; + ReadOnlySpan resp; + var state = vres.state; + var slot = vres.slot; string address; int port; switch (state) @@ -97,7 +97,7 @@ public bool NetworkSingleKeySlotVerify(byte[] key, bool readOnly, byte SessionAs /// True if redirect, False if can serve public bool NetworkSingleKeySlotVerify(ArgSlice keySlice, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend) { - //If cluster is not enabled or a transaction is running skip slot check + // If cluster is not enabled or a transaction is running skip slot check if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false; var config = clusterProvider.clusterManager.CurrentConfig; @@ -126,7 +126,7 @@ public bool NetworkSingleKeySlotVerify(ArgSlice keySlice, bool readOnly, byte Se public bool NetworkArraySlotVerify(int keyCount, ref byte* ptr, byte* endPtr, bool interleavedKeys, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend, out bool retVal) { retVal = false; - //If cluster is not enabled or a transaction is running skip slot check + // If cluster is not enabled or a transaction is running skip slot check if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false; var config = clusterProvider.clusterManager.CurrentConfig; @@ -153,7 +153,7 @@ public bool NetworkArraySlotVerify(int keyCount, ref byte* ptr, byte* endPtr, bo /// public bool NetworkKeyArraySlotVerify(ref ArgSlice[] keys, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend, int count = -1) { - //If cluster is not enabled or a transaction is running skip slot check + // If cluster is not enabled or a transaction is running skip slot check if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false; var config = clusterProvider.clusterManager.CurrentConfig; diff --git a/libs/cluster/Session/ClusterSlotVerify.cs b/libs/cluster/Session/ClusterSlotVerify.cs index d22373c733..59ebe5ede7 100644 --- a/libs/cluster/Session/ClusterSlotVerify.cs +++ b/libs/cluster/Session/ClusterSlotVerify.cs @@ -23,8 +23,8 @@ ClusterSlotVerificationResult SingleKeySlotVerify(ArgSlice keySlice, bool readOn ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgSlice keySlice, byte SessionAsking, int slot = -1) { var _slot = slot == -1 ? ArgSliceUtils.HashSlot(keySlice) : (ushort)slot; - bool IsLocal = config.IsLocal(_slot); - SlotState state = config.GetState(_slot); + var IsLocal = config.IsLocal(_slot); + var state = config.GetState(_slot); // If local, then slot in not stable state if (IsLocal) @@ -42,7 +42,7 @@ ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgS if (state == SlotState.STABLE) return new(SlotVerifiedState.OK, _slot); - //if key migrating and it exists serve read request + // If key migrating and it exists serve read request if (state == SlotState.MIGRATING) if (CheckIfKeyExists(keySlice)) return new(SlotVerifiedState.OK, _slot); @@ -53,19 +53,19 @@ ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgS } else { - //if stable state and not local redirect to PRIMARY node + // If stable state and not local redirect to PRIMARY node if (state == SlotState.STABLE) return new(SlotVerifiedState.MOVED, _slot); else if (state == SlotState.IMPORTING) { - //if importing state respond to query only if preceded by asking + // If importing state respond to query only if preceded by asking if (SessionAsking > 0) return new(SlotVerifiedState.OK, _slot); - // if importing state and not asking redirect to source node + // If importing state and not asking redirect to source node else return new(SlotVerifiedState.MOVED, _slot); } - //if offline respond with clusterdown + // If offline respond with clusterdown else return new(SlotVerifiedState.CLUSTERDOWN, _slot); } @@ -74,10 +74,10 @@ ClusterSlotVerificationResult SingleKeyReadSlotVerify(ClusterConfig config, ArgS ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config, ArgSlice keySlice, byte SessionAsking, int slot = -1) { var _slot = slot == -1 ? ArgSliceUtils.HashSlot(keySlice) : (ushort)slot; - bool IsLocal = config.IsLocal(_slot, readCommand: readWriteSession); - SlotState state = config.GetState(_slot); + var IsLocal = config.IsLocal(_slot, readCommand: readWriteSession); + var state = config.GetState(_slot); - //Redirect r/w requests towards primary + // Redirect r/w requests towards primary if (config.GetLocalNodeRole() == NodeRole.REPLICA) return new(SlotVerifiedState.MOVED, _slot); @@ -86,10 +86,10 @@ ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config, if (IsLocal) { if (state == SlotState.MIGRATING) - //if key migrating and it exists cannot server write request + // If key migrating and it exists cannot server write request if (CheckIfKeyExists(keySlice)) return new(SlotVerifiedState.MIGRATING, _slot); - //if key migrating can redirect with ask to target node + // If key migrating can redirect with ask to target node else return new(SlotVerifiedState.ASK, _slot); else @@ -97,19 +97,19 @@ ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config, } else { - //if stable state and not local redirect to PRIMARY node + // If stable state and not local redirect to PRIMARY node if (state == SlotState.STABLE) return new(SlotVerifiedState.MOVED, _slot); else if (state == SlotState.IMPORTING) { - //if importing state respond to query only if preceeded by asking + // If importing state respond to query only if preceeded by asking if (SessionAsking > 0) return new(SlotVerifiedState.OK, _slot); - // if importing state and not asking redirect to source node + // If importing state and not asking redirect to source node else return new(SlotVerifiedState.MOVED, _slot); } - //if offline respond with clusterdown + // If offline respond with clusterdown else return new(SlotVerifiedState.CLUSTERDOWN, _slot); } @@ -118,24 +118,24 @@ ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ClusterConfig config, ClusterSlotVerificationResult ArrayCrosslotVerify(int keyCount, ref byte* ptr, byte* endPtr, bool interleavedKeys, out bool retVal, out byte* keyPtr, out int ksize) { retVal = false; - bool crossSlot = false; + var crossSlot = false; keyPtr = null; ksize = 0; byte* valPtr = null; - int vsize = 0; + var vsize = 0; if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, endPtr)) return new(SlotVerifiedState.OK, 0); - //skip value if key values are interleaved + // Skip value if key values are interleaved if (interleavedKeys) if (!RespReadUtils.ReadPtrWithLengthHeader(ref valPtr, ref vsize, ref ptr, endPtr)) return new(SlotVerifiedState.OK, 0); var slot = NumUtils.HashSlot(keyPtr, ksize); - for (int c = 1; c < keyCount; c++) + for (var c = 1; c < keyCount; c++) { keyPtr = null; ksize = 0; @@ -143,7 +143,7 @@ ClusterSlotVerificationResult ArrayCrosslotVerify(int keyCount, ref byte* ptr, b if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, endPtr)) return new(SlotVerifiedState.OK, slot); - //skip value if key values are interleaved + // Skip value if key values are interleaved if (interleavedKeys) if (!RespReadUtils.ReadPtrWithLengthHeader(ref valPtr, ref vsize, ref ptr, endPtr)) return new(SlotVerifiedState.OK, 0); @@ -153,10 +153,7 @@ ClusterSlotVerificationResult ArrayCrosslotVerify(int keyCount, ref byte* ptr, b } retVal = true; - if (crossSlot) - return new(SlotVerifiedState.CROSSLOT, slot); - else - return new(SlotVerifiedState.OK, slot); + return crossSlot ? new(SlotVerifiedState.CROSSLOT, slot) : new(SlotVerifiedState.OK, slot); } ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, int keyCount, ref byte* ptr, byte* endPtr, bool readOnly, bool interleavedKeys, byte SessionAsking, out bool retVal) @@ -167,10 +164,11 @@ ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, int keyCo if (vres.state == SlotVerifiedState.CROSSLOT) return vres; else - if (readOnly) - return SingleKeyReadSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot); - else - return SingleKeyReadWriteSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot); + { + return readOnly + ? SingleKeyReadSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot) + : SingleKeyReadWriteSlotVerify(config, new ArgSlice(keyPtr, ksize), SessionAsking, vres.slot); + } } ClusterSlotVerificationResult ArrayCrossSlotVerify(ref ArgSlice[] keys, int count) @@ -179,8 +177,8 @@ ClusterSlotVerificationResult ArrayCrossSlotVerify(ref ArgSlice[] keys, int coun var _end = count < 0 ? keys.Length : count; var slot = ArgSliceUtils.HashSlot(keys[_offset]); - bool crossSlot = false; - for (int i = _offset; i < _end; i++) + var crossSlot = false; + for (var i = _offset; i < _end; i++) { var _slot = ArgSliceUtils.HashSlot(keys[i]); @@ -191,10 +189,9 @@ ClusterSlotVerificationResult ArrayCrossSlotVerify(ref ArgSlice[] keys, int coun } } - if (crossSlot) - return new(SlotVerifiedState.CROSSLOT, slot); - else - return new(SlotVerifiedState.OK, slot); + return crossSlot + ? new(SlotVerifiedState.CROSSLOT, slot) + : new(SlotVerifiedState.OK, slot); } ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, ref ArgSlice[] keys, bool readOnly, byte SessionAsking, int count) @@ -204,10 +201,9 @@ ClusterSlotVerificationResult KeyArraySlotVerify(ClusterConfig config, ref ArgSl return vres; else { - if (readOnly) - return SingleKeyReadSlotVerify(config, keys[0], SessionAsking, vres.slot); - else - return SingleKeyReadWriteSlotVerify(config, keys[0], SessionAsking, vres.slot); + return readOnly + ? SingleKeyReadSlotVerify(config, keys[0], SessionAsking, vres.slot) + : SingleKeyReadWriteSlotVerify(config, keys[0], SessionAsking, vres.slot); } } } diff --git a/libs/server/Resp/KeyAdminCommands.cs b/libs/server/Resp/KeyAdminCommands.cs index f729c5caff..c279378a8e 100644 --- a/libs/server/Resp/KeyAdminCommands.cs +++ b/libs/server/Resp/KeyAdminCommands.cs @@ -151,6 +151,10 @@ private bool NetworkEXISTS(int count, byte* ptr, ref TGarnetApi stor ptr += 12; int exists = 0; + // TODO: change to count when parser PR merges + if (NetworkArraySlotVerify(count - 1, ptr, interleavedKeys: false, readOnly: true, out bool retVal)) + return retVal; + for (int i = 0; i < count - 1; i++) { byte* keyPtr = null; @@ -159,9 +163,6 @@ private bool NetworkEXISTS(int count, byte* ptr, ref TGarnetApi stor if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, recvBufferPtr + bytesRead)) return false; - if (NetworkSingleKeySlotVerify(keyPtr, ksize, true)) - return true; - ArgSlice key = new(keyPtr, ksize); var status = storageApi.EXISTS(key); if (status == GarnetStatus.OK) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index 7fa4fa8f47..07fe60f0f4 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -846,12 +846,36 @@ private unsafe ObjectOutputHeader ProcessOutputWithHeader(SpanByteAndMemory outp return header; } + /// + /// This method is used to verify slot ownership for provided key. + /// On error this method writes to response buffer but does not drain recv buffer (caller is responsible for draining). + /// + /// Key bytes + /// Whether caller is going to perform a readonly or read/write operation. + /// True when ownernship is verified, false otherwise bool NetworkSingleKeySlotVerify(byte[] key, bool readOnly) => clusterSession != null && clusterSession.NetworkSingleKeySlotVerify(key, readOnly, SessionAsking, ref dcurr, ref dend); + /// + /// This method is used to verify slot ownership for provided key sequence. + /// On error this method writes to response buffer but does not drain recv buffer (caller is responsible for draining). + /// + /// Pointer to key bytes + /// Whether caller is going to perform a readonly or read/write operation + /// True when ownernship is verified, false otherwise bool NetworkSingleKeySlotVerify(byte* keyPtr, int ksize, bool readOnly) => clusterSession != null && clusterSession.NetworkSingleKeySlotVerify(new ArgSlice(keyPtr, ksize), readOnly, SessionAsking, ref dcurr, ref dend); + /// + /// This method is used to verify slot ownership for provided sequence of keys. + /// On error this method writes to response buffer and drains recv buffer. + /// + /// Number of keys + /// Starting poistion of RESP formatted key sequence + /// Whether the sequence of keys are interleaved (e.g. MSET [key1] [value1] [key2] [value2]...) or non-interleaved (e.g. MGET [key1] [key2] [key3]) + /// Whether caller is going to perform a readonly or read/write operation + /// Used to indicate if parsing succeeded or failed due to lack of expected data + /// True when ownernship is verified, false otherwise bool NetworkArraySlotVerify(int keyCount, byte* ptr, bool interleavedKeys, bool readOnly, out bool retVal) { retVal = false; @@ -863,6 +887,13 @@ bool NetworkArraySlotVerify(int keyCount, byte* ptr, bool interleavedKeys, bool return false; } + /// + /// This method is used to verify slot ownership for provided array of key argslices. + /// + /// Array of key ArgSlice + /// Whether caller is going to perform a readonly or read/write operation + /// Key count if different than keys array length + /// True when ownernship is verified, false otherwise bool NetworkKeyArraySlotVerify(ref ArgSlice[] keys, bool readOnly, int count = -1) => clusterSession != null && clusterSession.NetworkKeyArraySlotVerify(ref keys, readOnly, SessionAsking, ref dcurr, ref dend, count); } diff --git a/test/Garnet.test.cluster/ClusterRedirectTests.cs b/test/Garnet.test.cluster/ClusterRedirectTests.cs index 6768fc8b33..b35de0449e 100644 --- a/test/Garnet.test.cluster/ClusterRedirectTests.cs +++ b/test/Garnet.test.cluster/ClusterRedirectTests.cs @@ -172,7 +172,7 @@ private static bool ReplaceValue(ref byte[] value, string cmdStr, int i, out str //6. EXISTS new ("EXISTS", ["SET "], "EXISTS ", ["DEL "], "1", null, (TestFlags.READONLY | TestFlags.SINGLEKEY | TestFlags.KEY_EXISTS)), - new ("EXISTS", ["SET "], "EXISTS ", ["DEL "], "1", null, (TestFlags.READONLY | TestFlags.SINGLEKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)), + new ("EXISTS", ["SET "], "EXISTS ", ["DEL "], "1", null, (TestFlags.READONLY | TestFlags.SINGLEKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)), //7. INCR new ("INCR", ["SET 100"], "INCR ", ["DEL "], "101", null, (TestFlags.SINGLEKEY | TestFlags.READ_WRITE)), @@ -443,6 +443,10 @@ private static bool ReplaceValue(ref byte[] value, string cmdStr, int i, out str //2. MGET new ("MGET", ["MSET "],"MGET ", ["DEL ", "DEL ", "DEL "], null, ["", "", ""], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS)), new ("MGET", ["MSET "],"MGET ", ["DEL ", "DEL ", "DEL "], null, ["", "", ""], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)), + + //3. EXISTS + new ("EXISTS", ["MSET "],"EXISTS ", ["DEL ", "DEL ", "DEL "], "3", ["", "", ""], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS)), + new ("EXISTS", ["MSET "],"EXISTS ", ["DEL ", "DEL ", "DEL "], "3", ["", "", ""], (TestFlags.READONLY | TestFlags.MULTIKEY | TestFlags.KEY_EXISTS | TestFlags.ASKING)), #endregion #region hllCommands