Skip to content

Commit

Permalink
feat: Add SMOVE Command (#249)
Browse files Browse the repository at this point in the history
* Added method to SetOperation enum, RespCommandsInfo dictionary, and RespInfo hash set.

* Added SMOVE to FastParseArrayCommand.

* Added skeleton code in all places where work is needed.

* Finish deciding implementation.  Storage wrapper designed.

* Added smove to server session.

* The command doesn't break the client anymore, but it doesn't perform any operation.

* SetMove now works.

* Added unit tests, working on last part in list.

* Finish implementation, remove attempted implementation in setobjectimpl.

* Remove from transaction manager.

* Style fix (Whitespace).

* SetOps fixes.

* Fix unit tests and returning notfound.

* Small changes.

* Remove using, remove set object.

* SetCommands xml summary.

* Minor indenting change.

* Add xml tag lost in merge.

* White space fix.

* Added missing bracket after merge.

* Locks.  Allow for set member of 0 length in SMOVE and SREM.

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
graknow and TalZaccai authored Apr 19, 2024
1 parent 9a3af00 commit 515b6c1
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 3 deletions.
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ public GarnetStatus SetScan(ArgSlice key, long cursor, string match, int count,
=> storageSession.SetScan(key, cursor, match, count, out items, ref objectContext);

/// <inheritdoc />
public GarnetStatus SetMove(ArgSlice sourceKey, ArgSlice destinationKey, ArgSlice member, out int smoveResult)
=> storageSession.SetMove(sourceKey, destinationKey, member, out smoveResult);

public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
=> storageSession.SetUnion(keys, out output, ref objectContext);

Expand All @@ -315,6 +318,7 @@ public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members)
/// <inheritdoc />
public GarnetStatus SetDiffStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetDiffStore(key, keys, out count);

#endregion

#region Hash Methods
Expand Down
12 changes: 12 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,18 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SetPop(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// Moves a member from a source set to a destination set.
/// If the move was performed, this command returns 1.
/// If the member was not found in the source set, or if no operation was performed, this command returns 0.
/// </summary>
/// <param name="sourceKey"></param>
/// <param name="destinationKey"></param>
/// <param name="member"></param>
/// <param name="smoveResult"></param>
/// <returns></returns>
GarnetStatus SetMove(ArgSlice sourceKey, ArgSlice destinationKey, ArgSlice member, out int smoveResult);

/// <summary>
/// When called with just the key argument, return a random element from the set value stored at key.
/// If the provided count argument is positive, return an array of distinct elements.
Expand Down
1 change: 1 addition & 0 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum SetOperation : byte
SMEMBERS,
SCARD,
SSCAN,
SMOVE,
SRANDMEMBER,
SISMEMBER,
SUNION,
Expand Down
65 changes: 65 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,71 @@ private unsafe bool SetPop<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
return true;
}

/// <summary>
/// Moves a member from a source set to a destination set.
/// If the move was performed, this command returns 1.
/// If the member was not found in the source set, or if no operation was performed, this command returns 0.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private unsafe bool SetMove<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count != 3)
{
setItemsDoneCount = setOpsCount = 0;
return AbortWithWrongNumberOfArguments("SMOVE", count);
}

ArgSlice sourceKey = default;
ArgSlice destinationKey = default;
ArgSlice sourceMember = default;

// Get the source key
if (!RespReadUtils.ReadPtrWithLengthHeader(ref sourceKey.ptr, ref sourceKey.length, ref ptr, recvBufferPtr + bytesRead))
return false;

// Get the destination key
if (!RespReadUtils.ReadPtrWithLengthHeader(ref destinationKey.ptr, ref destinationKey.length, ref ptr, recvBufferPtr + bytesRead))
return false;

// Get the member to move
if (!RespReadUtils.ReadPtrWithLengthHeader(ref sourceMember.ptr, ref sourceMember.length, ref ptr, recvBufferPtr + bytesRead))
return false;

var keys = new ArgSlice[2] { sourceKey, destinationKey };

if (NetworkKeyArraySlotVerify(ref keys, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetMove(sourceKey, destinationKey, sourceMember, out var output);

if (status == GarnetStatus.NOTFOUND)
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_0, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteInteger(output, ref dcurr, dend))
SendAndReset();
}

// Reset session counters
setItemsDoneCount = setOpsCount = 0;

// Move input head
readHead = (int)(ptr - recvBufferPtr);
return true;
}

