From 30e37803f74a154bc7ee6d8cd9effac19ed4c4a7 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 3 May 2024 11:17:25 -0700 Subject: [PATCH 1/4] Clear input header field (flags) for object store RMW operations (#357) * When preparing input for the object store RMW operations, we need to set the flag field to 0 as the buffer being used to create the input may not be zero'ed out. * small fix * Clean up Unit Test --------- Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com> --- libs/server/Resp/Objects/HashCommands.cs | 8 ++ libs/server/Resp/Objects/ListCommands.cs | 9 ++ libs/server/Resp/Objects/SetCommands.cs | 7 ++ .../Resp/Objects/SharedObjectCommands.cs | 1 + libs/server/Resp/Objects/SortedSetCommands.cs | 13 +++ .../Resp/Objects/SortedSetGeoCommands.cs | 2 + .../Storage/Session/ObjectStore/HashOps.cs | 9 ++ .../Storage/Session/ObjectStore/ListOps.cs | 5 ++ .../Storage/Session/ObjectStore/SetOps.cs | 8 ++ .../Session/ObjectStore/SortedSetOps.cs | 14 ++- test/Garnet.test/RespTests.cs | 85 +++++++++++++++++++ 11 files changed, 160 insertions(+), 1 deletion(-) diff --git a/libs/server/Resp/Objects/HashCommands.cs b/libs/server/Resp/Objects/HashCommands.cs index 61a90a6826d..06ce6e381c0 100644 --- a/libs/server/Resp/Objects/HashCommands.cs +++ b/libs/server/Resp/Objects/HashCommands.cs @@ -69,6 +69,7 @@ private unsafe bool HashSet(int count, byte* ptr, HashOperation hop, // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = hop; inputPtr->count = inputCount; inputPtr->done = hashOpsCount; @@ -152,6 +153,7 @@ private unsafe bool HashGet(int count, byte* ptr, HashOperation op, int inputCount = op == HashOperation.HGETALL ? 0 : (op == HashOperation.HRANDFIELD ? count + 1 : count - 1); // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = op; inputPtr->count = inputCount; inputPtr->done = hashItemsDoneCount; @@ -253,6 +255,7 @@ private unsafe bool HashLength(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = HashOperation.HLEN; inputPtr->count = 1; inputPtr->done = 0; @@ -323,6 +326,7 @@ private unsafe bool HashStrLength(int count, byte* ptr, ref TGarnetA // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = HashOperation.HSTRLEN; inputPtr->count = 1; inputPtr->done = 0; @@ -397,6 +401,7 @@ private unsafe bool HashDelete(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = HashOperation.HDEL; inputPtr->count = inputCount; inputPtr->done = hashItemsDoneCount; @@ -474,6 +479,7 @@ private unsafe bool HashExists(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = HashOperation.HEXISTS; inputPtr->count = 1; inputPtr->done = 0; @@ -546,6 +552,7 @@ private unsafe bool HashKeys(int count, byte* ptr, HashOperation op, // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = op; inputPtr->count = count - 1; inputPtr->done = hashOpsCount; @@ -634,6 +641,7 @@ private unsafe bool HashIncrement(int count, byte* ptr, HashOperatio // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Hash; + inputPtr->header.flags = 0; inputPtr->header.HashOp = op; inputPtr->count = count + 1; inputPtr->done = 0; diff --git a/libs/server/Resp/Objects/ListCommands.cs b/libs/server/Resp/Objects/ListCommands.cs index 1279189723f..6a49a463a43 100644 --- a/libs/server/Resp/Objects/ListCommands.cs +++ b/libs/server/Resp/Objects/ListCommands.cs @@ -61,6 +61,7 @@ private unsafe bool ListPush(int count, byte* ptr, ListOperation lop // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = lop; inputPtr->count = inputCount; inputPtr->done = listItemsDoneCount; @@ -157,6 +158,7 @@ private unsafe bool ListPop(int count, byte* ptr, ListOperation lop, // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = lop; inputPtr->done = 0; inputPtr->count = popCount; @@ -230,6 +232,7 @@ private unsafe bool ListLength(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LLEN; inputPtr->count = count; inputPtr->done = 0; @@ -307,6 +310,7 @@ private unsafe bool ListTrim(int count, byte* ptr, ref TGarnetApi st // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LTRIM; inputPtr->count = start; inputPtr->done = stop; @@ -375,6 +379,7 @@ private unsafe bool ListRange(int count, byte* ptr, ref TGarnetApi s var inputLength = (int)(recvBufferPtr + bytesRead - (byte*)inputPtr); inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LRANGE; inputPtr->count = start; inputPtr->done = end; @@ -449,6 +454,7 @@ private unsafe bool ListIndex(int count, byte* ptr, ref TGarnetApi s // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LINDEX; inputPtr->count = index; inputPtr->done = 0; @@ -524,6 +530,7 @@ private unsafe bool ListInsert(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LINSERT; inputPtr->done = 0; inputPtr->count = 0; @@ -610,6 +617,7 @@ private unsafe bool ListRemove(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LREM; inputPtr->count = nCount; inputPtr->done = 0; @@ -818,6 +826,7 @@ public unsafe bool ListSet(int count, byte* ptr, ref TGarnetApi stor // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.List; + inputPtr->header.flags = 0; inputPtr->header.ListOp = ListOperation.LSET; inputPtr->count = 0; inputPtr->done = 0; diff --git a/libs/server/Resp/Objects/SetCommands.cs b/libs/server/Resp/Objects/SetCommands.cs index e56cfe9976e..84ca427f560 100644 --- a/libs/server/Resp/Objects/SetCommands.cs +++ b/libs/server/Resp/Objects/SetCommands.cs @@ -67,6 +67,7 @@ private unsafe bool SetAdd(int count, byte* ptr, ref TGarnetApi stor // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SADD; inputPtr->count = inputCount; inputPtr->done = setOpsCount; @@ -249,6 +250,7 @@ private unsafe bool SetRemove(int count, byte* ptr, ref TGarnetApi s // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SREM; inputPtr->count = inputCount; inputPtr->done = setItemsDoneCount; @@ -329,6 +331,7 @@ private unsafe bool SetLength(int count, byte* ptr, ref TGarnetApi s // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SCARD; inputPtr->count = 1; inputPtr->done = 0; @@ -396,6 +399,7 @@ private unsafe bool SetMembers(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SMEMBERS; inputPtr->count = count; inputPtr->done = setItemsDoneCount; @@ -466,6 +470,7 @@ private unsafe bool SetIsMember(int count, byte* ptr, ref TGarnetApi // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SISMEMBER; inputPtr->count = count - 2; inputPtr->done = 0; @@ -543,6 +548,7 @@ private unsafe bool SetPop(int count, byte* ptr, ref TGarnetApi stor // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SPOP; inputPtr->count = int.MinValue; @@ -726,6 +732,7 @@ private unsafe bool SetRandomMember(int count, byte* ptr, ref TGarne // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.Set; + inputPtr->header.flags = 0; inputPtr->header.SetOp = SetOperation.SRANDMEMBER; inputPtr->count = Int32.MinValue; diff --git a/libs/server/Resp/Objects/SharedObjectCommands.cs b/libs/server/Resp/Objects/SharedObjectCommands.cs index 6a217b1b82d..f3cf4930492 100644 --- a/libs/server/Resp/Objects/SharedObjectCommands.cs +++ b/libs/server/Resp/Objects/SharedObjectCommands.cs @@ -68,6 +68,7 @@ private unsafe bool ObjectScan(int count, byte* ptr, GarnetObjectTyp // ObjectInputHeader (*(ObjectInputHeader*)(pcurr)).header.type = objectType; + (*(ObjectInputHeader*)(pcurr)).header.flags = 0; switch (objectType) { diff --git a/libs/server/Resp/Objects/SortedSetCommands.cs b/libs/server/Resp/Objects/SortedSetCommands.cs index 9a92c91bf4c..158ad751c87 100644 --- a/libs/server/Resp/Objects/SortedSetCommands.cs +++ b/libs/server/Resp/Objects/SortedSetCommands.cs @@ -74,6 +74,7 @@ private unsafe bool SortedSetAdd(int count, byte* ptr, ref TGarnetAp // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZADD; inputPtr->count = inputCount; inputPtr->done = zaddDoneCount; @@ -143,6 +144,7 @@ private unsafe bool SortedSetRemove(int count, byte* ptr, ref TGarne // Prepare header in input buffer rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZREM; rmwInput->count = inputCount; rmwInput->done = zaddDoneCount; @@ -226,6 +228,7 @@ private unsafe bool SortedSetLength(int count, byte* ptr, ref TGarne // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZCARD; inputPtr->count = 1; inputPtr->done = 0; @@ -312,6 +315,7 @@ private unsafe bool SortedSetRange(int count, byte* ptr, SortedSetOp // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = op; inputPtr->count = count - 1; inputPtr->done = 0; @@ -411,6 +415,7 @@ private unsafe bool SortedSetScore(int count, byte* ptr, ref TGarnet // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZSCORE; inputPtr->count = scoreKeySize; inputPtr->done = 0; @@ -485,6 +490,7 @@ private unsafe bool SortedSetScores(int count, byte* ptr, ref TGarne // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZMSCORE; inputPtr->count = inputCount; inputPtr->done = 0; @@ -572,6 +578,7 @@ private unsafe bool SortedSetPop(int count, byte* ptr, SortedSetOper // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = op; inputPtr->count = popCount; inputPtr->done = zaddDoneCount; @@ -649,6 +656,7 @@ private unsafe bool SortedSetCount(int count, byte* ptr, ref TGarnet // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZCOUNT; inputPtr->count = 0; inputPtr->done = 0; @@ -739,6 +747,7 @@ private unsafe bool SortedSetLengthByValue(int count, byte* ptr, Sor // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = op; inputPtr->count = 0; inputPtr->done = 0; @@ -823,6 +832,7 @@ private unsafe bool SortedSetIncrement(int count, byte* ptr, ref TGa // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZINCRBY; inputPtr->count = count - 1; inputPtr->done = 0; @@ -925,6 +935,7 @@ private unsafe bool SortedSetRank(int count, byte* ptr, SortedSetOpe // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = op; inputPtr->count = memberSize; inputPtr->done = 0; @@ -997,6 +1008,7 @@ private unsafe bool SortedSetRemoveRange(int count, byte* ptr, Sorte // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = op; inputPtr->count = 0; inputPtr->done = 0; @@ -1101,6 +1113,7 @@ private unsafe bool SortedSetRandomMember(int count, byte* ptr, ref // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.ZRANDMEMBER; inputPtr->count = count == 1 ? 1 : paramCount; inputPtr->done = withScoresSpan.SequenceEqual(includeWithScores) ? 1 : 0; diff --git a/libs/server/Resp/Objects/SortedSetGeoCommands.cs b/libs/server/Resp/Objects/SortedSetGeoCommands.cs index 2d9295688ad..9638eccfb2b 100644 --- a/libs/server/Resp/Objects/SortedSetGeoCommands.cs +++ b/libs/server/Resp/Objects/SortedSetGeoCommands.cs @@ -53,6 +53,7 @@ private unsafe bool GeoAdd(int count, byte* ptr, ref TGarnetApi stor // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = SortedSetOperation.GEOADD; inputPtr->count = inputCount; inputPtr->done = zaddDoneCount; @@ -154,6 +155,7 @@ private unsafe bool GeoCommands(int count, byte* ptr, SortedSetOpera // Prepare header in input buffer inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = op; inputPtr->count = inputCount; diff --git a/libs/server/Storage/Session/ObjectStore/HashOps.cs b/libs/server/Storage/Session/ObjectStore/HashOps.cs index b378f8fd3b4..486f30cfb68 100644 --- a/libs/server/Storage/Session/ObjectStore/HashOps.cs +++ b/libs/server/Storage/Session/ObjectStore/HashOps.cs @@ -41,6 +41,7 @@ public unsafe GarnetStatus HashSet(ArgSlice key, ArgSlice field, // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = nx ? HashOperation.HSETNX : HashOperation.HSET; rmwInput->count = 1; rmwInput->done = 0; @@ -73,6 +74,7 @@ public unsafe GarnetStatus HashSet(ArgSlice key, (ArgSlice field // Prepare header in buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = HashOperation.HSET; rmwInput->count = elements.Length; rmwInput->done = 0; @@ -127,6 +129,7 @@ public unsafe GarnetStatus HashDelete(ArgSlice key, ArgSlice[] f // Prepare header in buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = HashOperation.HDEL; rmwInput->count = fields.Length; rmwInput->done = 0; @@ -197,6 +200,7 @@ public unsafe GarnetStatus HashGet(ArgSlice key, ArgSlice[] fiel // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = fields == default ? HashOperation.HGETALL : HashOperation.HGET; rmwInput->count = fields == default ? 0 : fields.Length; rmwInput->done = 0; @@ -247,6 +251,7 @@ public unsafe GarnetStatus HashLength(ArgSlice key, out int item // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = HashOperation.HLEN; rmwInput->count = 1; rmwInput->done = 0; @@ -278,6 +283,7 @@ public unsafe GarnetStatus HashExists(ArgSlice key, ArgSlice fie // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = HashOperation.HEXISTS; rmwInput->count = 1; rmwInput->done = 0; @@ -308,6 +314,7 @@ public unsafe GarnetStatus HashRandomField(ArgSlice key, out Arg // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = HashOperation.HRANDFIELD; rmwInput->count = 2; rmwInput->done = 0; @@ -350,6 +357,7 @@ public unsafe GarnetStatus HashRandomField(ArgSlice key, int cou // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Hash; + rmwInput->header.flags = 0; rmwInput->header.HashOp = HashOperation.HRANDFIELD; rmwInput->count = 4; rmwInput->done = 0; @@ -417,6 +425,7 @@ public unsafe GarnetStatus HashScan(ArgSlice key, long cursor, s var inputSize = ObjectInputHeader.Size + sizeof(int); var rmwInput = scratchBufferManager.CreateArgSlice(inputSize).ptr; ((ObjectInputHeader*)rmwInput)->header.type = GarnetObjectType.Hash; + ((ObjectInputHeader*)rmwInput)->header.flags = 0; ((ObjectInputHeader*)rmwInput)->header.HashOp = HashOperation.HSCAN; // Number of tokens in the input after the header (match, value, count, value) diff --git a/libs/server/Storage/Session/ObjectStore/ListOps.cs b/libs/server/Storage/Session/ObjectStore/ListOps.cs index 1d551677b3b..5e5888e53e8 100644 --- a/libs/server/Storage/Session/ObjectStore/ListOps.cs +++ b/libs/server/Storage/Session/ObjectStore/ListOps.cs @@ -33,6 +33,7 @@ public unsafe GarnetStatus ListPush(ArgSlice key, ArgSlice[] ele // Prepare header in buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.List; + rmwInput->header.flags = 0; rmwInput->header.ListOp = lop; rmwInput->count = elements.Length; rmwInput->done = 0; @@ -75,6 +76,7 @@ public unsafe GarnetStatus ListPush(ArgSlice key, ArgSlice eleme // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.List; + rmwInput->header.flags = 0; rmwInput->header.ListOp = lop; rmwInput->count = 1; rmwInput->done = 0; @@ -126,6 +128,7 @@ public unsafe GarnetStatus ListPop(ArgSlice key, int count, List // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.List; + rmwInput->header.flags = 0; rmwInput->header.ListOp = lop; rmwInput->count = count; rmwInput->done = 0; @@ -163,6 +166,7 @@ public unsafe GarnetStatus ListLength(ArgSlice key, ref TObjectC // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.List; + rmwInput->header.flags = 0; rmwInput->header.ListOp = ListOperation.LLEN; rmwInput->count = count; rmwInput->done = 0; @@ -298,6 +302,7 @@ public unsafe bool ListTrim(ArgSlice key, int start, int stop, r // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.List; + rmwInput->header.flags = 0; rmwInput->header.ListOp = ListOperation.LTRIM; rmwInput->count = start; rmwInput->done = stop; diff --git a/libs/server/Storage/Session/ObjectStore/SetOps.cs b/libs/server/Storage/Session/ObjectStore/SetOps.cs index 2e4533747bd..03ad1f93f9f 100644 --- a/libs/server/Storage/Session/ObjectStore/SetOps.cs +++ b/libs/server/Storage/Session/ObjectStore/SetOps.cs @@ -36,6 +36,7 @@ internal unsafe GarnetStatus SetAdd(ArgSlice key, ArgSlice membe // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SADD; rmwInput->count = 1; rmwInput->done = 0; @@ -68,6 +69,7 @@ internal unsafe GarnetStatus SetAdd(ArgSlice key, ArgSlice[] mem // Prepare header in buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SADD; rmwInput->count = members.Length; rmwInput->done = 0; @@ -108,6 +110,7 @@ internal unsafe GarnetStatus SetRemove(ArgSlice key, ArgSlice me // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SREM; rmwInput->count = 1; rmwInput->done = 0; @@ -141,6 +144,7 @@ internal unsafe GarnetStatus SetRemove(ArgSlice key, ArgSlice[] // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SREM; rmwInput->count = members.Length; rmwInput->done = 0; @@ -180,6 +184,7 @@ internal unsafe GarnetStatus SetLength(ArgSlice key, out int cou // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SCARD; rmwInput->count = 1; rmwInput->done = 0; @@ -210,6 +215,7 @@ internal unsafe GarnetStatus SetMembers(ArgSlice key, out ArgSli // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SMEMBERS; rmwInput->count = 1; rmwInput->done = 0; @@ -266,6 +272,7 @@ internal unsafe GarnetStatus SetPop(ArgSlice key, int count, out // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.Set; + rmwInput->header.flags = 0; rmwInput->header.SetOp = SetOperation.SPOP; rmwInput->count = count; rmwInput->done = 0; @@ -308,6 +315,7 @@ public unsafe GarnetStatus SetScan(ArgSlice key, long cursor, st var inputSize = ObjectInputHeader.Size + sizeof(int); var rmwInput = scratchBufferManager.CreateArgSlice(inputSize).ptr; ((ObjectInputHeader*)rmwInput)->header.type = GarnetObjectType.Set; + ((ObjectInputHeader*)rmwInput)->header.flags = 0; ((ObjectInputHeader*)rmwInput)->header.SetOp = SetOperation.SSCAN; // Number of tokens in the input after the header (match, value, count, value) diff --git a/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs b/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs index ff5fe77e957..ed817c47015 100644 --- a/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs +++ b/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs @@ -37,6 +37,7 @@ public unsafe GarnetStatus SortedSetAdd(ArgSlice key, ArgSlice s // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZADD; rmwInput->count = 1; rmwInput->done = 0; @@ -68,6 +69,7 @@ public unsafe GarnetStatus SortedSetAdd(ArgSlice key, (ArgSlice // Prepare header in buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZADD; rmwInput->count = inputs.Length; rmwInput->done = 0; @@ -109,6 +111,7 @@ public unsafe GarnetStatus SortedSetRemove(byte[] key, ArgSlice // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)_inputSlice.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZREM; rmwInput->count = 1; rmwInput->done = 0; @@ -140,6 +143,7 @@ public unsafe GarnetStatus SortedSetRemove(byte[] key, ArgSlice[ // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZREM; rmwInput->count = members.Length; rmwInput->done = 0; @@ -190,6 +194,7 @@ public unsafe GarnetStatus SortedSetRemoveRangeByLex(ArgSlice ke // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)_inputSlice.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZREMRANGEBYLEX; rmwInput->count = 3; rmwInput->done = 0; @@ -234,6 +239,7 @@ public unsafe GarnetStatus SortedSetRemoveRangeByScore(ArgSlice // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)_inputSlice.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZREMRANGEBYSCORE; rmwInput->count = 3; rmwInput->done = 0; @@ -278,6 +284,7 @@ public unsafe GarnetStatus SortedSetRemoveRangeByRank(ArgSlice k // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)_inputSlice.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZREMRANGEBYRANK; rmwInput->count = 3; rmwInput->done = 0; @@ -312,6 +319,7 @@ public unsafe GarnetStatus SortedSetPop(ArgSlice key, int count, var inputPtr = (ObjectInputHeader*)input.ptr; inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; inputPtr->header.SortedSetOp = lowScoresFirst ? SortedSetOperation.ZPOPMIN : SortedSetOperation.ZPOPMAX; inputPtr->count = count; inputPtr->done = 0; @@ -357,6 +365,7 @@ public unsafe GarnetStatus SortedSetIncrement(ArgSlice key, doub // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)_inputSlice.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZINCRBY; rmwInput->count = 3; rmwInput->done = 0; @@ -400,6 +409,7 @@ public unsafe GarnetStatus SortedSetLength(ArgSlice key, out int // Prepare header in input buffer var rmwInput = (ObjectInputHeader*)input.ptr; rmwInput->header.type = GarnetObjectType.SortedSet; + rmwInput->header.flags = 0; rmwInput->header.SortedSetOp = SortedSetOperation.ZCARD; rmwInput->count = 1; rmwInput->done = 0; @@ -462,8 +472,9 @@ public unsafe GarnetStatus SortedSetRange(ArgSlice key, ArgSlice // Prepare header in input buffer var inputPtr = (ObjectInputHeader*)scratchBufferManager.CreateArgSlice(ObjectInputHeader.Size).ptr; - inputPtr->header.SortedSetOp = sortedOperation; inputPtr->header.type = GarnetObjectType.SortedSet; + inputPtr->header.flags = 0; + inputPtr->header.SortedSetOp = sortedOperation; inputPtr->count = 2 + (operation != default ? 1 : 0) + (sortedOperation != SortedSetOperation.ZREVRANGE && reverse ? 1 : 0) + (limit != default ? 3 : 0); inputPtr->done = 0; @@ -616,6 +627,7 @@ public unsafe GarnetStatus SortedSetScan(ArgSlice key, long curs var inputSize = ObjectInputHeader.Size + sizeof(int); var rmwInput = scratchBufferManager.CreateArgSlice(inputSize).ptr; ((ObjectInputHeader*)rmwInput)->header.type = GarnetObjectType.SortedSet; + ((ObjectInputHeader*)rmwInput)->header.flags = 0; ((ObjectInputHeader*)rmwInput)->header.SortedSetOp = SortedSetOperation.ZSCAN; // Number of tokens in the input after the header (match, value, count, value) diff --git a/test/Garnet.test/RespTests.cs b/test/Garnet.test/RespTests.cs index 09d97e53bc0..f777fd281de 100644 --- a/test/Garnet.test/RespTests.cs +++ b/test/Garnet.test/RespTests.cs @@ -1416,6 +1416,91 @@ public void KeyExpireOptionsTest(string command) Assert.IsTrue(time.Value.TotalMilliseconds <= (double)((int)args[1]) && time.Value.TotalMilliseconds > 0); } + [Test] + public async Task ReAddExpiredKey() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + const string key = "x:expire_trap"; + + // Set + { + db.KeyDelete(key); + db.SetAdd(key, "v1"); + + Assert.IsTrue(db.KeyExists(key), $"KeyExists after initial add"); + Assert.AreEqual("1", db.Execute("EXISTS", key).ToString(), "EXISTS after initial add"); + var actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after initial ADD"); + + db.KeyExpire(key, TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(2)); + + Assert.IsFalse(db.KeyExists(key), $"KeyExists after expiration"); + Assert.AreEqual("0", db.Execute("EXISTS", key).ToString(), "EXISTS after ADD expiration"); + actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(0, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after ADD expiration"); + + db.SetAdd(key, "v2"); + + Assert.IsTrue(db.KeyExists(key), $"KeyExists after initial re-ADD"); + Assert.AreEqual("1", db.Execute("EXISTS", key).ToString(), "EXISTS after initial re-ADD"); + actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after initial re-ADD"); + } + // List + { + db.KeyDelete(key); + db.ListRightPush(key, "v1"); + + Assert.IsTrue(db.KeyExists(key), $"KeyExists after initial RPUSH"); + Assert.AreEqual("1", db.Execute("EXISTS", key).ToString(), "EXISTS after initial RPUSH"); + var actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after initial RPUSH"); + + db.KeyExpire(key, TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(2)); + + Assert.IsFalse(db.KeyExists(key), $"KeyExists after expiration"); + Assert.AreEqual("0", db.Execute("EXISTS", key).ToString(), "EXISTS after RPUSH expiration"); + actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(0, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after RPUSH expiration"); + + db.ListRightPush(key, "v2"); + + Assert.IsTrue(db.KeyExists(key), $"KeyExists after initial re-RPUSH"); + Assert.AreEqual("1", db.Execute("EXISTS", key).ToString(), "EXISTS after initial re-RPUSH"); + actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after initial re-RPUSH"); + } + // Hash + { + db.KeyDelete(key); + db.HashSet(key, "f1", "v1"); + + Assert.IsTrue(db.KeyExists(key), $"KeyExists after initial HSET"); + Assert.AreEqual("1", db.Execute("EXISTS", key).ToString(), "EXISTS after initial HSET"); + var actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after initial HSET"); + + db.KeyExpire(key, TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(2)); + + Assert.IsFalse(db.KeyExists(key), $"KeyExists after expiration"); + Assert.AreEqual("0", db.Execute("EXISTS", key).ToString(), "EXISTS after HSET expiration"); + actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(0, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after HSET expiration"); + + db.HashSet(key, "f1", "v2"); + + Assert.IsTrue(db.KeyExists(key), $"KeyExists after initial re-HSET"); + Assert.AreEqual("1", db.Execute("EXISTS", key).ToString(), "EXISTS after initial re-HSET"); + actualScan = db.Execute("SCAN", "0"); + Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after initial re-HSET"); + } + } + [Test] public void GetSliceTest() { From 42ccdf31c797855af6e36fa2e057ef26e843792d Mon Sep 17 00:00:00 2001 From: priyanjaligupta <168761763+priyanjaligupta@users.noreply.github.com> Date: Mon, 6 May 2024 21:40:17 +0530 Subject: [PATCH 2/4] bug fix Delete keys locally if _copyOption is set to false. (#352) Co-authored-by: Arvind Chandavarapu Co-authored-by: vazois <96085550+vazois@users.noreply.github.com> --- libs/cluster/Server/Migration/MigrateSessionSlots.cs | 2 +- libs/cluster/Server/Migration/MigrationDriver.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libs/cluster/Server/Migration/MigrateSessionSlots.cs b/libs/cluster/Server/Migration/MigrateSessionSlots.cs index 1089a2de214..33020408a89 100644 --- a/libs/cluster/Server/Migration/MigrateSessionSlots.cs +++ b/libs/cluster/Server/Migration/MigrateSessionSlots.cs @@ -56,7 +56,7 @@ private bool MigrateSlotsDataDriver() /// public void DeleteKeysInSlot() { - if (!_copyOption) + if (_copyOption) return; ClusterManager.DeleteKeysInSlotsFromMainStore(localServerSession.BasicGarnetApi, _sslots); diff --git a/libs/cluster/Server/Migration/MigrationDriver.cs b/libs/cluster/Server/Migration/MigrationDriver.cs index e84e6e3afa5..a243a9611aa 100644 --- a/libs/cluster/Server/Migration/MigrationDriver.cs +++ b/libs/cluster/Server/Migration/MigrationDriver.cs @@ -29,8 +29,8 @@ public bool TryStartMigrationTask(out ReadOnlySpan errorMessage) return false; } - // Delete keys locally if option enabled - if (_copyOption) + // Delete keys locally if _copyOption is set to false. + if (!_copyOption) DeleteKeys(_keysWithSize); Status = MigrateState.SUCCESS; } From b8a26744446ab493352e058b997f2b27faf115ef Mon Sep 17 00:00:00 2001 From: vazois <96085550+vazois@users.noreply.github.com> Date: Mon, 6 May 2024 14:08:05 -0700 Subject: [PATCH 3/4] Cluster Mode SELECT Command (#362) * return error select not supported with cluster mode * remove second check for enable cluster --- libs/server/Resp/ArrayCommands.cs | 19 ++++++++++++++----- libs/server/Resp/CmdStrings.cs | 3 +++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/libs/server/Resp/ArrayCommands.cs b/libs/server/Resp/ArrayCommands.cs index 1e05e898549..d430740138e 100644 --- a/libs/server/Resp/ArrayCommands.cs +++ b/libs/server/Resp/ArrayCommands.cs @@ -668,15 +668,25 @@ private bool NetworkSELECT(byte* ptr) readHead = (int)(ptr - recvBufferPtr); - if (string.Equals(result, "0")) + if (storeWrapper.serverOptions.EnableCluster) { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + // Cluster mode does not allow DBID + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_CLUSTER_MODE, ref dcurr, dend)) SendAndReset(); } else { - while (!RespWriteUtils.WriteError("ERR invalid database index."u8, ref dcurr, dend)) - SendAndReset(); + + if (string.Equals(result, "0")) + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_INVALID_INDEX, ref dcurr, dend)) + SendAndReset(); + } } return true; } @@ -728,7 +738,6 @@ private bool NetworkKEYS(int count, byte* ptr, ref TGarnetApi storag return true; } - private bool NetworkSCAN(int count, byte* ptr, ref TGarnetApi storageApi) where TGarnetApi : IGarnetApi { diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index af3a386e044..9b3fd23c8b1 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -124,6 +124,9 @@ public static ReadOnlySpan GetConfig(ReadOnlySpan key) public static ReadOnlySpan RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8; public static ReadOnlySpan RESP_ERR_GENERIC_UKNOWN_SUBCOMMAND => "ERR Unknown subcommand. Try LATENCY HELP."u8; public static ReadOnlySpan RESP_ERR_GENERIC_INDEX_OUT_RANGE => "ERR index out of range"u8; + public static ReadOnlySpan RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8; + public static ReadOnlySpan RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8; + /// /// Response string templates /// From a6218f9041bc84b5b7da8199302a7d5372da468f Mon Sep 17 00:00:00 2001 From: vazois <96085550+vazois@users.noreply.github.com> Date: Mon, 6 May 2024 14:48:28 -0700 Subject: [PATCH 4/4] Garnet Epoch Protection Fix & Misc (#361) * fix wait for config transition * code cleanup * add max retry on replica attach to avoid getting stuck in prod environment * update waitcheckpoint util to reflect correct check against lastSaveTime * default AOF start to 64 --- libs/cluster/Server/ClusterProvider.cs | 2 +- .../Server/Replication/CheckpointEntry.cs | 2 +- .../PrimaryOps/ReplicaSyncSession.cs | 7 +++- libs/server/StoreWrapper.cs | 19 ++++++----- .../ClusterReplicationTests.cs | 33 ++++++++++++++----- test/Garnet.test.cluster/ClusterTestUtils.cs | 31 ++++++++++++++--- 6 files changed, 69 insertions(+), 25 deletions(-) diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index 4a1de339702..302d65c72aa 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -287,7 +287,7 @@ internal bool WaitForConfigTransition() foreach (var s in sessions) { var entryEpoch = s.LocalCurrentEpoch; - if (entryEpoch != 0 && entryEpoch >= currentEpoch) + if (entryEpoch != 0 && entryEpoch < currentEpoch) goto retry; } break; diff --git a/libs/cluster/Server/Replication/CheckpointEntry.cs b/libs/cluster/Server/Replication/CheckpointEntry.cs index b20a589a3e1..a775f284440 100644 --- a/libs/cluster/Server/Replication/CheckpointEntry.cs +++ b/libs/cluster/Server/Replication/CheckpointEntry.cs @@ -43,7 +43,7 @@ public CheckpointEntry() } public long GetMinAofCoveredAddress() - => Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress); + => Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64); /// /// Indicate addition of new reader by trying to increment reader counter diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs index 44e2e02f0c6..425cb3cb54a 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs @@ -47,6 +47,7 @@ public void Dispose() public async Task SendCheckpoint() { errorMsg = default; + var retryCount = 0; var storeCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main); var objectStoreCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object); var current = clusterProvider.clusterManager.CurrentConfig; @@ -96,6 +97,8 @@ public async Task SendCheckpoint() { localEntry.RemoveReader(); _ = Thread.Yield(); + if (retryCount++ > 10) + throw new GarnetException("Attaching replica maximum retry count reached!"); goto retry; } } @@ -110,6 +113,8 @@ public async Task SendCheckpoint() { localEntry.RemoveReader(); _ = Thread.Yield(); + if (retryCount++ > 10) + throw new GarnetException("Attaching replica maximum retry count reached!"); goto retry; } } @@ -187,7 +192,7 @@ public async Task SendCheckpoint() var beginAddress = RecoveredReplicationOffset; if (!recoverFromRemote) { - //If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset + // If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset if (replicaAofBeginAddress > ReplicationManager.kFirstValidAofAddress && replicaAofBeginAddress > RecoveredReplicationOffset) { logger?.LogInformation( diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index 7c7efba4c2d..6bf134f3609 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -220,7 +220,8 @@ public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recov { storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken); if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken); - lastSaveTime = DateTimeOffset.UtcNow; + if (storeVersion > 0 || objectStoreVersion > 0) + lastSaveTime = DateTimeOffset.UtcNow; } catch (Exception ex) { @@ -269,8 +270,8 @@ public long ReplayAOF(long untilAddress = -1) long replicationOffset = 0; try { - //When replaying AOF we do not want to write record again to AOF. - //So initialize local AofProcessor with recordToAof: false. + // When replaying AOF we do not want to write record again to AOF. + // So initialize local AofProcessor with recordToAof: false. var aofProcessor = new AofProcessor(this, recordToAof: false, logger); aofProcessor.Recover(untilAddress); aofProcessor.Dispose(); @@ -578,22 +579,22 @@ void CompleteCheckpoint() /// /// Take a checkpoint if no checkpoint was taken after the provided time offset /// - /// + /// /// - public async Task TakeOnDemandCheckpoint(DateTimeOffset afterTime) + public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime) { - //Take lock to ensure not other task will be taking a checkpoint + // Take lock to ensure no other task will be taking a checkpoint while (!StartCheckpoint()) await Task.Yield(); - //If an external task has taken a checkpoint after the provided afterTime return - if (this.lastSaveTime > afterTime) + // If an external task has taken a checkpoint beyond the provided entryTime return + if (this.lastSaveTime > entryTime) { CompleteCheckpoint(); return; } - //If no newer checkpoint was taken compared to the provided afterTime take a checkpoint + // Necessary to take a checkpoint because the latest checkpoint is before entryTime await CheckpointTask(StoreType.All, logger: logger); } diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs index 414434d4c93..eeed0c9378d 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs @@ -236,6 +236,9 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0); else context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, 0, addCount); + + var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger); context.clusterTestUtils.Checkpoint(0, logger: context.logger); // Populate Primary @@ -243,8 +246,8 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 1); context.clusterTestUtils.WaitForReplicaAofSync(0, 1, context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(0, context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(1, context.logger); + context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger); // Shutdown secondary context.nodes[1].Store.CommitAOF(true); @@ -599,9 +602,11 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values] if (checkpoint) { + var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger); context.clusterTestUtils.Checkpoint(0); - context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger); } #region InitiateFailover @@ -667,8 +672,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo if (takePrimaryCheckpoint) { + var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger); context.clusterTestUtils.Checkpoint(0, logger: context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger); } // Wait for replication offsets to synchronize @@ -692,8 +698,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo if (takeNewPrimaryCheckpoint) { + var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger); context.clusterTestUtils.Checkpoint(1, logger: context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(1, newPrimaryLastSaveTime, logger: context.logger); } context.clusterTestUtils.WaitForReplicaAofSync(1, 2, context.logger); @@ -912,11 +919,19 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp } else context.PopulatePrimaryWithObjects(ref context.kvPairsObj, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex, set: set); - if (ckptBeforeDivergence) context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger); + if (ckptBeforeDivergence) + { + var oldPrimaryLastSaveTime = context.clusterTestUtils.LastSave(oldPrimaryIndex, logger: context.logger); + var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(newPrimaryIndex, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger); + context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(oldPrimaryIndex, oldPrimaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(newPrimaryIndex, newPrimaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger); + } + context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, newPrimaryIndex, context.logger); context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(newPrimaryIndex, logger: context.logger); - context.clusterTestUtils.WaitFirstCheckpoint(replicaIndex, logger: context.logger); // Make this replica of no-one _ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger); diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index 1964a462ce7..c0531be2b5a 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -2647,16 +2647,39 @@ public void Checkpoint(IPEndPoint endPoint, ILogger logger = null) } } - public void WaitFirstCheckpoint(int nodeIndex, ILogger logger = null) - => WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], logger: logger); + public DateTime LastSave(int nodeIndex, ILogger logger = null) + => LastSave((IPEndPoint)endpoints[nodeIndex], logger: logger); - public void WaitCheckpoint(IPEndPoint endPoint, ILogger logger = null) + public DateTime LastSave(IPEndPoint endPoint, ILogger logger = null) { try { var server = redis.GetServer(endPoint); - while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks) + return server.LastSave(); + } + catch (Exception ex) + { + logger?.LogError(ex, "An error has occurred; WaitCheckpoint"); + Assert.Fail(); + } + return default; + } + + public void WaitCheckpoint(int nodeIndex, DateTime time, ILogger logger = null) + => WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], time: time, logger: logger); + + public void WaitCheckpoint(IPEndPoint endPoint, DateTime time, ILogger logger = null) + { + try + { + var server = redis.GetServer(endPoint); + while (true) + { + var lastSaveTime = server.LastSave(); + if (lastSaveTime >= time) + break; BackOff(); + } } catch (Exception ex) {