Skip to content

Commit

Permalink
API Coverage - Implement SINTER and SINTERSTORE (#180) (#334)
Browse files Browse the repository at this point in the history
* API Coverage - Implement SINTER and SINTERSTORE (#180)

* Fix forrmating

* Changes after review

* fix formating

---------

Co-authored-by: Karol Łapiński <[email protected]>
Co-authored-by: Karol <lapkarol.gmail.com>
  • Loading branch information
lapkarol and Karol Łapiński authored May 20, 2024
1 parent 8ef45f9 commit b1891d1
Show file tree
Hide file tree
Showing 14 changed files with 590 additions and 2 deletions.
8 changes: 8 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members)
public GarnetStatus SetDiffStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetDiffStore(key, keys, out count);

/// <inheritdoc />
public GarnetStatus SetIntersect(ArgSlice[] keys, out HashSet<byte[]> output)
=> storageSession.SetIntersect(keys, out output);

/// <inheritdoc />
public GarnetStatus SetIntersectStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetIntersectStore(key, keys, out count);

#endregion

#region Hash Methods
Expand Down
9 changes: 9 additions & 0 deletions libs/server/API/GarnetWatchApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
return garnetApi.SetUnion(keys, out output);
}

/// <inheritdoc />
public GarnetStatus SetIntersect(ArgSlice[] keys, out HashSet<byte[]> output)
{
foreach (var key in keys)
{
garnetApi.WATCH(key, StoreType.Object);
}
return garnetApi.SetIntersect(keys, out output);
}

/// <inheritdoc />
public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> output)
Expand Down
19 changes: 19 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SetUnionStore(byte[] key, ArgSlice[] keys, out int count);

/// <summary>
/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <param name="key"></param>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <returns></returns>
GarnetStatus SetIntersectStore(byte[] key, ArgSlice[] keys, out int count);

/// <summary>
/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
Expand Down Expand Up @@ -1248,6 +1258,15 @@ public interface IGarnetReadApi
/// <returns></returns>
GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output);

/// <summary>
/// Returns the members of the set resulting from the intersection of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="keys"></param>
/// <param name="output"></param>
/// <returns></returns>
GarnetStatus SetIntersect(ArgSlice[] keys, out HashSet<byte[]> output);

/// <summary>
/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public enum SetOperation : byte
SUNIONSTORE,
SDIFF,
SDIFFSTORE,
SINTER,
SINTERSTORE
}


Expand Down
123 changes: 123 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,129 @@ private unsafe bool SetAdd<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
return true;
}

/// <summary>
/// Returns the members of the set resulting from the intersection of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <typeparam name="TGarnetApi"></typeparam>
/// <returns></returns>
private bool SetIntersect<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 1)
{
return AbortWithWrongNumberOfArguments("SINTER", count);
}

// Read all the keys
ArgSlice[] keys = new ArgSlice[count];

for (int i = 0; i < keys.Length; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetIntersect(keys, out var result);

if (status == GarnetStatus.OK)
{
// write the size of result
int resultCount = 0;
if (result != null)
{
resultCount = result.Count;
while (!RespWriteUtils.WriteArrayLength(resultCount, ref dcurr, dend))
SendAndReset();

foreach (var item in result)
{
while (!RespWriteUtils.WriteBulkString(item, ref dcurr, dend))
SendAndReset();
}
}
else
{
while (!RespWriteUtils.WriteArrayLength(resultCount, ref dcurr, dend))
SendAndReset();
}
}

// update read pointers
readHead = (int)(ptr - recvBufferPtr);
return true;
}

/// <summary>
/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private bool SetIntersectStore<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 2)
{
return AbortWithWrongNumberOfArguments("SINTERSTORE", count);
}

// Get the key
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;

if (NetworkSingleKeySlotVerify(key, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}

var keys = new ArgSlice[count - 1];
for (var i = 0; i < count - 1; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetIntersectStore(key, keys, out var output);

if (status == GarnetStatus.OK)
{
while (!RespWriteUtils.WriteInteger(output, ref dcurr, dend))
SendAndReset();
}

// Move input head
readHead = (int)(ptr - recvBufferPtr);

return true;
}


/// <summary>
/// Returns the members of the set resulting from the union of all the given sets.
/// Keys that do not exist are considered to be empty sets.
Expand Down
8 changes: 8 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SUNION);
}
else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read<ulong>("SINTER\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SINTER);
}
break;

case 'U':
Expand Down Expand Up @@ -915,6 +919,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SUNIONSTORE);
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nSINTE"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("RSTORE\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SINTERSTORE);
}
break;

case 12:
Expand Down
76 changes: 76 additions & 0 deletions libs/server/Resp/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -3887,6 +3887,38 @@
],
"SubCommands": null
},
{
"Command": "Set",
"ArrayCommand": 13,
"Name": "SINTER",
"IsInternal": false,
"Arity": -2,
"Flags": "ReadOnly",
"FirstKey": 1,
"LastKey": -1,
"Step": 1,
"AclCategories": "Read, Set, Slow",
"Tips": [
"nondeterministic_output_order"
],
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -1,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO, Access"
}
],
"SubCommands": null
},
{
"Command": "Set",
"ArrayCommand": 10,
Expand Down Expand Up @@ -3931,6 +3963,50 @@
],
"SubCommands": null
},
{
"Command": "Set",
"ArrayCommand": 14,
"Name": "SINTERSTORE",
"IsInternal": false,
"Arity": -3,
"Flags": "DenyOom, Write",
"FirstKey": 1,
"LastKey": -1,
"Step": 1,
"AclCategories": "Set, Slow, Write",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "OW, Update"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -1,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO, Access"
}
],
"SubCommands": null
},
{
"Command": "TIME",
"ArrayCommand": null,
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SUNIONSTORE) => SetUnionStore(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFF) => SetDiff(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFFSTORE) => SetDiffStore(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SINTER) => SetIntersect(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SINTERSTORE) => SetIntersectStore(count, ptr, ref storageApi),
_ => ProcessOtherCommands(cmd, subcmd, count, ref storageApi),
};
return success;
Expand Down
Loading

0 comments on commit b1891d1

Please sign in to comment.