/// <summary>
/// When called with just the key argument, return a random element from the set value stored at key.
/// If the provided count argument is positive, return an array of distinct elements.
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SSCAN);
}
else if (*(ulong*)(ptr + 3) == MemoryMarshal.Read<ulong>("\nSMOVE\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SMOVE);
}
else if (*(ulong*)(ptr + 3) == MemoryMarshal.Read<ulong>("\nSDIFF\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SDIFF);
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespCommandsInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public static RespCommandsInfo findCommand(RespCommand cmd, byte subCmd = 0)
{(byte)SetOperation.SRANDMEMBER,new RespCommandsInfo("SRANDMEMBER", RespCommand.Set, -2, null, (byte)SetOperation.SRANDMEMBER)},
{(byte)SetOperation.SPOP, new RespCommandsInfo("SPOP", RespCommand.Set, -1, null, (byte)SetOperation.SPOP) },
{(byte)SetOperation.SSCAN, new RespCommandsInfo("SSCAN", RespCommand.Set, -2, null, (byte)SetOperation.SSCAN) },
{(byte)SetOperation.SMOVE, new RespCommandsInfo("SMOVE", RespCommand.Set, 3, null, (byte)SetOperation.SMOVE) },
{(byte)SetOperation.SISMEMBER, new RespCommandsInfo("SISMEMBER",RespCommand.Set, 2, null, (byte)SetOperation.SISMEMBER) },
{(byte)SetOperation.SUNION, new RespCommandsInfo("SUNION", RespCommand.Set, -1, null, (byte)SetOperation.SUNION) },
{(byte)SetOperation.SDIFF, new RespCommandsInfo("SDIFF", RespCommand.Set, -1, null, (byte)SetOperation.SDIFF) },
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static HashSet<string> GetCommands()
// Pub/sub
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE",
// Set
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION", "SDIFF", "SDIFFSTORE",
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION", "SDIFF", "SDIFFSTORE", "SMOVE",
//Scan ops
"DBSIZE", "KEYS","SCAN",
// Geospatial commands
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SPOP) => SetPop(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SRANDMEMBER) => SetRandomMember(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SSCAN) => ObjectScan(count, ptr, GarnetObjectType.Set, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SMOVE) => SetMove(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SUNION) => SetUnion(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFF) => SetDiff(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFFSTORE) => SetDiffStore(count, ptr, ref storageApi),
Expand Down
62 changes: 60 additions & 2 deletions libs/server/Storage/Session/ObjectStore/SetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ internal unsafe GarnetStatus SetRemove<TObjectContext>(ArgSlice key, ArgSlice me
{
sremCount = 0;

if (key.Length == 0 || member.Length == 0)
if (key.Length == 0)
return GarnetStatus.OK;

var input = scratchBufferManager.FormatScratchAsResp(ObjectInputHeader.Size, member);
Expand Down Expand Up @@ -363,6 +363,65 @@ public unsafe GarnetStatus SetScan<TObjectContext>(ArgSlice key, long cursor, st

}

/// <summary>
/// Moves a member from a source set to a destination set.
/// If the move was performed, this command returns 1.
/// If the member was not found in the source set, or if no operation was performed, this command returns 0.
/// </summary>
/// <param name="sourceKey"></param>
/// <param name="destinationKey"></param>
/// <param name="member"></param>
/// <param name="smoveResult"></param>
internal unsafe GarnetStatus SetMove(ArgSlice sourceKey, ArgSlice destinationKey, ArgSlice member, out int smoveResult)
{
smoveResult = 0;

if (sourceKey.Length == 0 || destinationKey.Length == 0)
return GarnetStatus.OK;

// If the keys are the same, no operation is performed.
var sameKey = sourceKey.ReadOnlySpan.SequenceEqual(destinationKey.ReadOnlySpan);
if (sameKey)
{
return GarnetStatus.OK;
}

bool createTransaction = false;
if (txnManager.state != TxnState.Running)
{
createTransaction = true;
txnManager.SaveKeyEntryToLock(sourceKey, true, LockType.Exclusive);
txnManager.SaveKeyEntryToLock(destinationKey, true, LockType.Exclusive);
txnManager.Run(true);
}

var objectLockableContext = txnManager.ObjectStoreLockableContext;

try
{
var sremStatus = SetRemove(sourceKey, member, out var sremOps, ref objectLockableContext);

if (sremStatus == GarnetStatus.NOTFOUND)
{
return GarnetStatus.NOTFOUND;
}

if (sremOps != 1)
{
return GarnetStatus.OK;
}

SetAdd(destinationKey, member, out smoveResult, ref objectLockableContext);
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}

return GarnetStatus.OK;
}

/// <summary>
/// Returns the members of the set resulting from the union of all the given sets.
/// Keys that do not exist are considered to be empty sets.
Expand Down Expand Up @@ -414,7 +473,6 @@ public GarnetStatus SetUnion<TObjectContext>(ArgSlice[] keys, out HashSet<byte[]
return GarnetStatus.OK;
}


/// <summary>
/// Adds the specified members to the set at key.
/// Specified members that are already a member of this set are ignored.
Expand Down
1 change: 1 addition & 0 deletions libs/server/Transaction/TxnKeyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ private int SetObjectKeys(byte subCommand, int inputCount)
(byte)SetOperation.SUNION => ListKeys(inputCount, true, LockType.Shared),
(byte)SetOperation.SDIFF => ListKeys(inputCount, true, LockType.Shared),
(byte)SetOperation.SDIFFSTORE => XSTOREKeys(inputCount, true),
(byte)SetOperation.SMOVE => ListKeys(inputCount, true, LockType.Exclusive),
_ => -1
};
}
Expand Down
111 changes: 111 additions & 0 deletions test/Garnet.test/RespSetTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Garnet.server;
using NUnit.Framework;
using StackExchange.Redis;
Expand Down Expand Up @@ -675,6 +676,116 @@ public void CanDoSPOPWithMoreCountThanSetSizeCommandLC()
Assert.AreEqual(expectedResponse, strResponse);
}

[Test]
public void CanDoSMOVECommandLC()
{
using var lightClientRequest = TestUtils.CreateRequest();

// source set
lightClientRequest.SendCommand("SADD \"mySourceSet\" \"oneS\"");
lightClientRequest.SendCommand("SADD \"mySourceSet\" \"twoS\"");
lightClientRequest.SendCommand("SADD \"mySourceSet\" \"threeS\"");
lightClientRequest.SendCommand("SADD \"mySourceSet\" \"fourS\"");
lightClientRequest.SendCommand("SADD \"mySourceSet\" \"common\"");

// destination set
lightClientRequest.SendCommand("SADD \"myDestinationSet\" \"oneD\"");
lightClientRequest.SendCommand("SADD \"myDestinationSet\" \"twoD\"");
lightClientRequest.SendCommand("SADD \"myDestinationSet\" \"threeD\"");
lightClientRequest.SendCommand("SADD \"myDestinationSet\" \"fourD\"");
lightClientRequest.SendCommand("SADD \"myDestinationSet\" \"common\"");

var expectedSuccessfulResponse = ":1\r\n";
var expectedFailureResponse = ":0\r\n";

// Successful move
var response = lightClientRequest.SendCommand("SMOVE \"mySourceSet\" \"myDestinationSet\" \"oneS\"");
var strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedSuccessfulResponse.Length);
Assert.AreEqual(expectedSuccessfulResponse, strResponse);

response = lightClientRequest.SendCommand("SISMEMBER \"mySourceSet\" \"oneS\"");
var mySourceSetContainsMember = Encoding.ASCII.GetString(response).Substring(0, expectedFailureResponse.Length);

response = lightClientRequest.SendCommand("SISMEMBER \"myDestinationSet\" \"oneS\"");
var myDestinationSetContainsMember = Encoding.ASCII.GetString(response).Substring(0, expectedSuccessfulResponse.Length);

Assert.AreEqual(expectedFailureResponse, mySourceSetContainsMember);
Assert.AreEqual(expectedSuccessfulResponse, myDestinationSetContainsMember);

// Source set doesn't exist
response = lightClientRequest.SendCommand("SMOVE \"someRandomSet\" \"mySourceSet\" \"twoS\"");
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedFailureResponse.Length);
Assert.AreEqual(expectedFailureResponse, strResponse);

