Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ObjectStoreOutputFlags to GarnetObjectStoreOutput #923

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ private bool MigrateKeysFromObjectStore()
continue;
}

if (!ClusterSession.Expired(ref value.garnetObject))
if (!ClusterSession.Expired(ref value.GarnetObject))
{
var objectData = GarnetObjectSerializer.Serialize(value.garnetObject);
var objectData = GarnetObjectSerializer.Serialize(value.GarnetObject);

if (!WriteOrSendObjectStoreKeyValuePair(key, objectData, value.garnetObject.Expiration))
if (!WriteOrSendObjectStoreKeyValuePair(key, objectData, value.GarnetObject.Expiration))
return false;
}
}
Expand Down
12 changes: 6 additions & 6 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ static unsafe void ObjectStoreUpsert(BasicContext<byte[], IGarnetObject, ObjectI
ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var valB = garnetObjectSerializer.Deserialize(value.ToByteArray());

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
var output = new GarnetObjectStoreOutput { SpanByteAndMemory = new(outputPtr, outputLength) };
basicContext.Upsert(ref keyB, ref valB);
if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
if (!output.SpanByteAndMemory.IsSpanByte)
output.SpanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator> basicContext,
Expand All @@ -364,12 +364,12 @@ static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, ObjectInpu
objectStoreInput.DeserializeFrom(curr);

// Call RMW with the reconstructed key & ObjectInput
var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
var output = new GarnetObjectStoreOutput { SpanByteAndMemory = new(outputPtr, outputLength) };
if (basicContext.RMW(ref keyB, ref objectStoreInput, ref output).IsPending)
basicContext.CompletePending(true);

if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
if (!output.SpanByteAndMemory.IsSpanByte)
output.SpanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreDelete(BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator> basicContext, byte* ptr)
Expand Down
16 changes: 8 additions & 8 deletions libs/server/Custom/CustomObjectBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,32 @@ public sealed override void DoSerialize(BinaryWriter writer)
public abstract override void Dispose();

/// <inheritdoc />
public sealed override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
public sealed override unsafe bool Operate(ref ObjectInput input, ref GarnetObjectStoreOutput output)
{
sizeChange = 0;
removeKey = false;
output.SizeChange = 0;

switch (input.header.cmd)
{
// Scan Command
case RespCommand.COSCAN:
if (ObjectUtils.ReadScanInput(ref input, ref output, out var cursorInput, out var pattern,
out var patternLength, out var limitCount, out bool _, out var error))
if (ObjectUtils.ReadScanInput(ref input, ref output.SpanByteAndMemory, out var cursorInput, out var pattern,
out var patternLength, out var limitCount, out _, out var error))
{
Scan(cursorInput, out var items, out var cursorOutput, count: limitCount, pattern: pattern,
patternLength: patternLength);
ObjectUtils.WriteScanOutput(items, cursorOutput, ref output);
ObjectUtils.WriteScanOutput(items, cursorOutput, ref output.SpanByteAndMemory);
}
else
{
ObjectUtils.WriteScanError(error, ref output);
ObjectUtils.WriteScanError(error, ref output.SpanByteAndMemory);
}
break;
default:
if ((byte)input.header.type != this.type)
{
// Indicates an incorrect type of key
output.Length = 0;
output.OutputFlags |= ObjectStoreOutputFlags.WrongType;
output.SpanByteAndMemory.Length = 0;
return true;
}
break;
Expand Down
32 changes: 16 additions & 16 deletions libs/server/Custom/CustomRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ private bool TryCustomObjectCommand<TGarnetApi>(GarnetObjectType objType, byte s
var header = new RespInputHeader(objType) { SubId = subid };
var input = new ObjectInput(header, ref parseState, startIdx: 1);

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
var output = new GarnetObjectStoreOutput { SpanByteAndMemory = new SpanByteAndMemory(null) };

GarnetStatus status;

if (type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_ObjectStore(ref keyBytes, ref input, ref output);
Debug.Assert(!output.spanByteAndMemory.IsSpanByte);
Debug.Assert(!output.SpanByteAndMemory.IsSpanByte);

switch (status)
{
Expand All @@ -158,8 +158,8 @@ private bool TryCustomObjectCommand<TGarnetApi>(GarnetObjectType objType, byte s
SendAndReset();
break;
default:
if (output.spanByteAndMemory.Memory != null)
SendAndReset(output.spanByteAndMemory.Memory, output.spanByteAndMemory.Length);
if (output.SpanByteAndMemory.Memory != null)
SendAndReset(output.SpanByteAndMemory.Memory, output.SpanByteAndMemory.Length);
else
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
Expand All @@ -169,19 +169,19 @@ private bool TryCustomObjectCommand<TGarnetApi>(GarnetObjectType objType, byte s
else
{
status = storageApi.Read_ObjectStore(ref keyBytes, ref input, ref output);
Debug.Assert(!output.spanByteAndMemory.IsSpanByte);
Debug.Assert(!output.SpanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.OK:
if (output.spanByteAndMemory.Memory != null)
SendAndReset(output.spanByteAndMemory.Memory, output.spanByteAndMemory.Length);
if (output.SpanByteAndMemory.Memory != null)
SendAndReset(output.SpanByteAndMemory.Memory, output.SpanByteAndMemory.Length);
else
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.NOTFOUND:
Debug.Assert(output.spanByteAndMemory.Memory == null);
Debug.Assert(output.SpanByteAndMemory.Memory == null);
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
Expand Down Expand Up @@ -295,21 +295,21 @@ public bool InvokeCustomObjectCommand<TGarnetApi>(ref TGarnetApi storageApi, Cus
customCommandParseState.InitializeWithArguments(args);
var input = new ObjectInput(header, ref customCommandParseState);

var _output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
var _output = new GarnetObjectStoreOutput { SpanByteAndMemory = new SpanByteAndMemory(null) };
GarnetStatus status;
if (customObjCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);
Debug.Assert(!_output.SpanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
default:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
if (_output.SpanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.SpanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
Expand All @@ -318,18 +318,18 @@ public bool InvokeCustomObjectCommand<TGarnetApi>(ref TGarnetApi storageApi, Cus
else
{
status = storageApi.Read_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);
Debug.Assert(!_output.SpanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.OK:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
if (_output.SpanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.SpanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
case GarnetStatus.NOTFOUND:
Debug.Assert(_output.spanByteAndMemory.Memory == null);
Debug.Assert(_output.SpanByteAndMemory.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
break;
case GarnetStatus.WRONGTYPE:
Expand Down
62 changes: 32 additions & 30 deletions libs/server/Objects/Hash/HashObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,101 +155,103 @@ public override void Dispose() { }
public override GarnetObjectBase Clone() => new HashObject(hash, expirationTimes, expirationQueue, Expiration, Size);

/// <inheritdoc />
public override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
public override unsafe bool Operate(ref ObjectInput input, ref GarnetObjectStoreOutput output)
{
removeKey = false;
output.SizeChange = 0;

fixed (byte* _output = output.SpanByte.AsSpan())
fixed (byte* outputSpan = output.SpanByteAndMemory.SpanByte.AsSpan())
{
if (input.header.type != GarnetObjectType.Hash)
{
//Indicates when there is an incorrect type
output.Length = 0;
sizeChange = 0;
output.OutputFlags |= ObjectStoreOutputFlags.WrongType;
output.SpanByteAndMemory.Length = 0;
return true;
}

var previousSize = this.Size;
switch (input.header.HashOp)
{
case HashOperation.HSET:
HashSet(ref input, _output);
HashSet(ref input, outputSpan);
break;
case HashOperation.HMSET:
HashSet(ref input, _output);
HashSet(ref input, outputSpan);
break;
case HashOperation.HGET:
HashGet(ref input, ref output);
HashGet(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HMGET:
HashMultipleGet(ref input, ref output);
HashMultipleGet(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HGETALL:
HashGetAll(ref input, ref output);
HashGetAll(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HDEL:
HashDelete(ref input, _output);
HashDelete(ref input, outputSpan);
break;
case HashOperation.HLEN:
HashLength(_output);
HashLength(outputSpan);
break;
case HashOperation.HSTRLEN:
HashStrLength(ref input, _output);
HashStrLength(ref input, outputSpan);
break;
case HashOperation.HEXISTS:
HashExists(ref input, _output);
HashExists(ref input, outputSpan);
break;
case HashOperation.HEXPIRE:
HashExpire(ref input, ref output);
HashExpire(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HTTL:
HashTimeToLive(ref input, ref output);
HashTimeToLive(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HPERSIST:
HashPersist(ref input, ref output);
HashPersist(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HKEYS:
HashGetKeysOrValues(ref input, ref output);
HashGetKeysOrValues(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HVALS:
HashGetKeysOrValues(ref input, ref output);
HashGetKeysOrValues(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HINCRBY:
HashIncrement(ref input, ref output);
HashIncrement(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HINCRBYFLOAT:
HashIncrement(ref input, ref output);
HashIncrement(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HSETNX:
HashSet(ref input, _output);
HashSet(ref input, outputSpan);
break;
case HashOperation.HRANDFIELD:
HashRandomField(ref input, ref output);
HashRandomField(ref input, ref output.SpanByteAndMemory);
break;
case HashOperation.HCOLLECT:
HashCollect(ref input, _output);
HashCollect(ref input, outputSpan);
break;
case HashOperation.HSCAN:
if (ObjectUtils.ReadScanInput(ref input, ref output, out var cursorInput, out var pattern,
out var patternLength, out var limitCount, out bool isNoValue, out var error))
if (ObjectUtils.ReadScanInput(ref input, ref output.SpanByteAndMemory, out var cursorInput, out var pattern,
out var patternLength, out var limitCount, out var isNoValue, out var error))
{
Scan(cursorInput, out var items, out var cursorOutput, count: limitCount, pattern: pattern,
patternLength: patternLength, isNoValue);
ObjectUtils.WriteScanOutput(items, cursorOutput, ref output);
ObjectUtils.WriteScanOutput(items, cursorOutput, ref output.SpanByteAndMemory);
}
else
{
ObjectUtils.WriteScanError(error, ref output);
ObjectUtils.WriteScanError(error, ref output.SpanByteAndMemory);
}
break;
default:
throw new GarnetException($"Unsupported operation {input.header.HashOp} in HashObject.Operate");
}

sizeChange = this.Size - previousSize;
output.SizeChange = this.Size - previousSize;
}

removeKey = hash.Count == 0;
if (hash.Count == 0)
output.OutputFlags |= ObjectStoreOutputFlags.RemoveKey;

return true;
}

Expand Down
4 changes: 2 additions & 2 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,12 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
{
arrDstKey = dstKey.ToArray();
var dstStatusOp = storageSession.GET(arrDstKey, out var osDstObject, ref objectLockableContext);
if (dstStatusOp != GarnetStatus.NOTFOUND) dstObj = osDstObject.garnetObject;
if (dstStatusOp != GarnetStatus.NOTFOUND) dstObj = osDstObject.GarnetObject;
}

// Check for type match between the observer and the actual object type
// If types match, get next item based on item type
switch (osObject.garnetObject)
switch (osObject.GarnetObject)
{
case ListObject listObj:
currCount = listObj.LnkList.Count;
Expand Down
Loading
Loading