From b8a26744446ab493352e058b997f2b27faf115ef Mon Sep 17 00:00:00 2001
From: vazois <96085550+vazois@users.noreply.github.com>
Date: Mon, 6 May 2024 14:08:05 -0700
Subject: [PATCH 1/3] Cluster Mode SELECT Command (#362)

* return error select not supported with cluster mode
* remove second check for enable cluster
---
 libs/server/Resp/ArrayCommands.cs | 19 ++++++++++++++-----
 libs/server/Resp/CmdStrings.cs    |  3 +++
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/libs/server/Resp/ArrayCommands.cs b/libs/server/Resp/ArrayCommands.cs
index 1e05e89854..d430740138 100644
--- a/libs/server/Resp/ArrayCommands.cs
+++ b/libs/server/Resp/ArrayCommands.cs
@@ -668,15 +668,25 @@ private bool NetworkSELECT(byte* ptr)
 
             readHead = (int)(ptr - recvBufferPtr);
 
-            if (string.Equals(result, "0"))
+            if (storeWrapper.serverOptions.EnableCluster)
             {
-                while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
+                // Cluster mode does not allow DBID
+                while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_CLUSTER_MODE, ref dcurr, dend))
                     SendAndReset();
             }
             else
             {
-                while (!RespWriteUtils.WriteError("ERR invalid database index."u8, ref dcurr, dend))
-                    SendAndReset();
+
+                if (string.Equals(result, "0"))
+                {
+                    while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
+                        SendAndReset();
+                }
+                else
+                {
+                    while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SELECT_INVALID_INDEX, ref dcurr, dend))
+                        SendAndReset();
+                }
             }
             return true;
         }
@@ -728,7 +738,6 @@ private bool NetworkKEYS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
             return true;
         }
 
-
         private bool NetworkSCAN<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
             where TGarnetApi : IGarnetApi
         {
diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs
index af3a386e04..9b3fd23c8b 100644
--- a/libs/server/Resp/CmdStrings.cs
+++ b/libs/server/Resp/CmdStrings.cs
@@ -124,6 +124,9 @@ public static ReadOnlySpan<byte> GetConfig(ReadOnlySpan<byte> key)
         public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
         public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UKNOWN_SUBCOMMAND => "ERR Unknown subcommand. Try LATENCY HELP."u8;
         public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INDEX_OUT_RANGE => "ERR index out of range"u8;
+        public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8;
+        public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8;
+
         /// <summary>
         /// Response string templates
         /// </summary>

From a6218f9041bc84b5b7da8199302a7d5372da468f Mon Sep 17 00:00:00 2001
From: vazois <96085550+vazois@users.noreply.github.com>
Date: Mon, 6 May 2024 14:48:28 -0700
Subject: [PATCH 2/3] Garnet Epoch Protection Fix & Misc (#361)

* fix wait for config transition
* code cleanup
* add max retry on replica attach to avoid getting stuck in prod environment
* update waitcheckpoint util to reflect correct check against lastSaveTime
* default AOF start to 64
---
 libs/cluster/Server/ClusterProvider.cs        |  2 +-
 .../Server/Replication/CheckpointEntry.cs     |  2 +-
 .../PrimaryOps/ReplicaSyncSession.cs          |  7 +++-
 libs/server/StoreWrapper.cs                   | 19 ++++++-----
 .../ClusterReplicationTests.cs                | 33 ++++++++++++++-----
 test/Garnet.test.cluster/ClusterTestUtils.cs  | 31 ++++++++++++++---
 6 files changed, 69 insertions(+), 25 deletions(-)

diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs
index 4a1de33970..302d65c72a 100644
--- a/libs/cluster/Server/ClusterProvider.cs
+++ b/libs/cluster/Server/ClusterProvider.cs
@@ -287,7 +287,7 @@ internal bool WaitForConfigTransition()
                 foreach (var s in sessions)
                 {
                     var entryEpoch = s.LocalCurrentEpoch;
-                    if (entryEpoch != 0 && entryEpoch >= currentEpoch)
+                    if (entryEpoch != 0 && entryEpoch < currentEpoch)
                         goto retry;
                 }
                 break;
diff --git a/libs/cluster/Server/Replication/CheckpointEntry.cs b/libs/cluster/Server/Replication/CheckpointEntry.cs
index b20a589a3e..a775f28444 100644
--- a/libs/cluster/Server/Replication/CheckpointEntry.cs
+++ b/libs/cluster/Server/Replication/CheckpointEntry.cs
@@ -43,7 +43,7 @@ public CheckpointEntry()
         }
 
         public long GetMinAofCoveredAddress()
-            => Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress);
+            => Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);
 
         /// <summary>
         /// Indicate addition of new reader by trying to increment reader counter
diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
index 44e2e02f0c..425cb3cb54 100644
--- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
+++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
@@ -47,6 +47,7 @@ public void Dispose()
         public async Task SendCheckpoint()
         {
             errorMsg = default;
+            var retryCount = 0;
             var storeCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main);
             var objectStoreCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object);
             var current = clusterProvider.clusterManager.CurrentConfig;
@@ -96,6 +97,8 @@ public async Task SendCheckpoint()
                     {
                         localEntry.RemoveReader();
                         _ = Thread.Yield();
+                        if (retryCount++ > 10)
+                            throw new GarnetException("Attaching replica maximum retry count reached!");
                         goto retry;
                     }
                 }
@@ -110,6 +113,8 @@ public async Task SendCheckpoint()
                     {
                         localEntry.RemoveReader();
                         _ = Thread.Yield();
+                        if (retryCount++ > 10)
+                            throw new GarnetException("Attaching replica maximum retry count reached!");
                         goto retry;
                     }
                 }
@@ -187,7 +192,7 @@ public async Task SendCheckpoint()
                 var beginAddress = RecoveredReplicationOffset;
                 if (!recoverFromRemote)
                 {
-                    //If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
+                    // If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
                     if (replicaAofBeginAddress > ReplicationManager.kFirstValidAofAddress && replicaAofBeginAddress > RecoveredReplicationOffset)
                     {
                         logger?.LogInformation(
diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs
index 7c7efba4c2..6bf134f360 100644
--- a/libs/server/StoreWrapper.cs
+++ b/libs/server/StoreWrapper.cs
@@ -220,7 +220,8 @@ public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recov
             {
                 storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken);
                 if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken);
-                lastSaveTime = DateTimeOffset.UtcNow;
+                if (storeVersion > 0 || objectStoreVersion > 0)
+                    lastSaveTime = DateTimeOffset.UtcNow;
             }
             catch (Exception ex)
             {
@@ -269,8 +270,8 @@ public long ReplayAOF(long untilAddress = -1)
             long replicationOffset = 0;
             try
             {
-                //When replaying AOF we do not want to write record again to AOF.
-                //So initialize local AofProcessor with recordToAof: false.
+                // When replaying AOF we do not want to write record again to AOF.
+                // So initialize local AofProcessor with recordToAof: false.
                 var aofProcessor = new AofProcessor(this, recordToAof: false, logger);
                 aofProcessor.Recover(untilAddress);
                 aofProcessor.Dispose();
@@ -578,22 +579,22 @@ void CompleteCheckpoint()
         /// <summary>
         /// Take a checkpoint if no checkpoint was taken after the provided time offset
         /// </summary>
-        /// <param name="afterTime"></param>
+        /// <param name="entryTime"></param>
         /// <returns></returns>
-        public async Task TakeOnDemandCheckpoint(DateTimeOffset afterTime)
+        public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
         {
-            //Take lock to ensure not other task will be taking a checkpoint
+            // Take lock to ensure no other task will be taking a checkpoint
             while (!StartCheckpoint())
                 await Task.Yield();
 
-            //If an external task has taken a checkpoint after the provided afterTime return
-            if (this.lastSaveTime > afterTime)
+            // If an external task has taken a checkpoint beyond the provided entryTime return
+            if (this.lastSaveTime > entryTime)
             {
                 CompleteCheckpoint();
                 return;
             }
 
-            //If no newer checkpoint was taken compared to the provided afterTime take a checkpoint
+            // Necessary to take a checkpoint because the latest checkpoint is before entryTime
             await CheckpointTask(StoreType.All, logger: logger);
         }
 
diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs
index 414434d4c9..eeed0c9378 100644
--- a/test/Garnet.test.cluster/ClusterReplicationTests.cs
+++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs
@@ -236,6 +236,9 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
                 context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0);
             else
                 context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, 0, addCount);
+
+            var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
+            var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
             context.clusterTestUtils.Checkpoint(0, logger: context.logger);
 
             // Populate Primary
             context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 1);
             context.clusterTestUtils.WaitForReplicaAofSync(0, 1, context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(0, context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(1, context.logger);
+            context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
+            context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);
 
             // Shutdown secondary
             context.nodes[1].Store.CommitAOF(true);
@@ -599,9 +602,11 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values]
 
             if (checkpoint)
             {
+                var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
+                var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
                 context.clusterTestUtils.Checkpoint(0);
-                context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
-                context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);
             }
 
             #region InitiateFailover
@@ -667,8 +672,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo
 
             if (takePrimaryCheckpoint)
             {
+                var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
                 context.clusterTestUtils.Checkpoint(0, logger: context.logger);
-                context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
             }
 
             // Wait for replication offsets to synchronize
@@ -692,8 +698,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo
 
             if (takeNewPrimaryCheckpoint)
             {
+                var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
                 context.clusterTestUtils.Checkpoint(1, logger: context.logger);
-                context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(1, newPrimaryLastSaveTime, logger: context.logger);
             }
 
             context.clusterTestUtils.WaitForReplicaAofSync(1, 2, context.logger);
@@ -912,11 +919,19 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
             }
             else
                 context.PopulatePrimaryWithObjects(ref context.kvPairsObj, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex, set: set);
 
-            if (ckptBeforeDivergence) context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
+            if (ckptBeforeDivergence)
+            {
+                var oldPrimaryLastSaveTime = context.clusterTestUtils.LastSave(oldPrimaryIndex, logger: context.logger);
+                var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(newPrimaryIndex, logger: context.logger);
+                var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger);
+                context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(oldPrimaryIndex, oldPrimaryLastSaveTime, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(newPrimaryIndex, newPrimaryLastSaveTime, logger: context.logger);
+                context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger);
+            }
+
             context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, newPrimaryIndex, context.logger);
             context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(newPrimaryIndex, logger: context.logger);
-            context.clusterTestUtils.WaitFirstCheckpoint(replicaIndex, logger: context.logger);
 
             // Make this replica of no-one
             _ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger);
diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs
index 1964a462ce..c0531be2b5 100644
--- a/test/Garnet.test.cluster/ClusterTestUtils.cs
+++ b/test/Garnet.test.cluster/ClusterTestUtils.cs
@@ -2647,16 +2647,39 @@ public void Checkpoint(IPEndPoint endPoint, ILogger logger = null)
             }
         }
 
-        public void WaitFirstCheckpoint(int nodeIndex, ILogger logger = null)
-            => WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], logger: logger);
+        public DateTime LastSave(int nodeIndex, ILogger logger = null)
+            => LastSave((IPEndPoint)endpoints[nodeIndex], logger: logger);
 
-        public void WaitCheckpoint(IPEndPoint endPoint, ILogger logger = null)
+        public DateTime LastSave(IPEndPoint endPoint, ILogger logger = null)
         {
             try
             {
                 var server = redis.GetServer(endPoint);
-                while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks)
+                return server.LastSave();
+            }
+            catch (Exception ex)
+            {
+                logger?.LogError(ex, "An error has occurred; WaitCheckpoint");
+                Assert.Fail();
+            }
+            return default;
+        }
+
+        public void WaitCheckpoint(int nodeIndex, DateTime time, ILogger logger = null)
+            => WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], time: time, logger: logger);
+
+        public void WaitCheckpoint(IPEndPoint endPoint, DateTime time, ILogger logger = null)
+        {
+            try
+            {
+                var server = redis.GetServer(endPoint);
+                while (true)
+                {
+                    var lastSaveTime = server.LastSave();
+                    if (lastSaveTime >= time)
+                        break;
                     BackOff();
+                }
             }
             catch (Exception ex)
             {

From 5cce676bc6550a427fde584bef2aa492b34186ec Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Tue, 7 May 2024 17:56:56 -0700
Subject: [PATCH 3/3] GenericAllocator flush cleanup (#365)

---
 .../cs/src/core/Allocator/GenericAllocator.cs | 353 ++++++++++--------
 1 file changed, 194 insertions(+), 159 deletions(-)

diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs
index b96081735d..497e6f2fbe 100644
--- a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs
@@ -420,204 +420,239 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
             // This is the in-memory buffer page to be written
             var src = values[flushPage % BufferSize];
 
-            // Temporary storage to hold the image "template" we'll write to disk: It will have RecordInfos and object pointers that will be overwritten by addresses
-            // when writing to the main log (both object pointers and addresses are 8 bytes).
-            var buffer = bufferPool.Get((int)numBytesToWrite);
-
-            if (aligned_start < start && (KeyHasObjects() || ValueHasObjects()))
+            // We create a shadow copy of the page if we are under epoch protection.
+            // This copy ensures that object references are kept valid even if the original page is reclaimed.
+            // We suspend epoch during the actual flush as that can take a long time.
+            bool epochProtected = false;
+            if (epoch.ThisInstanceProtected())
+            {
+                epochProtected = true;
+                src = new Record<Key, Value>[values[flushPage % BufferSize].Length];
+                Array.Copy(values[flushPage % BufferSize], src, values[flushPage % BufferSize].Length);
+                epoch.Suspend();
+            }
+            try
             {
-                // Do not read back the invalid header of page 0
-                if ((flushPage > 0) || (start > GetFirstValidLogicalAddress(flushPage)))
+                // Temporary storage to hold the image "template" we'll write to disk: It will have RecordInfos and object pointers that will be overwritten by addresses
+                // when writing to the main log (both object pointers and addresses are 8 bytes).
+                var buffer = bufferPool.Get((int)numBytesToWrite);
+
+                if (aligned_start < start && (KeyHasObjects() || ValueHasObjects()))
                 {
-                    // Get the overlapping HLOG from disk as we wrote it with object pointers previously. This avoids object reserialization
-                    PageAsyncReadResult<Empty> result = new()
+                    // Do not read back the invalid header of page 0
+                    if ((flushPage > 0) || (start > GetFirstValidLogicalAddress(flushPage)))
                     {
-                        handle = new CountdownEvent(1)
-                    };
-                    device.ReadAsync(alignedDestinationAddress + (ulong)aligned_start, (IntPtr)buffer.aligned_pointer + aligned_start,
-                        (uint)sectorSize, AsyncReadPageCallback, result);
-                    result.handle.Wait();
-                }
-                fixed (RecordInfo* pin = &src[0].info)
-                {
-                    // Write all the RecordInfos on one operation. This also includes object pointers, but for valid records we will overwrite those below.
-                    Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)Unsafe.AsPointer(ref buffer.buffer[0]) + buffer.buffer.Length);
+                        // Get the overlapping HLOG from disk as we wrote it with object pointers previously. This avoids object reserialization
+                        PageAsyncReadResult<Empty> result = new()
+                        {
+                            handle = new CountdownEvent(1)
+                        };
+                        device.ReadAsync(alignedDestinationAddress + (ulong)aligned_start, (IntPtr)buffer.aligned_pointer + aligned_start,
+                            (uint)sectorSize, AsyncReadPageCallback, result);
+                        result.handle.Wait();
+                    }
+                    fixed (RecordInfo* pin = &src[0].info)
+                    {
+                        // Write all the RecordInfos on one operation. This also includes object pointers, but for valid records we will overwrite those below.
+                        Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)Unsafe.AsPointer(ref buffer.buffer[0]) + buffer.buffer.Length);
 
-                    Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.aligned_pointer + start,
-                        numBytesToWrite - start, numBytesToWrite - start);
+                        Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.aligned_pointer + start,
+                            numBytesToWrite - start, numBytesToWrite - start);
+                    }
                 }
-            }
-            else
-            {
-                fixed (RecordInfo* pin = &src[0].info)
+                else
                 {
-                    // Write all the RecordInfos on one operation. This also includes object pointers, but for valid records we will overwrite those below.
-                    Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)Unsafe.AsPointer(ref buffer.buffer[0]) + buffer.buffer.Length);
+                    fixed (RecordInfo* pin = &src[0].info)
+                    {
+                        // Write all the RecordInfos on one operation. This also includes object pointers, but for valid records we will overwrite those below.
+                        Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)Unsafe.AsPointer(ref buffer.buffer[0]) + buffer.buffer.Length);
 
-                    Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.aligned_pointer + aligned_start,
-                        numBytesToWrite - aligned_start, numBytesToWrite - aligned_start);
+                        Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.aligned_pointer + aligned_start,
+                            numBytesToWrite - aligned_start, numBytesToWrite - aligned_start);
+                    }
                 }
-            }
 
-            // In the main log, we write addresses to pages in the object log. This array saves the addresses of the key and/or value fields in 'buffer',
-            // which again is the image we're building from the 'values' "page" for this write. The "addresses into 'buffer'" are cast below to AddressInfo
-            // structures and stored in the sequence we'll write them: alternating series of key then value if both are object types, else keys or values only.
-            List<long> addr = new List<long>();
-            asyncResult.freeBuffer1 = buffer;
+                // In the main log, we write addresses to pages in the object log. This array saves the addresses of the key and/or value fields in 'buffer',
+                // which again is the image we're building from the 'values' "page" for this write. The "addresses into 'buffer'" are cast below to AddressInfo
+                // structures and stored in the sequence we'll write them: alternating series of key then value if both are object types, else keys or values only.
+                List<long> addr = new List<long>();
+                asyncResult.freeBuffer1 = buffer;
 
-            // Object keys and values are serialized into this MemoryStream.
-            MemoryStream ms = new();
-            IObjectSerializer<Key> keySerializer = null;
-            IObjectSerializer<Value> valueSerializer = null;
+                // Object keys and values are serialized into this MemoryStream.
+                MemoryStream ms = new();
+                IObjectSerializer<Key> keySerializer = null;
+                IObjectSerializer<Value> valueSerializer = null;
 
-            if (KeyHasObjects())
-            {
-                keySerializer = SerializerSettings.keySerializer();
-                keySerializer.BeginSerialize(ms);
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer = SerializerSettings.valueSerializer();
-                valueSerializer.BeginSerialize(ms);
-            }
+                if (KeyHasObjects())
+                {
+                    keySerializer = SerializerSettings.keySerializer();
+                    keySerializer.BeginSerialize(ms);
+                }
+                if (ValueHasObjects())
+                {
+                    valueSerializer = SerializerSettings.valueSerializer();
+                    valueSerializer.BeginSerialize(ms);
+                }
 
-            // Track the size to be written to the object log.
-            long endPosition = 0;
+                // Track the size to be written to the object log.
+                long endPosition = 0;
 
-            for (int i = start / RecordSize; i < end / RecordSize; i++)
-            {
-                if (!src[i].info.Invalid)
+                for (int i = start / RecordSize; i < end / RecordSize; i++)
                 {
-                    // Calculate the logical address of the 'values' page currently being written.
-                    var address = (flushPage << LogPageSizeBits) + i * RecordSize;
+                    byte* recordPtr = buffer.aligned_pointer + i * RecordSize;
 
-                    // Do not write v+1 records (e.g. during a checkpoint)
-                    if (address < fuzzyStartLogicalAddress || !src[i].info.IsInNewVersion)
-                    {
-                        if (KeyHasObjects())
-                        {
-                            long pos = ms.Position;
-                            keySerializer.Serialize(ref src[i].key);
-
-                            // Store the key address into the 'buffer' AddressInfo image as an offset into 'ms'.
-                            var key_address = GetKeyAddressInfo((long)(buffer.aligned_pointer + i * RecordSize));
-                            key_address->Address = pos;
-                            key_address->Size = (int)(ms.Position - pos);
-                            addr.Add((long)key_address);
-                            endPosition = pos + key_address->Size;
-                        }
+                    // Retrieve reference to record struct
+                    ref var record = ref Unsafe.AsRef<Record<Key, Value>>(recordPtr);
+                    AddressInfo* key_address = null, value_address = null;
 
-                        if (ValueHasObjects() && !src[i].info.Tombstone)
-                        {
-                            long pos = ms.Position;
-                            valueSerializer.Serialize(ref src[i].value);
-
-                            // Store the value address into the 'buffer' AddressInfo image as an offset into 'ms'.
-                            var value_address = GetValueAddressInfo((long)(buffer.aligned_pointer + i * RecordSize));
-                            value_address->Address = pos;
-                            value_address->Size = (int)(ms.Position - pos);
-                            addr.Add((long)value_address);
-                            endPosition = pos + value_address->Size;
-                        }
-                    }
-                    else
+                    // Zero out object reference addresses (AddressInfo) in the planned disk image
+                    if (KeyHasObjects())
                     {
-                        // Mark v+1 records as invalid to avoid deserializing them on recovery
-                        ref var record = ref Unsafe.AsRef<Record<Key, Value>>(buffer.aligned_pointer + i * RecordSize);
-                        record.info.SetInvalid();
+                        key_address = GetKeyAddressInfo((long)recordPtr);
+                        *key_address = default;
                     }
-                }
-
-                // If this record's serialized size surpassed ObjectBlockSize or it's the last record to be written, write to the object log.
-                if (endPosition > ObjectBlockSize || i == (end / RecordSize) - 1)
-                {
-                    var memoryStreamActualLength = ms.Position;
-                    var memoryStreamTotalLength = (int)endPosition;
-                    endPosition = 0;
-
-                    if (KeyHasObjects())
-                        keySerializer.EndSerialize();
                     if (ValueHasObjects())
-                        valueSerializer.EndSerialize();
-                    ms.Close();
-
-                    // Get the total serialized length rounded up to sectorSize
-                    var _alignedLength = (memoryStreamTotalLength + (sectorSize - 1)) & ~(sectorSize - 1);
-
-                    // Reserve the current address in the object log segment offsets for this chunk's write operation.
-                    var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;
+                    {
+                        value_address = GetValueAddressInfo((long)recordPtr);
+                        *value_address = default;
+                    }
 
-                    // Allocate the object-log buffer to build the image we'll write to disk, then copy to it from the memory stream.
-                    SectorAlignedMemory _objBuffer = null;
-                    if (memoryStreamTotalLength > 0)
+                    // Now fill in AddressInfo data for the valid records
+                    if (!record.info.Invalid)
                     {
-                        _objBuffer = bufferPool.Get(memoryStreamTotalLength);
+                        // Calculate the logical address of the 'values' page currently being written.
+                        var address = (flushPage << LogPageSizeBits) + i * RecordSize;
 
-                        fixed (void* src_ = ms.GetBuffer())
-                            Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamTotalLength, memoryStreamActualLength);
-                    }
+                        // Do not write v+1 records (e.g. during a checkpoint)
+                        if (address < fuzzyStartLogicalAddress || !record.info.IsInNewVersion)
+                        {
+                            if (KeyHasObjects())
+                            {
+                                long pos = ms.Position;
+                                keySerializer.Serialize(ref src[i].key);
+
+                                // Store the key address into the 'buffer' AddressInfo image as an offset into 'ms'.
+                                key_address->Address = pos;
+                                key_address->Size = (int)(ms.Position - pos);
+                                addr.Add((long)key_address);
+                                endPosition = pos + key_address->Size;
+                            }
 
-                    // Each address we calculated above is now an offset to objAddr; convert to the actual address.
-                    foreach (var address in addr)
-                        ((AddressInfo*)address)->Address += _objAddr;
+                            if (ValueHasObjects() && !record.info.Tombstone)
+                            {
+                                long pos = ms.Position;
+                                valueSerializer.Serialize(ref src[i].value);
+
+                                // Store the value address into the 'buffer' AddressInfo image as an offset into 'ms'.
+                                value_address->Address = pos;
+                                value_address->Size = (int)(ms.Position - pos);
+                                addr.Add((long)value_address);
+                                endPosition = pos + value_address->Size;
+                            }
+                        }
+                        else
+                        {
+                            // Mark v+1 records as invalid to avoid deserializing them on recovery
+                            record.info.SetInvalid();
+                        }
+                    }
 
-                    // If we have not written all records, prepare for the next chunk of records to be written.
-                    if (i < (end / RecordSize) - 1)
+                    // If this record's serialized size surpassed ObjectBlockSize or it's the last record to be written, write to the object log.
+                    if (endPosition > ObjectBlockSize || i == (end / RecordSize) - 1)
                     {
-                        // Create a new MemoryStream for the next chunk of records to be written.
-                        ms = new MemoryStream();
+                        var memoryStreamActualLength = ms.Position;
+                        var memoryStreamTotalLength = (int)endPosition;
+                        endPosition = 0;
+
                         if (KeyHasObjects())
-                            keySerializer.BeginSerialize(ms);
+                            keySerializer.EndSerialize();
                         if (ValueHasObjects())
-                            valueSerializer.BeginSerialize(ms);
-
-                        // Reset address list for the next chunk of records to be written.
-                        addr = new List<long>();
-
-                        // Write this chunk of records to the object log device.
-                        asyncResult.done = new AutoResetEvent(false);
-                        Debug.Assert(memoryStreamTotalLength > 0);
-                        objlogDevice.WriteAsync(
-                            (IntPtr)_objBuffer.aligned_pointer,
-                            (int)(alignedDestinationAddress >> LogSegmentSizeBits),
-                            (ulong)_objAddr, (uint)_alignedLength, AsyncFlushPartialObjectLogCallback, asyncResult);
-
-                        // Wait for write to complete before resuming next write
-                        asyncResult.done.WaitOne();
-                        _objBuffer.Return();
-                    }
-                    else
-                    {
-                        // We have written all records in this 'values' "page".
+                            valueSerializer.EndSerialize();
+                        ms.Close();
+
+                        // Get the total serialized length rounded up to sectorSize
+                        var _alignedLength = (memoryStreamTotalLength + (sectorSize - 1)) & ~(sectorSize - 1);
+
+                        // Reserve the current address in the object log segment offsets for this chunk's write operation.
+                        var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;
+
+                        // Allocate the object-log buffer to build the image we'll write to disk, then copy to it from the memory stream.
+                        SectorAlignedMemory _objBuffer = null;
                         if (memoryStreamTotalLength > 0)
                         {
-                            // Increment the count because we need to write both page and object cache.
-                            Interlocked.Increment(ref asyncResult.count);
+                            _objBuffer = bufferPool.Get(memoryStreamTotalLength);
+
+                            fixed (void* src_ = ms.GetBuffer())
+                                Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamTotalLength, memoryStreamActualLength);
+                        }
+
+                        // Each address we calculated above is now an offset to objAddr; convert to the actual address.
+                        foreach (var address in addr)
+                            ((AddressInfo*)address)->Address += _objAddr;
 
-                            asyncResult.freeBuffer2 = _objBuffer;
+                        // If we have not written all records, prepare for the next chunk of records to be written.
+                        if (i < (end / RecordSize) - 1)
+                        {
+                            // Create a new MemoryStream for the next chunk of records to be written.
+                            ms = new MemoryStream();
+                            if (KeyHasObjects())
+                                keySerializer.BeginSerialize(ms);
+                            if (ValueHasObjects())
+                                valueSerializer.BeginSerialize(ms);
+
+                            // Reset address list for the next chunk of records to be written.
+                            addr = new List<long>();
+
+                            // Write this chunk of records to the object log device.
+                            asyncResult.done = new AutoResetEvent(false);
+                            Debug.Assert(memoryStreamTotalLength > 0);
                             objlogDevice.WriteAsync(
                                 (IntPtr)_objBuffer.aligned_pointer,
                                 (int)(alignedDestinationAddress >> LogSegmentSizeBits),
-                                (ulong)_objAddr, (uint)_alignedLength, callback, asyncResult);
+                                (ulong)_objAddr, (uint)_alignedLength, AsyncFlushPartialObjectLogCallback, asyncResult);
+
+                            // Wait for write to complete before resuming next write
+                            asyncResult.done.WaitOne();
+                            _objBuffer.Return();
+                        }
+                        else
+                        {
+                            // We have written all records in this 'values' "page".
+                            if (memoryStreamTotalLength > 0)
+                            {
+                                // Increment the count because we need to write both page and object cache.
+                                Interlocked.Increment(ref asyncResult.count);
+
+                                asyncResult.freeBuffer2 = _objBuffer;
+                                objlogDevice.WriteAsync(
+                                    (IntPtr)_objBuffer.aligned_pointer,
+                                    (int)(alignedDestinationAddress >> LogSegmentSizeBits),
+                                    (ulong)_objAddr, (uint)_alignedLength, callback, asyncResult);
+                            }
                         }
                     }
                 }
-            }
 
-            if (asyncResult.partial)
-            {
-                // We're writing only a subset of the page, so update our count of bytes to write.
-                var aligned_end = (int)(asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits));
-                aligned_end = (aligned_end + (sectorSize - 1)) & ~(sectorSize - 1);
-                numBytesToWrite = (uint)(aligned_end - aligned_start);
-            }
+                if (asyncResult.partial)
+                {
+                    // We're writing only a subset of the page, so update our count of bytes to write.
+                    var aligned_end = (int)(asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits));
+                    aligned_end = (aligned_end + (sectorSize - 1)) & ~(sectorSize - 1);
+                    numBytesToWrite = (uint)(aligned_end - aligned_start);
+                }
 
-            // Round up the number of byte to write to sector alignment.
-            var alignedNumBytesToWrite = (uint)((numBytesToWrite + (sectorSize - 1)) & ~(sectorSize - 1));
+                // Round up the number of bytes to write to sector alignment.
+                var alignedNumBytesToWrite = (uint)((numBytesToWrite + (sectorSize - 1)) & ~(sectorSize - 1));
 
-            // Finally write the hlog page
-            device.WriteAsync((IntPtr)buffer.aligned_pointer + aligned_start, alignedDestinationAddress + (ulong)aligned_start,
-                alignedNumBytesToWrite, callback, asyncResult);
+                // Finally write the hlog page
+                device.WriteAsync((IntPtr)buffer.aligned_pointer + aligned_start, alignedDestinationAddress + (ulong)aligned_start,
+                    alignedNumBytesToWrite, callback, asyncResult);
+            }
+            finally
+            {
+                if (epochProtected)
+                    epoch.Resume();
+            }
         }
 
         private void AsyncReadPageCallback(uint errorCode, uint numBytes, object context)
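
A minimal sketch (not part of the patches above) of the client-visible behavior that [PATCH 1/3] produces, using StackExchange.Redis as the Garnet test suite does; the endpoint and the program scaffolding are assumptions for illustration only.

    using System;
    using StackExchange.Redis;

    class SelectBehaviorSketch
    {
        static void Main()
        {
            // Endpoint is an assumption; point it at a running Garnet instance.
            using var redis = ConnectionMultiplexer.Connect("127.0.0.1:6379");
            var db = redis.GetDatabase();

            try
            {
                // Standalone server: "SELECT 0" replies +OK and any other index
                // replies "ERR invalid database index." With EnableCluster, every
                // SELECT (including "SELECT 0") replies
                // "ERR SELECT is not allowed in cluster mode", because the cluster
                // check now runs before the database-index check.
                var reply = db.Execute("SELECT", "0");
                Console.WriteLine(reply); // OK (standalone only)
            }
            catch (RedisServerException e)
            {
                Console.WriteLine(e.Message); // one of the CmdStrings errors above
            }
        }
    }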
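The test changes in [PATCH 2/3] replace WaitFirstCheckpoint with a capture-then-wait pattern. A minimal sketch of that pattern against a plain StackExchange.Redis IServer; the BGSAVE trigger and the 100 ms backoff are assumptions, and the real tests go through the ClusterTestUtils helpers instead.

    using System;
    using System.Threading;
    using StackExchange.Redis;

    static class CheckpointWaitSketch
    {
        public static void CheckpointAndWait(IServer server)
        {
            // Capture the baseline BEFORE triggering the checkpoint. The old
            // WaitFirstCheckpoint only checked that LASTSAVE != Unix epoch, which
            // passes vacuously once any earlier checkpoint exists on the node.
            DateTime baseline = server.LastSave();

            server.Execute("BGSAVE"); // trigger a checkpoint (assumed command)

            // Wait until LASTSAVE advances past the captured baseline, i.e. until
            // this particular checkpoint completes, mirroring WaitCheckpoint.
            while (server.LastSave() <= baseline)
                Thread.Sleep(100); // back off between probes
        }
    }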
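[PATCH 3/3] wraps the flush in a shadow-copy/suspend/resume pattern. A minimal sketch of that pattern with stand-in types; EpochStub, the object[] page, and FlushToDevice are placeholders, not the actual Tsavorite LightEpoch, Record, or device types.

    using System;

    class EpochStub
    {
        public bool ThisInstanceProtected() => true; // stand-in for LightEpoch
        public void Suspend() { }
        public void Resume() { }
    }

    class FlushSketch
    {
        readonly EpochStub epoch = new();

        void FlushToDevice(object[] page) { /* long-running device write */ }

        public void FlushPage(object[] page)
        {
            var src = page;
            var epochProtected = false;
            if (epoch.ThisInstanceProtected())
            {
                epochProtected = true;
                // Shadow copy taken while still protected: the copy keeps object
                // references reachable even if the original page is reclaimed
                // while the write is in flight.
                src = new object[page.Length];
                Array.Copy(page, src, page.Length);
                // Suspend so slow I/O does not stall epoch advancement for others.
                epoch.Suspend();
            }
            try
            {
                FlushToDevice(src);
            }
            finally
            {
                if (epochProtected)
                    epoch.Resume(); // restore protection for the caller
            }
        }
    }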