// Destination set doesn't exist
response = lightClientRequest.SendCommand("SMOVE \"mySourceSet\" \"someRandomSet\" \"twoS\"");
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedSuccessfulResponse.Length);
Assert.AreEqual(expectedSuccessfulResponse, strResponse);

// Value not in source
response = lightClientRequest.SendCommand("SMOVE \"mySourceSet\" \"mySourceSet\" \"notAValue\"");
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedFailureResponse.Length);
Assert.AreEqual(expectedFailureResponse, strResponse);

// Move into self
response = lightClientRequest.SendCommand("SMOVE \"mySourceSet\" \"mySourceSet\" \"twoS\"");
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedFailureResponse.Length);
Assert.AreEqual(expectedFailureResponse, strResponse);

// Common value
response = lightClientRequest.SendCommand("SMOVE \"mySourceSet\" \"myDestinationSet\" \"common\"");
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedSuccessfulResponse.Length);
Assert.AreEqual(expectedSuccessfulResponse, strResponse);

response = lightClientRequest.SendCommand("SISMEMBER \"mySourceSet\" \"common\"");
mySourceSetContainsMember = Encoding.ASCII.GetString(response).Substring(0, expectedFailureResponse.Length);

response = lightClientRequest.SendCommand("SISMEMBER \"myDestinationSet\" \"common\"");
myDestinationSetContainsMember = Encoding.ASCII.GetString(response).Substring(0, expectedSuccessfulResponse.Length);

Assert.AreEqual(expectedFailureResponse, mySourceSetContainsMember);
Assert.AreEqual(expectedSuccessfulResponse, myDestinationSetContainsMember);
}

[Test]
public async Task CanDoSMOVECommandGC()
{
using var db = TestUtils.GetGarnetClient();
db.Connect();

//If set doesn't exist, then return 0.
var response = await db.ExecuteForLongResultAsync("SMOVE", new string[] { "sourceSet", "destinationSet", "value" });
Assert.AreEqual(response, 0);
await db.ExecuteForStringResultAsync("SADD", new string[] { "sourceSet", "sourceValue", "commonValue" });
await db.ExecuteForStringResultAsync("SADD", new string[] { "destinationSet", "destinationValue", "commonValue" });

//Same key.
response = await db.ExecuteForLongResultAsync("SMOVE", new string[] { "sourceSet", "sourceSet", "sourceValue" });
Assert.AreEqual(response, 0);

//Move non-common member.
response = await db.ExecuteForLongResultAsync("SMOVE", new string[] { "sourceSet", "destinationSet", "sourceValue" });
Assert.AreEqual(response, 1);
Assert.AreEqual(await db.ExecuteForLongResultAsync("SCARD", new string[] { "sourceSet" }), 1);
Assert.AreEqual(await db.ExecuteForLongResultAsync("SCARD", new string[] { "destinationSet" }), 3);

var sourceSetMembers = await db.ExecuteForStringArrayResultAsync("SMEMBERS", new string[] { "sourceSet" });
var destinationSetMembers = await db.ExecuteForStringArrayResultAsync("SMEMBERS", new string[] { "destinationSet" });
Assert.IsFalse(sourceSetMembers.Contains("sourceValue"));
Assert.IsTrue(destinationSetMembers.Contains("sourceValue"));

//Move common member.
response = await db.ExecuteForLongResultAsync("SMOVE", new string[] { "sourceSet", "destinationSet", "commonValue" });
Assert.AreEqual(response, 1);
Assert.AreEqual(await db.ExecuteForLongResultAsync("SCARD", new string[] { "sourceSet" }), 0);
Assert.AreEqual(await db.ExecuteForLongResultAsync("SCARD", new string[] { "destinationSet" }), 3);

sourceSetMembers = await db.ExecuteForStringArrayResultAsync("SMEMBERS", new string[] { "sourceSet" });
destinationSetMembers = await db.ExecuteForStringArrayResultAsync("SMEMBERS", new string[] { "destinationSet" });
Assert.IsFalse(sourceSetMembers.Contains("commonValue"));
Assert.IsTrue(destinationSetMembers.Contains("commonValue"));
}

[Test]
public void MultiWithNonExistingSet()
{
Expand Down

0 comments on commit 515b6c1

Please sign in to comment.