diff --git a/Garnet.slnx b/Garnet.slnx
index 9208610c2ed..4ab7db32f9f 100644
--- a/Garnet.slnx
+++ b/Garnet.slnx
@@ -88,6 +88,7 @@
+
diff --git a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs
index 4c59009ba00..c380b89fd18 100644
--- a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs
+++ b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs
@@ -132,7 +132,8 @@ private unsafe void ConsumeSchedulePage(byte* record, int recordLength, long cur
replicationManager.SetSublogReplicationOffset(physicalSublogIdx, currentAddress);
// Wait for replay to complete.
- replayBatchContext.LeaderFollowerBarrier.WaitCompleted(serverOptions.ReplicaSyncTimeout, cts.Token);
+ if (!replayBatchContext.LeaderFollowerBarrier.WaitCompleted(serverOptions.ReplicaSyncTimeout, cts.Token))
+ ExceptionUtils.ThrowException(new GarnetException("Timed out waiting for parallel replay tasks to complete", LogLevel.Warning, clientResponse: false));
// Release participants for next cycle
replayBatchContext.LeaderFollowerBarrier.Release();
@@ -159,11 +160,13 @@ private unsafe void ConsumeScheduleChannel(byte* record, int recordLength, long
if (payloadLength > 0)
{
var entryPtr = ptr + entryLength;
+ var logAddressSequenceNumber = currentAddress + (ptr - record);
var replayTaskIdx = replicationManager.AofProcessor.GetReplayTaskIdx(entryPtr);
replayTasks[replayTaskIdx].AddRecord(new ReplayRecord()
{
entryPtr = entryPtr,
- payloadLength = payloadLength
+ payloadLength = payloadLength,
+ logAddressSequenceNumber = logAddressSequenceNumber
});
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
@@ -229,7 +232,8 @@ private unsafe void ConsumeDirect(byte* record, int recordLength, long currentAd
var payloadLength = physicalSublog.UnsafeGetLength(ptr);
if (payloadLength > 0)
{
- replicationManager.AofProcessor.ProcessAofRecordInternal(physicalSublogIdx, ptr + entryLength, payloadLength, true, out var isCheckpointStart);
+ var logAddressSequenceNumber = currentAddress + (ptr - record);
+ replicationManager.AofProcessor.ProcessAofRecordInternal(physicalSublogIdx, ptr + entryLength, payloadLength, true, out var isCheckpointStart, logAddressSequenceNumber);
// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
if (isCheckpointStart)
diff --git a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs
index b144483d30a..ad820a08df9 100644
--- a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs
+++ b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs
@@ -18,6 +18,7 @@ internal unsafe struct ReplayRecord
{
public byte* entryPtr;
public int payloadLength;
+ public long logAddressSequenceNumber;
}
internal sealed class ReplicaReplayTask(
@@ -62,6 +63,10 @@ internal async Task FullPageBasedBackgroundReplayAsync()
{
await replayBatchContext.LeaderFollowerBarrier.WaitReadyWorkAsync(cancellationToken: cts.Token).ConfigureAwait(false);
}
+ catch (TaskCanceledException) when (cts.Token.IsCancellationRequested)
+ {
+ // Suppress the exception if the task was cancelled because of store wrapper disposal
+ }
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed at WaitAsync", nameof(FullPageBasedBackgroundReplayAsync));
@@ -69,6 +74,11 @@ internal async Task FullPageBasedBackgroundReplayAsync()
break;
}
+ // Guard: if cancellation happened during WaitReadyWorkAsync, exit cleanly
+ // without falling through to the processing block (which would issue a spurious SignalCompleted)
+ if (cts.Token.IsCancellationRequested)
+ break;
+
try
{
unsafe
@@ -80,9 +90,6 @@ internal async Task FullPageBasedBackgroundReplayAsync()
var isProtected = replayBatchContext.IsProtected;
var ptr = record;
- var maxSequenceNumber = 0L;
-
- // logger?.LogError("[{sublogIdx},{replayIdx}] = {currentAddress} -> {nextAddress}", sublogIdx, replayIdx, currentAddress, nextAddress);
while (ptr < record + recordLength)
{
cts.Token.ThrowIfCancellationRequested();
@@ -91,24 +98,23 @@ internal async Task FullPageBasedBackgroundReplayAsync()
if (payloadLength > 0)
{
var entryPtr = ptr + entryLength;
- if (replicationManager.AofProcessor.CanReplay(entryPtr, replayTaskIdx, out var sequenceNumber))
+ var logAddressSequenceNumber = currentAddress + (ptr - record);
+ if (replicationManager.AofProcessor.CanReplay(entryPtr, replayTaskIdx, logAddressSequenceNumber, out _))
{
- replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, entryPtr, payloadLength, true, out var isCheckpointStart);
+ replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, entryPtr, payloadLength, true, out var isCheckpointStart, logAddressSequenceNumber);
// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
if (isCheckpointStart)
{
- // logger?.LogError("[{sublogIdx}] CheckpointStart {address}", sublogIdx, clusterProvider.replicationManager.GetSublogReplicationOffset(sublogIdx));
replicationManager.ReplicationCheckpointStartOffset[physicalSublogIdx] = replicationManager.GetSublogReplicationOffset(physicalSublogIdx);
}
}
- maxSequenceNumber = Math.Max(sequenceNumber, maxSequenceNumber);
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
else if (payloadLength < 0)
{
if (!clusterProvider.serverOptions.EnableFastCommit)
- throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
+ ExceptionUtils.ThrowException(new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false));
// Only a single thread should commit metadata
if (replayTaskIdx == 0)
@@ -122,10 +128,16 @@ internal async Task FullPageBasedBackgroundReplayAsync()
ptr += entryLength;
}
- // Update max sequence number for this virtual sublog which is mapped
- appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, maxSequenceNumber);
+ // Advance frontier to nextAddress (past all entries in this page).
+ // This ensures the read consistency protocol (which waits for frontier > sessionSeq)
+ // can proceed once all writes in the page are complete.
+ appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, nextAddress);
}
}
+ catch (TaskCanceledException) when (cts.Token.IsCancellationRequested)
+ {
+ // Suppress the exception if the task was cancelled because of store wrapper disposal
+ }
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed at replaying", nameof(FullPageBasedBackgroundReplayAsync));
@@ -156,7 +168,7 @@ internal async Task ChannelBasedBackgroundReplayAsync()
{
unsafe
{
- replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, record.entryPtr, record.payloadLength, true, out var isCheckpointStart);
+ replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, record.entryPtr, record.payloadLength, true, out var isCheckpointStart, record.logAddressSequenceNumber);
// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
diff --git a/libs/common/Synchronization/LeaderFollowerBarrier.cs b/libs/common/Synchronization/LeaderFollowerBarrier.cs
index 88cd0d7807e..60007faf3af 100644
--- a/libs/common/Synchronization/LeaderFollowerBarrier.cs
+++ b/libs/common/Synchronization/LeaderFollowerBarrier.cs
@@ -40,7 +40,7 @@ public bool WaitCompleted(TimeSpan timeout = default, CancellationToken cancella
var waitTimeout = ProcessTimeSpan(timeout);
for (var i = 0; i < participantCount; i++)
{
- if (!workCompleted.Wait(waitTimeout, cancellationToken))
+ if (!AsyncUtils.BlockingWait(workCompleted.WaitAsync(waitTimeout, cancellationToken)))
return false;
}
diff --git a/libs/server/AOF/AofEntryType.cs b/libs/server/AOF/AofEntryType.cs
index f1a99ac7054..6c8af82a1be 100644
--- a/libs/server/AOF/AofEntryType.cs
+++ b/libs/server/AOF/AofEntryType.cs
@@ -106,4 +106,26 @@ internal enum AofStoreType : byte
CheckpointType = 0x4,
FlushDbType = 0x5,
}
+
+ internal static class AofEntryTypeExtensions
+ {
+ ///
+ /// Returns true if the entry type carries a key payload after the header.
+ /// Keyless entries (transactions, checkpoints, flush, stored procedures) have no key.
+ ///
+ internal static bool HasKey(this AofEntryType opType) => opType switch
+ {
+ AofEntryType.StoreUpsert or
+ AofEntryType.StoreRMW or
+ AofEntryType.StoreDelete or
+ AofEntryType.ObjectStoreUpsert or
+ AofEntryType.ObjectStoreRMW or
+ AofEntryType.ObjectStoreDelete or
+ AofEntryType.UnifiedStoreStringUpsert or
+ AofEntryType.UnifiedStoreObjectUpsert or
+ AofEntryType.UnifiedStoreRMW or
+ AofEntryType.UnifiedStoreDelete => true,
+ _ => false,
+ };
+ }
}
\ No newline at end of file
diff --git a/libs/server/AOF/AofHeader.cs b/libs/server/AOF/AofHeader.cs
index 64a94b3e72a..d1a27b1d83d 100644
--- a/libs/server/AOF/AofHeader.cs
+++ b/libs/server/AOF/AofHeader.cs
@@ -7,18 +7,42 @@
namespace Garnet.server
{
+ // AOF Header Hierarchy
+ //
+ // The header type determines the wire format of each AOF entry based on the log topology:
+ //
+ // AofHeader (16B) — Base header for all entries. Used standalone with single-log mode.
+ // │
+ // ├── AofShardedHeader (24B) = AofHeader + sequenceNumber
+ // │ Used for per-key entries in multi-physical-log (sharded) mode.
+ // │ The sequence number enables cross-sublog ordering.
+ // │
+ // ├── AofSingleLogTransactionHeader (50B) = AofHeader + participantCount + replayTaskAccessVector
+ // │ Used for coordinated/broadcast operations (transactions, checkpoints, flush)
+ // │ in single-physical-log + multi-replay mode. Uses log addresses for ordering
+ // │ instead of embedded sequence numbers, saving 8B per entry.
+ // │
+ // └── AofShardedLogTransactionHeader (58B) = AofShardedHeader + participantCount + replayTaskAccessVector
+ // Used for coordinated/broadcast operations in multi-physical-log (sharded) mode.
+ // Embeds a sequence number (via AofShardedHeader) for cross-sublog ordering.
+ //
+ // Selection logic:
+ // Single log (1 physical, 1 replay task) → BasicHeader
+ // Single physical log, multi-replay → BasicHeader (per-key), SingleLogTransactionHeader (broadcast)
+ // Multi physical log, multi-replay → ShardedHeader (per-key), ShardedLogTransactionHeader (broadcast)
internal enum AofHeaderType : byte
{
BasicHeader = 0,
ShardedHeader = 1,
- TransactionHeader = 2
+ SingleLogTransactionHeader = 2,
+ ShardedLogTransactionHeader = 3,
}
///
/// Used for coordinated operations
///
[StructLayout(LayoutKind.Explicit, Size = TotalSize)]
- unsafe struct AofTransactionHeader
+ unsafe struct AofShardedLogTransactionHeader
{
public const int TotalSize = AofShardedHeader.TotalSize + 2 + 32;
// maximum 256 replay tasks per physical sublog, hence 32 bytes bitmap
@@ -44,6 +68,35 @@ unsafe struct AofTransactionHeader
public fixed byte replayTaskAccessVector[ReplayTaskAccessVectorBytes];
}
+ ///
+ /// Used for single-physical-log with multi-replay to carry transaction participant info
+ /// without embedding a sequence number (log addresses are used instead).
+ ///
+ [StructLayout(LayoutKind.Explicit, Size = TotalSize)]
+ unsafe struct AofSingleLogTransactionHeader
+ {
+ public const int TotalSize = AofHeader.TotalSize + 2 + AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes;
+
+ ///
+ /// Basic AOF header
+ ///
+ [FieldOffset(0)]
+ public AofHeader basicHeader;
+
+ ///
+ /// Used for synchronizing virtual sublog replay
+ /// NOTE: This stores the total number of replay tasks that participate in a given transaction.
+ ///
+ [FieldOffset(AofHeader.TotalSize)]
+ public short participantCount;
+
+ ///
+ /// Used to track replay task participating in the txn
+ ///
+ [FieldOffset(AofHeader.TotalSize + 2)]
+ public fixed byte replayTaskAccessVector[AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes];
+ }
+
///
/// Used for sharded log to add a k
///
@@ -79,7 +132,8 @@ struct AofHeader
{
AofHeaderType.BasicHeader => entryPtr + TotalSize,
AofHeaderType.ShardedHeader => entryPtr + AofShardedHeader.TotalSize,
- AofHeaderType.TransactionHeader => entryPtr + AofTransactionHeader.TotalSize,
+ AofHeaderType.ShardedLogTransactionHeader => entryPtr + AofShardedLogTransactionHeader.TotalSize,
+ AofHeaderType.SingleLogTransactionHeader => entryPtr + AofSingleLogTransactionHeader.TotalSize,
_ => throw new GarnetException($"Type not supported {headerType}"),
};
}
diff --git a/libs/server/AOF/AofProcessor.cs b/libs/server/AOF/AofProcessor.cs
index 90731ee09db..3e368a0813b 100644
--- a/libs/server/AOF/AofProcessor.cs
+++ b/libs/server/AOF/AofProcessor.cs
@@ -20,12 +20,12 @@ unsafe ref struct PreparedParameters
interface IPreprocessKey
{
- public abstract unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, out PreparedParameters preparedParameters);
+ public abstract unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, long logAddressSequenceNumber, out PreparedParameters preparedParameters);
}
struct SingleLogPreprocessKey : IPreprocessKey
{
- public unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, out PreparedParameters preparedParameters)
+ public unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, long logAddressSequenceNumber, out PreparedParameters preparedParameters)
{
var keyPtr = entryPtr + sizeof(AofHeader);
preparedParameters = new()
@@ -37,11 +37,33 @@ public unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, out Prepared
}
}
+ struct SinglePhysicalLogPreprocessKey : IPreprocessKey
+ {
+ public GarnetAppendOnlyFile appendOnlyFile;
+
+ public unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, long logAddressSequenceNumber, out PreparedParameters preparedParameters)
+ {
+ // Single physical log + multi-replay: entries use BasicHeader, ordering via log address
+ var keyPtr = entryPtr + sizeof(AofHeader);
+ preparedParameters = new()
+ {
+ Key = SpanByte.FromLengthPrefixedPinnedPointer(keyPtr)
+ };
+ preparedParameters.KeyHash = GarnetLog.HASH(preparedParameters.Key);
+ preparedParameters.PayloadPtr = keyPtr + preparedParameters.Key.TotalSize();
+ Debug.Assert(logAddressSequenceNumber > 0, "Entry address must be positive for single-physical-log consistency updates");
+ appendOnlyFile.readConsistencyManager.UpdateVirtualSublogKeySequenceNumber(
+ virtualSublogIdx,
+ preparedParameters.KeyHash,
+ logAddressSequenceNumber);
+ }
+ }
+
struct ShardedLogPreprocessKey : IPreprocessKey
{
public GarnetAppendOnlyFile appendOnlyFile;
- public unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, out PreparedParameters preparedParameters)
+ public unsafe void PrepareKey(int virtualSublogIdx, byte* entryPtr, long logAddressSequenceNumber, out PreparedParameters preparedParameters)
{
var shardedHeader = *(AofShardedHeader*)entryPtr;
var keyPtr = entryPtr + sizeof(AofShardedHeader);
@@ -89,7 +111,9 @@ public void SetReadWriteSession()
readonly ILogger logger;
readonly bool usingShardedLog;
+ readonly bool usingSinglePhysicalLogMultiReplay;
SingleLogPreprocessKey singleLogPreprocessKey;
+ SinglePhysicalLogPreprocessKey singlePhysicalLogPreprocessKey;
ShardedLogPreprocessKey shardedLogPreprocessKey;
///
@@ -107,8 +131,11 @@ public AofProcessor(
this.activeDbId = 0;
this.usingShardedLog = storeWrapper.serverOptions.AofPhysicalSublogCount > 1 || storeWrapper.serverOptions.AofReplayTaskCount > 1;
- if (usingShardedLog)
+ this.usingSinglePhysicalLogMultiReplay = storeWrapper.serverOptions.AofPhysicalSublogCount == 1 && storeWrapper.serverOptions.AofReplayTaskCount > 1;
+ if (storeWrapper.serverOptions.AofPhysicalSublogCount > 1)
this.shardedLogPreprocessKey = new ShardedLogPreprocessKey() { appendOnlyFile = storeWrapper.appendOnlyFile };
+ else if (usingSinglePhysicalLogMultiReplay)
+ this.singlePhysicalLogPreprocessKey = new SinglePhysicalLogPreprocessKey() { appendOnlyFile = storeWrapper.appendOnlyFile };
else
this.singleLogPreprocessKey = new SingleLogPreprocessKey();
this.obtainServerSession = () => new(0, networkSender: null, storeWrapper: replayAofStoreWrapper, subscribeBroker: null, authenticator: null, enableScripts: false, clusterProvider: clusterProvider);
@@ -161,6 +188,37 @@ private void SwitchActiveDatabaseContext(GarnetDatabase db, bool initialSetup =
public void WaitForVectorOperationsToComplete()
=> activeVectorManager?.WaitForVectorOperationsToComplete();
+ ///
+ /// Extracts sequence number and participant count from a transaction header entry.
+ /// For single-physical-log + multi-replay, uses entry address; for multi-physical-log, uses embedded sequence number.
+ ///
+ void GetSynchronizedOperationParams(byte* ptr, long entryAddress, out long sequenceNumber, out short participantCount)
+ {
+ var headerType = (*(AofHeader*)ptr).HeaderType;
+ switch (headerType)
+ {
+ case AofHeaderType.SingleLogTransactionHeader:
+ sequenceNumber = entryAddress;
+ participantCount = (*(AofSingleLogTransactionHeader*)ptr).participantCount;
+ break;
+ case AofHeaderType.ShardedLogTransactionHeader:
+ var txnHeader = *(AofShardedLogTransactionHeader*)ptr;
+ sequenceNumber = txnHeader.shardedHeader.sequenceNumber;
+ participantCount = txnHeader.participantCount;
+ break;
+ case AofHeaderType.BasicHeader:
+ sequenceNumber = entryAddress;
+ participantCount = (short)storeWrapper.serverOptions.AofReplayTaskCount;
+ break;
+ case AofHeaderType.ShardedHeader:
+ sequenceNumber = (*(AofShardedHeader*)ptr).sequenceNumber;
+ participantCount = (short)storeWrapper.serverOptions.AofReplayTaskCount;
+ break;
+ default:
+ throw new GarnetException($"Unsupported header type: {headerType}");
+ }
+ }
+
///
/// Process AOF record internal
/// NOTE: This method is shared between recover replay and replication replay
@@ -170,10 +228,10 @@ public void WaitForVectorOperationsToComplete()
///
///
///
- public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length, bool asReplica, out bool isCheckpointStart)
+ ///
+ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length, bool asReplica, out bool isCheckpointStart, long logAddressSequenceNumber = 0)
{
var header = *(AofHeader*)ptr;
- var shardedHeader = default(AofShardedHeader);
var replayContext = aofReplayCoordinator.GetReplayContext(virtualSublogIdx);
isCheckpointStart = false;
@@ -186,7 +244,7 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
}
// Handle transactions
- if (aofReplayCoordinator.AddOrReplayTransactionOperation(virtualSublogIdx, ptr, length, asReplica))
+ if (aofReplayCoordinator.AddOrReplayTransactionOperation(virtualSublogIdx, ptr, length, asReplica, logAddressSequenceNumber))
return;
switch (header.opType)
@@ -207,8 +265,9 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
if (usingShardedLog)
{
- shardedHeader = *(AofShardedHeader*)ptr;
- storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, shardedHeader.sequenceNumber);
+ // For single-physical-log + multi-replay, use entry address; otherwise use embedded sequence number
+ var checkpointSequenceNumber = usingSinglePhysicalLogMultiReplay ? logAddressSequenceNumber : (*(AofShardedHeader*)ptr).sequenceNumber;
+ storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, checkpointSequenceNumber);
}
break;
case AofEntryType.CheckpointEndCommit:
@@ -232,9 +291,11 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
}
else
{
+ GetSynchronizedOperationParams(ptr, logAddressSequenceNumber, out var seqNum, out var partCount);
aofReplayCoordinator.ProcessSynchronizedOperation(
virtualSublogIdx,
- ptr,
+ seqNum,
+ partCount,
(int)LeaderBarrierType.CHECKPOINT,
() => storeWrapper.TakeCheckpointAsync(background: false, logger: logger));
}
@@ -257,9 +318,11 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
}
else
{
+ GetSynchronizedOperationParams(ptr, logAddressSequenceNumber, out var seqNum, out var partCount);
aofReplayCoordinator.ProcessSynchronizedOperation(
virtualSublogIdx,
- ptr,
+ seqNum,
+ partCount,
(int)LeaderBarrierType.STREAMING_CHECKPOINT,
() => { storeWrapper.store.SetVersion(header.storeVersion); return Task.CompletedTask; }
);
@@ -271,8 +334,9 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
Debug.Assert(storeWrapper.serverOptions.ReplicaDisklessSync);
if (usingShardedLog)
{
- shardedHeader = *(AofShardedHeader*)ptr;
- storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, shardedHeader.sequenceNumber);
+ // For single-physical-log + multi-replay, use entry address; otherwise use embedded sequence number
+ var streamingSequenceNumber = usingSinglePhysicalLogMultiReplay ? logAddressSequenceNumber : (*(AofShardedHeader*)ptr).sequenceNumber;
+ storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, streamingSequenceNumber);
}
break;
case AofEntryType.FlushAll:
@@ -282,9 +346,11 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
}
else
{
+ GetSynchronizedOperationParams(ptr, logAddressSequenceNumber, out var seqNum, out var partCount);
aofReplayCoordinator.ProcessSynchronizedOperation(
virtualSublogIdx,
- ptr,
+ seqNum,
+ partCount,
(int)LeaderBarrierType.FLUSH_DB_ALL,
() => { storeWrapper.FlushAllDatabases(unsafeTruncateLog: header.UnsafeTruncateLog); return Task.CompletedTask; }
);
@@ -297,19 +363,21 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
}
else
{
+ GetSynchronizedOperationParams(ptr, logAddressSequenceNumber, out var seqNum, out var partCount);
aofReplayCoordinator.ProcessSynchronizedOperation(
virtualSublogIdx,
- ptr,
+ seqNum,
+ partCount,
(int)LeaderBarrierType.FLUSH_DB,
() => { storeWrapper.FlushDatabase(unsafeTruncateLog: header.UnsafeTruncateLog, dbId: header.databaseId); return Task.CompletedTask; }
);
}
break;
case AofEntryType.StoredProcedure:
- aofReplayCoordinator.ReplayStoredProc(virtualSublogIdx, header.procedureId, ptr);
+ aofReplayCoordinator.ReplayStoredProc(virtualSublogIdx, header.procedureId, ptr, logAddressSequenceNumber);
break;
case AofEntryType.TxnCommit:
- aofReplayCoordinator.ProcessFuzzyRegionTransactionGroup(virtualSublogIdx, ptr, asReplica);
+ aofReplayCoordinator.ProcessFuzzyRegionTransactionGroup(virtualSublogIdx, ptr, asReplica, logAddressSequenceNumber);
break;
default:
_ = ReplayOpDispatch(
@@ -321,7 +389,8 @@ public void ProcessAofRecordInternal(int virtualSublogIdx, byte* ptr, int length
replayContext.UnifiedBasicContext,
ptr,
length,
- asReplica);
+ asReplica,
+ logAddressSequenceNumber);
break;
}
}
@@ -335,15 +404,18 @@ internal bool ReplayOpDispatch(
TUnifiedContext unifiedContext,
byte* entryPtr,
int length,
- bool asReplica)
+ bool asReplica,
+ long logAddressSequenceNumber = 0)
where TStringContext : ITsavoriteContext
where TObjectContext : ITsavoriteContext
where TUnifiedContext : ITsavoriteContext
{
- if (usingShardedLog)
- return ReplayOp(virtualSublogIdx, header, replayContext, shardedLogPreprocessKey, stringContext, objectContext, unifiedContext, entryPtr, length, asReplica);
+ if (storeWrapper.serverOptions.AofPhysicalSublogCount > 1)
+ return ReplayOp(virtualSublogIdx, header, replayContext, shardedLogPreprocessKey, stringContext, objectContext, unifiedContext, entryPtr, length, asReplica, logAddressSequenceNumber);
+ else if (usingSinglePhysicalLogMultiReplay)
+ return ReplayOp(virtualSublogIdx, header, replayContext, singlePhysicalLogPreprocessKey, stringContext, objectContext, unifiedContext, entryPtr, length, asReplica, logAddressSequenceNumber);
else
- return ReplayOp(virtualSublogIdx, header, replayContext, singleLogPreprocessKey, stringContext, objectContext, unifiedContext, entryPtr, length, asReplica);
+ return ReplayOp(virtualSublogIdx, header, replayContext, singleLogPreprocessKey, stringContext, objectContext, unifiedContext, entryPtr, length, asReplica, logAddressSequenceNumber);
}
private bool ReplayOp(
@@ -356,7 +428,8 @@ private bool ReplayOp
where TObjectContext : ITsavoriteContext
@@ -376,7 +449,7 @@ private bool ReplayOp
///
///
- ///
+ /// Log address of the entry, used as ordering value for single-physical-log parallel replay mode
+ ///
///
///
- public bool CanReplay(byte* ptr, int replayTaskIdx, out long sequenceNumber)
+ public bool CanReplay(byte* ptr, int replayTaskIdx, long entryAddress, out long logAddressSequenceNumber)
{
var header = *(AofHeader*)ptr;
var replayHeaderType = header.HeaderType;
- sequenceNumber = 0L;
+ logAddressSequenceNumber = 0L;
switch (replayHeaderType)
{
- // Check if should replay entry by inspecting key
+ // Single-physical-log + multi-replay: BasicHeader entries, use entry address for ordering
+ case AofHeaderType.BasicHeader:
+ logAddressSequenceNumber = entryAddress;
+ // Keyless entries (transactions, checkpoints, flush, stored procedures) are processed by all tasks
+ // because they may participate in barriers via ProcessSynchronizedOperation
+ if (!header.opType.HasKey())
+ return true;
+ var basicCurr = AofHeader.SkipHeader(ptr);
+ var basicKey = PinnedSpanByte.FromLengthPrefixedPinnedPointer(basicCurr).ReadOnlySpan;
+ return replayTaskIdx == storeWrapper.appendOnlyFile.Log.GetReplayTaskIdx(basicKey);
+ // Multi-physical-log: ShardedHeader entries with embedded sequence number
case AofHeaderType.ShardedHeader:
var shardedHeader = *(AofShardedHeader*)ptr;
- sequenceNumber = shardedHeader.sequenceNumber;
+ logAddressSequenceNumber = shardedHeader.sequenceNumber;
+ // Keyless entries are processed by task 0 only
+ if (!header.opType.HasKey())
+ return replayTaskIdx == 0;
var curr = AofHeader.SkipHeader(ptr);
var key = PinnedSpanByte.FromLengthPrefixedPinnedPointer(curr).ReadOnlySpan;
- var _replayTaskIdx = storeWrapper.appendOnlyFile.Log.GetReplayTaskIdx(key);
- return replayTaskIdx == _replayTaskIdx;
- // If no key to inspect, check bit vector for participating replay tasks in the transaction
- // NOTE: HeaderType transactions include MULTI-EXEC transactions, custom txn procedures, and any operation that executes across physical and virtual sublogs (e.g. checkpoint, flushdb)
- case AofHeaderType.TransactionHeader:
- var txnHeader = *(AofTransactionHeader*)ptr;
- sequenceNumber = txnHeader.shardedHeader.sequenceNumber;
- var bitVector = BitVector.CopyFrom(new Span(txnHeader.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes));
+ return replayTaskIdx == storeWrapper.appendOnlyFile.Log.GetReplayTaskIdx(key);
+ // Single-physical-log + multi-replay: transaction header without sequence number
+ case AofHeaderType.SingleLogTransactionHeader:
+ var singleLogTxnHeader = *(AofSingleLogTransactionHeader*)ptr;
+ logAddressSequenceNumber = entryAddress;
+ var singleLogBitVector = BitVector.CopyFrom(new Span(singleLogTxnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes));
+ return singleLogBitVector.IsSet(replayTaskIdx);
+ // Multi-physical-log: transaction header with embedded sequence number
+ case AofHeaderType.ShardedLogTransactionHeader:
+ var txnHeader = *(AofShardedLogTransactionHeader*)ptr;
+ logAddressSequenceNumber = txnHeader.shardedHeader.sequenceNumber;
+ var bitVector = BitVector.CopyFrom(new Span(txnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes));
return bitVector.IsSet(replayTaskIdx);
default:
throw new GarnetException($"Replay header type {replayHeaderType} not supported!");
@@ -670,16 +761,19 @@ public int GetReplayTaskIdx(byte* ptr)
var replayHeaderType = header.HeaderType;
switch (replayHeaderType)
{
- // Check if should replay entry by inspecting key
+ // Single-physical-log + multi-replay: BasicHeader entries
+ case AofHeaderType.BasicHeader:
+ var basicCurr = AofHeader.SkipHeader(ptr);
+ var basicKey = PinnedSpanByte.FromLengthPrefixedPinnedPointer(basicCurr).ReadOnlySpan;
+ return storeWrapper.appendOnlyFile.Log.GetReplayTaskIdx(basicKey);
+ // Multi-physical-log: ShardedHeader entries
case AofHeaderType.ShardedHeader:
- var shardedHeader = *(AofShardedHeader*)ptr;
var curr = AofHeader.SkipHeader(ptr);
var key = PinnedSpanByte.FromLengthPrefixedPinnedPointer(curr).ReadOnlySpan;
- var _replayTaskIdx = storeWrapper.appendOnlyFile.Log.GetReplayTaskIdx(key);
- return _replayTaskIdx;
- // If no key to inspect, check bit vector for participating replay tasks in the transaction
- // NOTE: HeaderType transactions include MULTI-EXEC transactions, custom txn procedures, and any operation that executes across physical and virtual sublogs (e.g. checkpoint, flushdb)
- case AofHeaderType.TransactionHeader:
+ return storeWrapper.appendOnlyFile.Log.GetReplayTaskIdx(key);
+ // Transaction headers (both types) don't have a single key for task assignment
+ case AofHeaderType.ShardedLogTransactionHeader:
+ case AofHeaderType.SingleLogTransactionHeader:
return -1;
default:
throw new GarnetException($"Replay header type {replayHeaderType} not supported!");
@@ -687,30 +781,37 @@ public int GetReplayTaskIdx(byte* ptr)
}
///
- /// Determines whether the specified log entry should be skipped during replay based on its sequence number.
+ /// Determines whether the specified log entry should be skipped during replay based on its sequence number or address.
///
- /// A pointer to the start of the log entry header in memory. Must point to a valid header structure.
- /// The sequence number threshold. Entries with a sequence number greater than this value will be skipped.
+ /// A pointer to the start of the log entry header in memory.
+ /// The sequence number/address threshold. Entries beyond this value will be skipped.
/// Specify -1 to skip all entries.
- /// When this method returns, contains the sequence number of the current log entry, or -1 if unavailable.
+ /// Log address of the entry, used for single-physical-log mode.
+ /// When this method returns, contains the sequence number/address of the current log entry, or -1 if unavailable.
/// true if the log entry should be skipped; otherwise, false.
/// Thrown if the log entry header type is not supported.
- public bool SkipReplay(byte* ptr, long untilSequenceNumber, out long entrySequenceNumber)
+ public bool SkipReplay(byte* ptr, long untilSequenceNumber, long logAddressSequenceNumber, out long sequenceNumber)
{
- entrySequenceNumber = -1;
+ sequenceNumber = -1;
if (untilSequenceNumber == -1)
return true;
var header = *(AofHeader*)ptr;
var replayHeaderType = header.HeaderType;
switch (replayHeaderType)
{
+ // Single-physical-log + multi-replay: use entry address
+ case AofHeaderType.BasicHeader:
+ case AofHeaderType.SingleLogTransactionHeader:
+ sequenceNumber = logAddressSequenceNumber;
+ return logAddressSequenceNumber > untilSequenceNumber;
+ // Multi-physical-log: use embedded sequence number
case AofHeaderType.ShardedHeader:
var shardedHeader = *(AofShardedHeader*)ptr;
- entrySequenceNumber = shardedHeader.sequenceNumber;
+ sequenceNumber = shardedHeader.sequenceNumber;
return shardedHeader.sequenceNumber > untilSequenceNumber;
- case AofHeaderType.TransactionHeader:
- var txnHeader = *(AofTransactionHeader*)ptr;
- entrySequenceNumber = txnHeader.shardedHeader.sequenceNumber;
+ case AofHeaderType.ShardedLogTransactionHeader:
+ var txnHeader = *(AofShardedLogTransactionHeader*)ptr;
+ sequenceNumber = txnHeader.shardedHeader.sequenceNumber;
return txnHeader.shardedHeader.sequenceNumber > untilSequenceNumber;
default:
throw new GarnetException($"Replay header type {replayHeaderType} not supported!");
diff --git a/libs/server/AOF/GarnetAppendOnlyFile.cs b/libs/server/AOF/GarnetAppendOnlyFile.cs
index b7d7c159daf..ce7967d89c8 100644
--- a/libs/server/AOF/GarnetAppendOnlyFile.cs
+++ b/libs/server/AOF/GarnetAppendOnlyFile.cs
@@ -66,7 +66,9 @@ public GarnetAppendOnlyFile(GarnetServerOptions serverOptions, TsavoriteLogSetti
InvalidAofAddress = AofAddress.Create(length: serverOptions.AofPhysicalSublogCount, value: -1);
MaxAofAddress = AofAddress.Create(length: serverOptions.AofPhysicalSublogCount, value: long.MaxValue);
CreateOrUpdateKeySequenceManager();
- if (serverOptions.MultiLogEnabled)
+ // Only create sequence number generator for multi-physical-log (sharded) mode.
+ // Single-physical-log + multi-replay uses log addresses instead.
+ if (serverOptions.AofPhysicalSublogCount > 1)
seqNumGen = new SequenceNumberGenerator(0);
this.logger = logger;
Log = new(this, serverOptions, logSettings, logger);
@@ -108,10 +110,12 @@ public void CreateOrUpdateKeySequenceManager()
///
/// Reset sequence number generator.
/// NOTE: We need to update starting offset when recovering or failing over to ensure time moves forward.
+ /// Only applicable for multi-physical-log (sharded) mode.
///
public void ResetSequenceNumberGenerator()
{
- if (!serverOptions.MultiLogEnabled)
+ // Only reset for multi-physical-log mode; single-physical-log uses log addresses
+ if (serverOptions.AofPhysicalSublogCount <= 1)
return;
var physicalSublogMaxReplayedSequenceNumber = readConsistencyManager.GetPhysicalSublogMaxReplayedSequenceNumber();
var start = physicalSublogMaxReplayedSequenceNumber.Max();
diff --git a/libs/server/AOF/GarnetLog.cs b/libs/server/AOF/GarnetLog.cs
index b065e7d405b..ce12dd05153 100644
--- a/libs/server/AOF/GarnetLog.cs
+++ b/libs/server/AOF/GarnetLog.cs
@@ -27,9 +27,8 @@ public sealed class GarnetLog
readonly Func cookieGeneratorCallback;
readonly bool usingSingleLog;
readonly bool usingSinglePhysicalLog;
- readonly uint physicalSublogMask;
- readonly uint physicalSublogCountUpperBound;
- readonly uint replayTaskCountMask;
+ readonly int physicalSublogCount;
+ readonly int replayTaskCount;
public static unsafe long GetSequenceNumberFromCookie(byte[] cookie)
{
@@ -71,10 +70,8 @@ public GarnetLog(GarnetAppendOnlyFile appendOnlyFile, GarnetServerOptions server
this.shardedLog = new ShardedLog(serverOptions.AofPhysicalSublogCount, logSettings, logger: logger);
}
- var roundUp = BitOperations.RoundUpToPowerOf2((uint)serverOptions.AofPhysicalSublogCount);
- physicalSublogMask = roundUp - 1;
- physicalSublogCountUpperBound = (uint)BitOperations.PopCount(physicalSublogMask);
- replayTaskCountMask = BitOperations.RoundUpToPowerOf2((uint)serverOptions.AofReplayTaskCount) - 1;
+ physicalSublogCount = serverOptions.AofPhysicalSublogCount;
+ replayTaskCount = serverOptions.AofReplayTaskCount;
}
public TsavoriteLog SingleLog => singleLog.log;
@@ -92,11 +89,11 @@ public static long HASH(ReadOnlySpan key)
=> GarnetKeyComparer.StaticGetHashCode64((FixedSpanByteKey)key);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public int GetPhysicalSublogIdx(long hash) => (int)((ulong)hash & physicalSublogMask);
+ public int GetPhysicalSublogIdx(long hash) => (int)((ulong)hash % (uint)physicalSublogCount);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public int GetReplayTaskIdx(long hash) => (byte)(((ulong)hash >> (int)physicalSublogCountUpperBound) & replayTaskCountMask);
+ public int GetReplayTaskIdx(long hash) => (int)(((ulong)hash / (uint)physicalSublogCount) % (uint)replayTaskCount);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public int GetVirtualSublogIdx(long hash) => GetPhysicalSublogIdx(hash) * serverOptions.AofReplayTaskCount + GetReplayTaskIdx(hash);
+ public int GetVirtualSublogIdx(long hash) => GetPhysicalSublogIdx(hash) * replayTaskCount + GetReplayTaskIdx(hash);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetPhysicalSublogIdx(ReadOnlySpan key) => GetPhysicalSublogIdx(HASH(key));
@@ -600,7 +597,9 @@ internal void Enqueue(AofEntryType opType, long version,
where TInput : IStoreInput
where TEpochAccessor : IEpochAccessor
{
- if (usingSingleLog)
+ // Single physical log (covers both single-log and single-physical-log + multi-replay)
+ // Uses BasicHeader — log addresses provide ordering for multi-replay consistency
+ if (usingSinglePhysicalLog)
{
var header = new AofHeader
{
@@ -618,6 +617,7 @@ internal void Enqueue(AofEntryType opType, long version,
epochAccessor,
out logicalAddress);
}
+ // Multi physical sublogs and multi-replay support
else
{
var shardedHeader = new AofShardedHeader
@@ -632,31 +632,17 @@ internal void Enqueue(AofEntryType opType, long version,
sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber()
};
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- singleLog.log.Enqueue(shardedHeader,
- key,
- value,
- ref input,
- epochAccessor,
- out logicalAddress);
- }
- // Multi physical sublogs and multi-replay support
- else
- {
- var physicalSublogIdx = GetPhysicalSublogIdx(key);
- shardedLog.sublog[physicalSublogIdx].Enqueue(
- shardedHeader,
- key,
- value,
- ref input,
- epochAccessor,
- out logicalAddress);
-
- if (serverOptions.AofAutoCommit)
- Commit();
- }
+ var physicalSublogIdx = GetPhysicalSublogIdx(key);
+ shardedLog.sublog[physicalSublogIdx].Enqueue(
+ shardedHeader,
+ key,
+ value,
+ ref input,
+ epochAccessor,
+ out logicalAddress);
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
@@ -664,7 +650,8 @@ internal void Enqueue(AofEntryType opType, long version,
where TInput : IStoreInput
where TEpochAccessor : IEpochAccessor
{
- if (usingSingleLog)
+ // Single physical log (covers both single-log and single-physical-log + multi-replay)
+ if (usingSinglePhysicalLog)
{
var header = new AofHeader
{
@@ -681,6 +668,7 @@ internal void Enqueue(AofEntryType opType, long version,
epochAccessor,
out logicalAddress);
}
+ // Multi physical sublogs and multi-replay support
else
{
var shardedHeader = new AofShardedHeader
@@ -695,36 +683,24 @@ internal void Enqueue(AofEntryType opType, long version,
sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber()
};
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- singleLog.log.Enqueue(shardedHeader,
- key,
- ref input,
- epochAccessor,
- out logicalAddress);
- }
- // Multi physical sublogs and multi-replay support
- else
- {
- var physicalSublogIdx = GetPhysicalSublogIdx(key);
- shardedLog.sublog[physicalSublogIdx].Enqueue(
- shardedHeader,
- key,
- ref input,
- epochAccessor,
- out logicalAddress);
-
- if (serverOptions.AofAutoCommit)
- Commit();
- }
+ var physicalSublogIdx = GetPhysicalSublogIdx(key);
+ shardedLog.sublog[physicalSublogIdx].Enqueue(
+ shardedHeader,
+ key,
+ ref input,
+ epochAccessor,
+ out logicalAddress);
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
internal void Enqueue(AofEntryType opType, long version, int sessionId, ReadOnlySpan key, ReadOnlySpan value, TEpochAccessor epochAccessor, out long logicalAddress)
where TEpochAccessor : IEpochAccessor
{
- if (usingSingleLog)
+ // Single physical log (covers both single-log and single-physical-log + multi-replay)
+ if (usingSinglePhysicalLog)
{
var header = new AofHeader
{
@@ -741,6 +717,7 @@ internal void Enqueue(AofEntryType opType, long version, int ses
epochAccessor,
out logicalAddress);
}
+ // Multi physical sublogs and multi-replay support
else
{
var shardedHeader = new AofShardedHeader
@@ -755,29 +732,16 @@ internal void Enqueue(AofEntryType opType, long version, int ses
sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber()
};
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- singleLog.log.Enqueue(shardedHeader,
- key,
- value,
- epochAccessor,
- out logicalAddress);
- }
- // Multi physical sublogs and multi-replay support
- else
- {
- var physicalSublogIdx = GetPhysicalSublogIdx(key);
- shardedLog.sublog[physicalSublogIdx].Enqueue(
- shardedHeader,
- key,
- value,
- epochAccessor,
- out logicalAddress);
-
- if (serverOptions.AofAutoCommit)
- Commit();
- }
+ var physicalSublogIdx = GetPhysicalSublogIdx(key);
+ shardedLog.sublog[physicalSublogIdx].Enqueue(
+ shardedHeader,
+ key,
+ value,
+ epochAccessor,
+ out logicalAddress);
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
@@ -795,15 +759,35 @@ internal unsafe void EnqueueStoredProc(AofEntryType opType, byte procedureId, lo
};
singleLog.log.Enqueue(header, ref procInput, out _);
}
+ else if (usingSinglePhysicalLog)
+ {
+ // Single physical log + multi-replay: use lightweight header without sequence number
+ var singleLogTxnHeader = new AofSingleLogTransactionHeader
+ {
+ basicHeader = new AofHeader
+ {
+ HeaderType = AofHeaderType.SingleLogTransactionHeader,
+ opType = opType,
+ procedureId = procedureId,
+ storeVersion = txnVersion,
+ sessionID = sessionId,
+ },
+ participantCount = (short)proc.virtualSublogParticipantCount
+ };
+
+ proc.replayTaskAccessVector[0].CopyTo(
+ new Span(singleLogTxnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes));
+ singleLog.log.Enqueue(singleLogTxnHeader, ref procInput, out _);
+ }
else
{
- var txnHeader = new AofTransactionHeader
+ var txnHeader = new AofShardedLogTransactionHeader
{
shardedHeader = new AofShardedHeader
{
basicHeader = new AofHeader
{
- HeaderType = AofHeaderType.TransactionHeader,
+ HeaderType = AofHeaderType.ShardedLogTransactionHeader,
opType = opType,
procedureId = procedureId,
storeVersion = txnVersion,
@@ -814,42 +798,27 @@ internal unsafe void EnqueueStoredProc(AofEntryType opType, byte procedureId, lo
participantCount = (short)proc.virtualSublogParticipantCount
};
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- // Update corresponding sublog participating vector before enqueue to related physical sublog
- proc.replayTaskAccessVector[0].CopyTo(
- new Span(txnHeader.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes));
- // Single log with multi-replay enabled needs to add sharderHeader to implement read protocol.
- // Cookie generator and hence manual commit not needed because single physical sublog commit marker is read consistent when flushed.
- singleLog.log.Enqueue(txnHeader, ref procInput, out _);
- }
- // Multi physical sublogs and multi-replay support
- else
+ try
{
- try
+ if (serverOptions.AofPhysicalSublogCount > 1)
+ LockSublogs(proc.physicalSublogAccessVector);
+ var _physicalSublogAccessVector = proc.physicalSublogAccessVector;
+ while (_physicalSublogAccessVector > 0)
{
- if (serverOptions.AofPhysicalSublogCount > 1)
- LockSublogs(proc.physicalSublogAccessVector);
- var _physicalSublogAccessVector = proc.physicalSublogAccessVector;
- while (_physicalSublogAccessVector > 0)
- {
- var physicalSublogIdx = _physicalSublogAccessVector.GetNextOffset();
- // Update corresponding sublog participating vector before enqueue to related physical sublog
- proc.replayTaskAccessVector[physicalSublogIdx].CopyTo(
- new Span(txnHeader.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes));
- shardedLog.sublog[physicalSublogIdx].Enqueue(txnHeader, ref procInput, out _);
- }
+ var physicalSublogIdx = _physicalSublogAccessVector.GetNextOffset();
+ proc.replayTaskAccessVector[physicalSublogIdx].CopyTo(
+ new Span(txnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes));
+ shardedLog.sublog[physicalSublogIdx].Enqueue(txnHeader, ref procInput, out _);
}
- finally
- {
- if (serverOptions.AofPhysicalSublogCount > 1)
- UnlockSublogs(proc.physicalSublogAccessVector);
- }
-
- if (serverOptions.AofAutoCommit)
- Commit();
}
+ finally
+ {
+ if (serverOptions.AofPhysicalSublogCount > 1)
+ UnlockSublogs(proc.physicalSublogAccessVector);
+ }
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
@@ -866,134 +835,70 @@ internal unsafe void EnqueueTxn(AofEntryType opType, long txnVersion, int sessio
};
appendOnlyFile.Log.SingleLog.Enqueue(header, out _);
}
- else
+ else if (usingSinglePhysicalLog)
{
- var txnHeader = new AofTransactionHeader
+ // Single physical log + multi-replay: use lightweight header without sequence number
+ var singleLogTxnHeader = new AofSingleLogTransactionHeader
{
- shardedHeader = new AofShardedHeader
+ basicHeader = new AofHeader
{
- basicHeader = new AofHeader
- {
- HeaderType = AofHeaderType.TransactionHeader,
- opType = opType,
- storeVersion = txnVersion,
- sessionID = sessionId,
- },
- sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber(),
+ HeaderType = AofHeaderType.SingleLogTransactionHeader,
+ opType = opType,
+ storeVersion = txnVersion,
+ sessionID = sessionId,
},
participantCount = (short)virtualSublogParticipantCount
};
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- virtualSublogAccessVector[0].CopyTo(new Span(txnHeader.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes));
- // Single log with multi-replay enabled needs to add sharderHeader to implement read protocol.
- // Cookie generator and hence manual commit not needed because single physical sublog commit marker is read consistent when flushed.
- singleLog.log.Enqueue(txnHeader, out _);
- }
- // Multi physical sublogs and multi-replay support
- else
- {
- try
- {
- if (serverOptions.AofPhysicalSublogCount > 1)
- LockSublogs(physicalSublogAccessVector);
- var _physicalSublogAccessVector = physicalSublogAccessVector;
- while (_physicalSublogAccessVector > 0)
- {
- var physicalSublogIdx = _physicalSublogAccessVector.GetNextOffset();
- // Update corresponding sublog participating vector before enqueue to related physical sublog
- virtualSublogAccessVector[physicalSublogIdx].CopyTo(new Span(txnHeader.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes));
- shardedLog.sublog[physicalSublogIdx].Enqueue(txnHeader, out _);
- }
- }
- finally
- {
- if (serverOptions.AofPhysicalSublogCount > 1)
- UnlockSublogs(physicalSublogAccessVector);
- }
-
- if (serverOptions.AofAutoCommit)
- Commit();
- }
- }
- }
-
- internal void EnqueueDatabaseCommit(AofEntryType opType, long version)
- {
- if (usingSingleLog)
- {
- var header = new AofHeader
- {
- HeaderType = AofHeaderType.BasicHeader,
- opType = opType,
- storeVersion = version,
- sessionID = -1
- };
- singleLog.log.Enqueue(header, out _);
+ virtualSublogAccessVector[0].CopyTo(new Span(singleLogTxnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes));
+ singleLog.log.Enqueue(singleLogTxnHeader, out _);
}
else
{
- var header = new AofTransactionHeader
+ var txnHeader = new AofShardedLogTransactionHeader
{
shardedHeader = new AofShardedHeader
{
basicHeader = new AofHeader
{
- HeaderType = AofHeaderType.TransactionHeader,
+ HeaderType = AofHeaderType.ShardedLogTransactionHeader,
opType = opType,
- storeVersion = version,
- sessionID = -1
+ storeVersion = txnVersion,
+ sessionID = sessionId,
},
- sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber()
+ sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber(),
},
- participantCount = (short)appendOnlyFile.serverOptions.AofVirtualSublogCount
+ participantCount = (short)virtualSublogParticipantCount
};
- unsafe
- {
- new Span(header.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes).Fill(0xFF);
- }
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- // Single log with multi-replay enabled needs to add sharderHeader to implement read protocol.
- // Cookie generator and hence manual commit not needed because single physical sublog commit marker is read consistent when flushed.
- singleLog.log.Enqueue(header, out _);
- }
- // Multi physical sublogs and multi-replay support
- else
+ try
{
- var physicalSublogAccessVector = AllLogsBitmask();
- try
- {
- if (serverOptions.AofPhysicalSublogCount > 1)
- LockSublogs(physicalSublogAccessVector);
- var _physicalSublogAccessVector = physicalSublogAccessVector;
-
- while (_physicalSublogAccessVector > 0)
- {
- var physicalSublogIdx = _physicalSublogAccessVector.GetNextOffset();
- shardedLog.sublog[physicalSublogIdx].Enqueue(header, out _);
- }
- }
- finally
+ if (serverOptions.AofPhysicalSublogCount > 1)
+ LockSublogs(physicalSublogAccessVector);
+ var _physicalSublogAccessVector = physicalSublogAccessVector;
+ while (_physicalSublogAccessVector > 0)
{
- if (serverOptions.AofPhysicalSublogCount > 1)
- UnlockSublogs(physicalSublogAccessVector);
+ var physicalSublogIdx = _physicalSublogAccessVector.GetNextOffset();
+ virtualSublogAccessVector[physicalSublogIdx].CopyTo(new Span(txnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes));
+ shardedLog.sublog[physicalSublogIdx].Enqueue(txnHeader, out _);
}
-
- if (serverOptions.AofAutoCommit)
- Commit();
}
+ finally
+ {
+ if (serverOptions.AofPhysicalSublogCount > 1)
+ UnlockSublogs(physicalSublogAccessVector);
+ }
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
- public void Enqueue(AofEntryType opType, long version, int sessionId, ReadOnlySpan key, ref TInput input, out long logicalAddress)
+ internal void Enqueue(AofEntryType opType, long version, int sessionId, ReadOnlySpan key, ref TInput input, out long logicalAddress)
where TInput : IStoreInput
{
- if (usingSingleLog)
+ // Single physical log (covers both single-log and single-physical-log + multi-replay)
+ if (usingSinglePhysicalLog)
{
var header = new AofHeader
{
@@ -1009,6 +914,7 @@ public void Enqueue(AofEntryType opType, long version, int sessionId, Re
ref input,
out logicalAddress);
}
+ // Multi physical sublogs and multi-replay support
else
{
var shardedHeader = new AofShardedHeader
@@ -1023,79 +929,103 @@ public void Enqueue(AofEntryType opType, long version, int sessionId, Re
sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber()
};
- // Multi-replay only support
- if (usingSinglePhysicalLog)
- {
- singleLog.log.Enqueue(shardedHeader,
- key,
- ref input,
- out logicalAddress);
- }
- // Multi physical sublogs and multi-replay support
- else
- {
- var physicalSublogIdx = GetPhysicalSublogIdx(key);
- shardedLog.sublog[physicalSublogIdx].Enqueue(
- shardedHeader,
- key,
- ref input,
- out logicalAddress);
-
- if (serverOptions.AofAutoCommit)
- Commit();
- }
+ var physicalSublogIdx = GetPhysicalSublogIdx(key);
+ shardedLog.sublog[physicalSublogIdx].Enqueue(
+ shardedHeader,
+ key,
+ ref input,
+ out logicalAddress);
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
- internal unsafe void EnqueueSafeFlushAOF(AofEntryType opType, bool unsafeTruncateLog, int dbId)
+ ///
+ /// Enqueues an entry to all physical sublogs (broadcast). Used for markers that must
+ /// be visible to all replay tasks (e.g., database commit, safe-flush, checkpoint).
+ ///
+ private unsafe void EnqueueBroadcastEntry(AofHeader basicHeader)
{
if (usingSingleLog)
{
- var header = new AofHeader
+ singleLog.log.Enqueue(basicHeader, out _);
+ }
+ else if (usingSinglePhysicalLog)
+ {
+ // Single physical log + multi-replay: use lightweight header without sequence number
+ basicHeader.HeaderType = AofHeaderType.SingleLogTransactionHeader;
+ var singleLogTxnHeader = new AofSingleLogTransactionHeader
{
- HeaderType = AofHeaderType.BasicHeader,
- opType = opType,
- storeVersion = 0,
- sessionID = -1,
- UnsafeTruncateLog = unsafeTruncateLog,
- databaseId = (byte)dbId
+ basicHeader = basicHeader,
+ participantCount = (short)appendOnlyFile.serverOptions.AofVirtualSublogCount
};
- singleLog.log.Enqueue(header, out _);
+ new Span(singleLogTxnHeader.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes).Fill(0xFF);
+
+ singleLog.log.Enqueue(singleLogTxnHeader, out _);
}
else
{
- var header = new AofTransactionHeader
+ basicHeader.HeaderType = AofHeaderType.ShardedLogTransactionHeader;
+ var header = new AofShardedLogTransactionHeader
{
shardedHeader = new AofShardedHeader
{
- basicHeader = new AofHeader
- {
- HeaderType = AofHeaderType.TransactionHeader,
- opType = opType,
- storeVersion = 0,
- sessionID = -1,
- UnsafeTruncateLog = unsafeTruncateLog,
- databaseId = (byte)dbId
- },
+ basicHeader = basicHeader,
sequenceNumber = appendOnlyFile.seqNumGen.GetSequenceNumber()
},
participantCount = (short)appendOnlyFile.serverOptions.AofVirtualSublogCount
};
- new Span(header.replayTaskAccessVector, AofTransactionHeader.ReplayTaskAccessVectorBytes).Fill(0xFF);
+ new Span(header.replayTaskAccessVector, AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes).Fill(0xFF);
- // Multi-replay only support
- if (usingSinglePhysicalLog)
+ var physicalSublogAccessVector = AllLogsBitmask();
+ try
{
- // Single log with multi-replay enabled needs to add sharderHeader to implement read protocol.
- // Cookie generator and hence manual commit not needed because single physical sublog commit marker is read consistent when flushed.
- singleLog.log.Enqueue(header, out _);
+ if (serverOptions.AofPhysicalSublogCount > 1)
+ LockSublogs(physicalSublogAccessVector);
+ var _physicalSublogAccessVector = physicalSublogAccessVector;
+
+ while (_physicalSublogAccessVector > 0)
+ {
+ var physicalSublogIdx = _physicalSublogAccessVector.GetNextOffset();
+ shardedLog.sublog[physicalSublogIdx].Enqueue(header, out _);
+ }
}
- // Multi physical sublogs and multi-replay support
- else
+ finally
{
- var physicalSublogAccessVector = appendOnlyFile.Log.AllLogsBitmask();
+ if (serverOptions.AofPhysicalSublogCount > 1)
+ UnlockSublogs(physicalSublogAccessVector);
}
+
+ if (serverOptions.AofAutoCommit)
+ Commit();
}
}
+
+ internal void EnqueueDatabaseCommit(AofEntryType opType, long version)
+ {
+ var basicHeader = new AofHeader
+ {
+ HeaderType = AofHeaderType.BasicHeader,
+ opType = opType,
+ storeVersion = version,
+ sessionID = -1
+ };
+ EnqueueBroadcastEntry(basicHeader);
+ }
+
+ internal void EnqueueSafeFlushAOF(AofEntryType opType, bool unsafeTruncateLog, int dbId)
+ {
+ var basicHeader = new AofHeader
+ {
+ HeaderType = AofHeaderType.BasicHeader,
+ opType = opType,
+ storeVersion = 0,
+ sessionID = -1,
+ UnsafeTruncateLog = unsafeTruncateLog,
+ databaseId = (byte)dbId
+ };
+ EnqueueBroadcastEntry(basicHeader);
+ }
}
}
\ No newline at end of file
diff --git a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs
index 99f80ba278a..01543b41bca 100644
--- a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs
+++ b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs
@@ -212,6 +212,12 @@ public bool AfterConsistentReadKeyBatch(int keyCount)
return false;
}
+ // Propagate batch context back to session context to maintain prefix consistency
+ // for subsequent single-key reads across different sublogs.
+ replicaReadContext.maximumSessionSequenceNumber = batchReadContext.maximumSessionSequenceNumber;
+ replicaReadContext.lastVirtualSublogIdx = batchReadContext.lastVirtualSublogIdx;
+ replicaReadContext.lastHash = batchReadContext.lastHash;
+
return true;
}
}
diff --git a/libs/server/AOF/Recover/RecoverLogDriver.cs b/libs/server/AOF/Recover/RecoverLogDriver.cs
index 6caf1034e1a..3d365cf5c92 100644
--- a/libs/server/AOF/Recover/RecoverLogDriver.cs
+++ b/libs/server/AOF/Recover/RecoverLogDriver.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT license.
using System;
+using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -82,13 +83,18 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,
if (payloadLength > 0)
{
var entryPtr = ptr + entryLength;
- if (!aofProcessor.SkipReplay(entryPtr, untilSequenceNumber, out var entrySequenceNumber))
+ var logAddressSequenceNumber = currentAddress + (ptr - record);
+ Debug.Assert(logAddressSequenceNumber > 0, "Entry log address must be positive");
+ if (!aofProcessor.SkipReplay(entryPtr, untilSequenceNumber, logAddressSequenceNumber, out var sequenceNumber))
{
- aofProcessor.ProcessAofRecordInternal(physicalSublogIdx, entryPtr, payloadLength, true, out _);
+ aofProcessor.ProcessAofRecordInternal(physicalSublogIdx, entryPtr, payloadLength, true, out _, logAddressSequenceNumber);
}
else
{
- logger?.LogTrace("Skipping entry replay {entrySequenceNumber} > {untilSequenceNumber}", entrySequenceNumber, untilSequenceNumber);
+ // Sequence numbers are monotonically increasing — all subsequent entries will also exceed the threshold
+ logger?.LogTrace("Skipping entry replay {entrySequenceNumber} > {untilSequenceNumber}, stopping", sequenceNumber, untilSequenceNumber);
+ cts.Cancel();
+ break;
}
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
@@ -119,6 +125,13 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,
}
else
{
+ // Wait for previous batch to complete before overwriting shared batch context
+ if (replayTasks != null)
+ {
+ replayBatchContext.LeaderFollowerBarrier.WaitCompleted();
+ replayBatchContext.LeaderFollowerBarrier.Release();
+ }
+
CreateAndRunIntraPageParallelReplayTasks();
replayBatchContext.Record = record;
@@ -127,6 +140,14 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,
replayBatchContext.NextAddress = nextAddress;
replayBatchContext.IsProtected = isProtected;
replayBatchContext.LeaderFollowerBarrier.SignalWorkReady();
+
+ // After the last batch, wait for workers and cancel to exit BulkConsumeAllAsync
+ if (nextAddress == untilAddress)
+ {
+ replayBatchContext.LeaderFollowerBarrier.WaitCompleted();
+ replayBatchContext.LeaderFollowerBarrier.Release();
+ cts.Cancel();
+ }
}
}
@@ -174,12 +195,18 @@ internal async Task ContinuousBackgroundReplayAsync(int replayTaskIdx, Tsavorite
if (payloadLength > 0)
{
var entryPtr = ptr + entryLength;
+ var logAddressSequenceNumber = currentAddress + (ptr - record);
+ Debug.Assert(logAddressSequenceNumber > 0, "Entry log address must be positive");
// Check if entry is assigned for processing to this replay task and
- // the sequence number is bellow the threshold to ensure prefix consistency
- if (aofProcessor.CanReplay(entryPtr, replayTaskIdx, out var sequenceNumber) &&
- (untilSequenceNumber == -1 || sequenceNumber <= untilSequenceNumber))
+ // the sequence number is below the threshold to ensure prefix consistency
+ if (aofProcessor.CanReplay(entryPtr, replayTaskIdx, logAddressSequenceNumber, out var sequenceNumber))
{
- aofProcessor.ProcessAofRecordInternal(virtualSublogIdx, entryPtr, payloadLength, true, out var isCheckpointStart);
+ if (untilSequenceNumber != -1 && sequenceNumber > untilSequenceNumber)
+ {
+ // Sequence numbers are monotonically increasing — stop processing this batch
+ break;
+ }
+ aofProcessor.ProcessAofRecordInternal(virtualSublogIdx, entryPtr, payloadLength, true, out var isCheckpointStart, logAddressSequenceNumber);
maxSequenceNumber = Math.Max(sequenceNumber, maxSequenceNumber);
}
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
@@ -213,8 +240,9 @@ internal async Task ContinuousBackgroundReplayAsync(int replayTaskIdx, Tsavorite
}
finally
{
- // Signal work completion after processing
- replayBatchContext.LeaderFollowerBarrier.SignalCompleted();
+ // Signal work completion after processing (skip if cancelled to avoid blocking on resetReady)
+ if (!cts.Token.IsCancellationRequested)
+ replayBatchContext.LeaderFollowerBarrier.SignalCompleted();
}
}
}
diff --git a/libs/server/AOF/ReplayCoordinator/AofReplayContext.cs b/libs/server/AOF/ReplayCoordinator/AofReplayContext.cs
index 5d45fb1c299..adc7ea2cc1d 100644
--- a/libs/server/AOF/ReplayCoordinator/AofReplayContext.cs
+++ b/libs/server/AOF/ReplayCoordinator/AofReplayContext.cs
@@ -69,8 +69,9 @@ public void Dispose()
///
///
///
- public void AddTransactionGroup(int sessionID, int sublogIdx, byte logAccessBitmap)
- => activeTxns[sessionID] = new(sublogIdx, logAccessBitmap);
+ /// Sequence number or entry address of the TxnStart entry
+ public void AddTransactionGroup(int sessionID, int sublogIdx, byte logAccessBitmap, long startSequenceNumber = 0)
+ => activeTxns[sessionID] = new(sublogIdx, logAccessBitmap, startSequenceNumber);
///
/// Add transaction group to fuzzy region buffer
diff --git a/libs/server/AOF/ReplayCoordinator/AofReplayCoordinator.cs b/libs/server/AOF/ReplayCoordinator/AofReplayCoordinator.cs
index 7332a5312cb..0c1d34c6c5b 100644
--- a/libs/server/AOF/ReplayCoordinator/AofReplayCoordinator.cs
+++ b/libs/server/AOF/ReplayCoordinator/AofReplayCoordinator.cs
@@ -112,28 +112,29 @@ public void Dispose()
///
///
///
- internal unsafe void AddFuzzyRegionOperation(int sublogIdx, ReadOnlySpan entry) => aofReplayContext[sublogIdx].fuzzyRegionOps.Add(entry.ToArray());
+ internal void AddFuzzyRegionOperation(int sublogIdx, ReadOnlySpan entry) => aofReplayContext[sublogIdx].fuzzyRegionOps.Add(entry.ToArray());
///
- /// This method will perform one of the followin
+ /// This method will perform one of the following
/// 1. TxnStart: Create a new transaction group
/// 2. TxnCommit: Replay or buffer transaction group depending if we are in fuzzyRegion.
/// 3. TxnAbort: Clear corresponding sublog replay buffer.
/// 4. Default: Add an operation to an existing transaction group
///
- ///
+ ///
///
///
///
+ ///
/// Returns true if a txn operation was processed and added otherwise false
///
- internal unsafe bool AddOrReplayTransactionOperation(int sublogIdx, byte* ptr, int length, bool asReplica)
+ internal bool AddOrReplayTransactionOperation(int virtualSublogIdx, byte* ptr, int length, bool asReplica, long logAddressSequenceNumber = 0)
{
var header = *(AofHeader*)ptr;
- var shardedHeader = default(AofShardedHeader);
- var replayContext = GetReplayContext(sublogIdx);
- // First try to process this as an existing transaction
- if (aofReplayContext[sublogIdx].activeTxns.TryGetValue(header.sessionID, out var group))
+ var replayContext = GetReplayContext(virtualSublogIdx);
+ // Process operation as part of a transaction if it belongs to the same sessionId and
+ // there is already a transaction group associated with it.
+ if (aofReplayContext[virtualSublogIdx].activeTxns.TryGetValue(header.sessionID, out var group))
{
switch (header.opType)
{
@@ -141,8 +142,7 @@ internal unsafe bool AddOrReplayTransactionOperation(int sublogIdx, byte* ptr, i
throw new GarnetException("No nested transactions expected");
case AofEntryType.TxnAbort:
ClearSessionTxn();
- shardedHeader = *(AofShardedHeader*)ptr;
- aofProcessor.storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(sublogIdx, shardedHeader.sequenceNumber);
+ UpdateMaxSequenceNumberFromHeader();
break;
case AofEntryType.TxnCommit:
if (replayContext.inFuzzyRegion)
@@ -150,12 +150,12 @@ internal unsafe bool AddOrReplayTransactionOperation(int sublogIdx, byte* ptr, i
// If in fuzzy region we want to record the commit marker and
// buffer the transaction group for later replay
var commitMarker = new ReadOnlySpan(ptr, length);
- aofReplayContext[sublogIdx].AddToFuzzyRegionBuffer(group, commitMarker);
+ aofReplayContext[virtualSublogIdx].AddToFuzzyRegionBuffer(group, commitMarker);
}
else
{
// Otherwise process transaction group immediately
- ProcessTransactionGroup(sublogIdx, ptr, asReplica, group);
+ ProcessTransactionGroup(virtualSublogIdx, ptr, asReplica, group, logAddressSequenceNumber);
}
// We want to clear and remove in both cases to make space for next txn from session
@@ -170,8 +170,8 @@ internal unsafe bool AddOrReplayTransactionOperation(int sublogIdx, byte* ptr, i
void ClearSessionTxn()
{
- aofReplayContext[sublogIdx].activeTxns[header.sessionID].Clear();
- _ = aofReplayContext[sublogIdx].activeTxns.Remove(header.sessionID);
+ aofReplayContext[virtualSublogIdx].activeTxns[header.sessionID].Clear();
+ _ = aofReplayContext[virtualSublogIdx].activeTxns.Remove(header.sessionID);
}
return true;
@@ -181,24 +181,67 @@ void ClearSessionTxn()
switch (header.opType)
{
case AofEntryType.TxnStart:
- var logAccessCount = !serverOptions.MultiLogEnabled ? 0 : (*(AofTransactionHeader*)ptr).participantCount;
- aofReplayContext[sublogIdx].AddTransactionGroup(header.sessionID, sublogIdx, (byte)logAccessCount);
+ var headerType = header.HeaderType;
+ short logAccessCount = 0;
+ long startSeqNum = 0;
+ if (serverOptions.MultiLogEnabled)
+ {
+ switch (headerType)
+ {
+ case AofHeaderType.SingleLogTransactionHeader:
+ logAccessCount = (*(AofSingleLogTransactionHeader*)ptr).participantCount;
+ startSeqNum = logAddressSequenceNumber;
+ break;
+ case AofHeaderType.ShardedLogTransactionHeader:
+ logAccessCount = (*(AofShardedLogTransactionHeader*)ptr).participantCount;
+ startSeqNum = (*(AofShardedHeader*)ptr).sequenceNumber;
+ break;
+ default:
+ // BasicHeader from SL-era AOF: all replay tasks participate
+ logAccessCount = (short)serverOptions.AofReplayTaskCount;
+ startSeqNum = logAddressSequenceNumber;
+ break;
+ }
+ }
+ aofReplayContext[virtualSublogIdx].AddTransactionGroup(header.sessionID, virtualSublogIdx, (byte)logAccessCount, startSeqNum);
break;
case AofEntryType.TxnAbort:
case AofEntryType.TxnCommit:
// We encountered a transaction end without start - this could happen because we truncated the AOF
// after a checkpoint, and the transaction belonged to the previous version. It can safely
// be ignored.
- shardedHeader = *(AofShardedHeader*)ptr;
- aofProcessor.storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(sublogIdx, shardedHeader.sequenceNumber);
+ UpdateMaxSequenceNumberFromHeader();
break;
default:
// Continue processing
return false;
}
- // Processed this record succesfully
+ // Processed this record successfully
return true;
+
+ // U
+ void UpdateMaxSequenceNumberFromHeader()
+ {
+ var headerType = (*(AofHeader*)ptr).HeaderType;
+ long sequenceNumber;
+ switch (headerType)
+ {
+ case AofHeaderType.BasicHeader:
+ case AofHeaderType.SingleLogTransactionHeader:
+ sequenceNumber = logAddressSequenceNumber;
+ break;
+ case AofHeaderType.ShardedHeader:
+ sequenceNumber = (*(AofShardedHeader*)ptr).sequenceNumber;
+ break;
+ case AofHeaderType.ShardedLogTransactionHeader:
+ sequenceNumber = (*(AofShardedLogTransactionHeader*)ptr).shardedHeader.sequenceNumber;
+ break;
+ default:
+ throw new GarnetException($"Unexpected header type {headerType}");
+ }
+ aofProcessor.storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, sequenceNumber);
+ }
}
///
@@ -238,12 +281,13 @@ internal void ProcessFuzzyRegionOperations(int sublogIdx, long storeVersion, boo
///
///
///
- internal void ProcessFuzzyRegionTransactionGroup(int sublogIdx, byte* ptr, bool asReplica)
+ /// Log address of the commit entry
+ internal void ProcessFuzzyRegionTransactionGroup(int sublogIdx, byte* ptr, bool asReplica, long entryAddress = 0)
{
Debug.Assert(aofReplayContext[sublogIdx].txnGroupBuffer != null);
// Process transaction groups in FIFO order
var txnGroup = aofReplayContext[sublogIdx].txnGroupBuffer.Dequeue();
- ProcessTransactionGroup(sublogIdx, ptr, asReplica, txnGroup);
+ ProcessTransactionGroup(sublogIdx, ptr, asReplica, txnGroup, entryAddress);
}
///
@@ -253,7 +297,8 @@ internal void ProcessFuzzyRegionTransactionGroup(int sublogIdx, byte* ptr, bool
///
///
///
- internal void ProcessTransactionGroup(int sublogIdx, byte* ptr, bool asReplica, TransactionGroup txnGroup)
+ /// Log address of the commit entry
+ internal void ProcessTransactionGroup(int sublogIdx, byte* ptr, bool asReplica, TransactionGroup txnGroup, long entryAddress = 0)
{
var replayContext = GetReplayContext(sublogIdx);
if (!asReplica)
@@ -265,7 +310,9 @@ internal void ProcessTransactionGroup(int sublogIdx, byte* ptr, bool asReplica,
replayContext.StringBasicContext,
replayContext.ObjectBasicContext,
replayContext.UnifiedBasicContext,
- txnGroup, asReplica);
+ txnGroup,
+ asReplica,
+ entryAddress);
}
else
{
@@ -274,29 +321,74 @@ internal void ProcessTransactionGroup(int sublogIdx, byte* ptr, bool asReplica,
// Start by saving transaction keys for locking
SaveTransactionGroupKeysToLock(txnManager, txnGroup);
- // Start transaction
- _ = txnManager.Run(internal_txn: true);
-
- // Process in parallel transaction group
- ProcessTransactionGroupOperations(
- aofProcessor,
- txnManager.StringTransactionalContext,
- txnManager.ObjectTransactionalContext,
- txnManager.UnifiedTransactionalContext,
- txnGroup,
- asReplica);
-
- // Wait for all participating subtasks to complete replay unless singleLog
if (serverOptions.MultiLogEnabled)
{
- var shardedHeader = *(AofShardedHeader*)ptr;
- // Synchronize replay of txn
+ var headerType = (AofHeaderType)(*(AofHeader*)ptr).HeaderType;
+ long commitSeqNum;
+ short partCount;
+ var sessionId = (*(AofHeader*)ptr).sessionID;
+
+ if (headerType == AofHeaderType.SingleLogTransactionHeader)
+ {
+ commitSeqNum = entryAddress;
+ partCount = (*(AofSingleLogTransactionHeader*)ptr).participantCount;
+ }
+ else if (headerType == AofHeaderType.ShardedLogTransactionHeader)
+ {
+ var shardedHeader = *(AofShardedHeader*)ptr;
+ commitSeqNum = shardedHeader.sequenceNumber;
+ partCount = (*(AofShardedLogTransactionHeader*)ptr).participantCount;
+ }
+ else
+ {
+ // BasicHeader from SL-era AOF: all replay tasks participate
+ commitSeqNum = entryAddress;
+ partCount = (short)serverOptions.AofReplayTaskCount;
+ }
+
+ // Acquire-barrier: synchronize all participants before locking using TxnStart sequence number
+ ProcessSynchronizedOperation(
+ sublogIdx,
+ txnGroup.StartSequenceNumber,
+ partCount,
+ sessionId,
+ null);
+
+ // Start transaction (acquires locks)
+ _ = txnManager.Run(internal_txn: true);
+
+ // Process transaction group operations
+ ProcessTransactionGroupOperations(
+ aofProcessor,
+ txnManager.StringTransactionalContext,
+ txnManager.ObjectTransactionalContext,
+ txnManager.UnifiedTransactionalContext,
+ txnGroup,
+ asReplica,
+ entryAddress);
+
+ // Release-barrier: synchronize all participants before committing using TxnCommit sequence number
ProcessSynchronizedOperation(
sublogIdx,
- ptr,
- shardedHeader.basicHeader.sessionID,
+ commitSeqNum,
+ partCount,
+ sessionId,
null);
}
+ else
+ {
+ // Single-log: no synchronization needed
+ _ = txnManager.Run(internal_txn: true);
+
+ ProcessTransactionGroupOperations(
+ aofProcessor,
+ txnManager.StringTransactionalContext,
+ txnManager.ObjectTransactionalContext,
+ txnManager.UnifiedTransactionalContext,
+ txnGroup,
+ asReplica,
+ entryAddress);
+ }
// Commit (NOTE: need to ensure that we do not write to log here)
txnManager.Commit(true);
@@ -319,7 +411,7 @@ static void SaveTransactionGroupKeysToLock(TransactionManager txnManager, Transa
// Process transaction
static void ProcessTransactionGroupOperations(AofProcessor aofProcessor,
TStringContext stringContext, TObjectContext objectContext, TUnifiedContext unifiedContext,
- TransactionGroup txnGroup, bool asReplica)
+ TransactionGroup txnGroup, bool asReplica, long entryAddress = 0)
where TStringContext : ITsavoriteContext
where TObjectContext : ITsavoriteContext
where TUnifiedContext : ITsavoriteContext
@@ -339,7 +431,8 @@ static void ProcessTransactionGroupOperations
///
///
- internal void ReplayStoredProc(int sublogIdx, byte id, byte* ptr)
+ /// Log address of the entry, used for single-physical-log mode
+ internal void ReplayStoredProc(int sublogIdx, byte id, byte* ptr, long entryAddress = 0)
{
if (!serverOptions.MultiLogEnabled)
{
@@ -359,26 +453,47 @@ internal void ReplayStoredProc(int sublogIdx, byte id, byte* ptr)
}
else
{
- var shardedHeader = *(AofShardedHeader*)ptr;
- // Initialize custom proc collection to keep track of hashes for keys for which their timestamp needs to be updated
- CustomProcedureKeyHashCollection customProcKeyHashTracker = new(aofProcessor.storeWrapper.appendOnlyFile);
+ var headerType = (AofHeaderType)(*(AofHeader*)ptr).HeaderType;
+ long sequenceNumber;
+ short participantCount;
+ int sessionId = (*(AofHeader*)ptr).sessionID;
+
+ if (headerType == AofHeaderType.SingleLogTransactionHeader)
+ {
+ var singleLogHeader = *(AofSingleLogTransactionHeader*)ptr;
+ sequenceNumber = entryAddress;
+ participantCount = singleLogHeader.participantCount;
+ }
+ else if (headerType == AofHeaderType.ShardedLogTransactionHeader)
+ {
+ var shardedHeader = *(AofShardedHeader*)ptr;
+ sequenceNumber = shardedHeader.sequenceNumber;
+ participantCount = (*(AofShardedLogTransactionHeader*)ptr).participantCount;
+ }
+ else
+ {
+ // BasicHeader from SL-era AOF: all replay tasks participate
+ sequenceNumber = entryAddress;
+ participantCount = (short)serverOptions.AofReplayTaskCount;
+ }
// Synchronized processing of stored proc operation
ProcessSynchronizedOperation(
sublogIdx,
- ptr,
- shardedHeader.basicHeader.sessionID,
- () => { StoredProcRunnerWrapper(sublogIdx, id, ptr); return Task.CompletedTask; }
+ sequenceNumber,
+ participantCount,
+ sessionId,
+ () => { StoredProcRunnerWrapper(sublogIdx, id, ptr, sequenceNumber); return Task.CompletedTask; }
);
// Wrapper for store proc runner used for multi-log synchronization
- void StoredProcRunnerWrapper(int sublogIdx, byte id, byte* ptr)
+ void StoredProcRunnerWrapper(int sublogIdx, byte id, byte* ptr, long seqNum)
{
// Initialize custom proc collection to keep track of hashes for keys for which their timestamp needs to be updated
CustomProcedureKeyHashCollection customProcKeyHashTracker = new(aofProcessor.storeWrapper.appendOnlyFile);
// Update timestamps for associated keys
- customProcKeyHashTracker?.UpdateSequenceNumber(shardedHeader.sequenceNumber);
+ customProcKeyHashTracker?.UpdateSequenceNumber(seqNum);
// Replay StoredProc
StoredProcRunnerBase(sublogIdx, id, ptr, shardedLog: true, customProcKeyHashTracker);
@@ -404,18 +519,16 @@ void StoredProcRunnerBase(int sublogIdx, byte id, byte* entryPtr, bool shardedLo
/// Unified method to process operations that require synchronization across sublogs
///
/// SublogIdx
- /// Pointer to the AOF entry
+ /// Sequence number or entry address for ordering
+ /// Number of participating replay tasks
/// Unique barrier ID for this operation type
/// The operation to execute
- internal void ProcessSynchronizedOperation(int sublogIdx, byte* ptr, int barrierId, Func operation)
+ internal void ProcessSynchronizedOperation(int sublogIdx, long sequenceNumber, short participantCount, int barrierId, Func operation)
{
Debug.Assert(serverOptions.MultiLogEnabled);
- // Extract extended header info and validate header
- var txnHeader = *(AofTransactionHeader*)ptr;
-
// Synchronize execution across sublogs
- var leaderBarrier = GetBarrier(barrierId, txnHeader);
+ var leaderBarrier = GetBarrier(barrierId, sequenceNumber, participantCount);
var isLeader = leaderBarrier.TrySignalOrWait(out var signalException, serverOptions.ReplicaSyncTimeout);
Exception removeBarrierException = null;
@@ -447,7 +560,7 @@ internal void ProcessSynchronizedOperation(int sublogIdx, byte* ptr, int barrier
// The leader will always perform a cleanup
if (isLeader)
{
- if (!TryRemoveBarrier(barrierId, txnHeader, out _))
+ if (!TryRemoveBarrier(barrierId, sequenceNumber))
removeBarrierException = new GarnetException($"RemoveBarrier failed when processing {barrierId}");
// Release participants if any
@@ -464,22 +577,28 @@ internal void ProcessSynchronizedOperation(int sublogIdx, byte* ptr, int barrier
if (removeBarrierException != null)
throw removeBarrierException;
- // Update timestamp
- aofProcessor.storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(sublogIdx, txnHeader.shardedHeader.sequenceNumber);
+ // Transaction replay consistency invariant:
+ // Updating the sequence number before the operation executes preserves prefix consistency —
+ // it signals that replay has reached this log position, matching the standalone operation model.
+ // Atomicity is currently preserved through coordinated locking (acquire-barrier before writes,
+ // release-barrier after commit), preventing readers from observing partial transaction state.
+ // Alternatively, atomicity could be preserved without locking by relying on the read protocol
+ // to re-read keys as they are updated; in that model, write-set replay must NOT advance the
+ // sequence number — only the commit marker should update it.
+ aofProcessor.storeWrapper.appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(sublogIdx, sequenceNumber);
// Get barrier helper
- LeaderBarrier GetBarrier(int sessionId, AofTransactionHeader txnHeader)
+ LeaderBarrier GetBarrier(int sessionId, long seqNum, short partCount)
{
- // Use session ID and txn ID as the barrier key to prevent conflicts between transactions from the same session that access disjoint logs
- var barrierID = new BarrierKey() { SessionId = sessionId, txnId = txnHeader.shardedHeader.sequenceNumber };
- return leaderBarriers.GetOrAdd(barrierID, _ => new LeaderBarrier(txnHeader.participantCount));
+ var barrierID = new BarrierKey() { SessionId = sessionId, txnId = seqNum };
+ return leaderBarriers.GetOrAdd(barrierID, _ => new LeaderBarrier(partCount));
}
// Remove barrier helper
- bool TryRemoveBarrier(int sessionId, AofTransactionHeader txnHeader, out LeaderBarrier eventBarrier)
+ bool TryRemoveBarrier(int sessionId, long seqNum)
{
- var barrierID = new BarrierKey() { SessionId = sessionId, txnId = txnHeader.shardedHeader.sequenceNumber };
- return leaderBarriers.TryRemove(barrierID, out eventBarrier);
+ var barrierID = new BarrierKey() { SessionId = sessionId, txnId = seqNum };
+ return leaderBarriers.TryRemove(barrierID, out _);
}
}
}
diff --git a/libs/server/AOF/ReplayCoordinator/TransactionGroup.cs b/libs/server/AOF/ReplayCoordinator/TransactionGroup.cs
index 71ff65634ac..69a89a719bc 100644
--- a/libs/server/AOF/ReplayCoordinator/TransactionGroup.cs
+++ b/libs/server/AOF/ReplayCoordinator/TransactionGroup.cs
@@ -10,7 +10,8 @@ namespace Garnet.server
///
///
///
- public class TransactionGroup(int sublogIdx, byte logAccessMap)
+ /// Sequence number or entry address of the TxnStart entry
+ public class TransactionGroup(int sublogIdx, byte logAccessMap, long startSequenceNumber = 0)
{
///
/// Virtual sublog index associated with this transaction group.
@@ -22,6 +23,12 @@ public class TransactionGroup(int sublogIdx, byte logAccessMap)
///
public readonly byte LogAccessCount = logAccessMap;
+ ///
+ /// Sequence number or entry address of the TxnStart entry, used to key the acquire-barrier
+ /// so it is distinct from the release-barrier keyed on the TxnCommit entry.
+ ///
+ public readonly long StartSequenceNumber = startSequenceNumber;
+
///
/// Operations associated with this transaction group.
///
diff --git a/libs/server/Databases/MultiDatabaseManager.cs b/libs/server/Databases/MultiDatabaseManager.cs
index cc3ebfc4a64..94a9620c77e 100644
--- a/libs/server/Databases/MultiDatabaseManager.cs
+++ b/libs/server/Databases/MultiDatabaseManager.cs
@@ -454,7 +454,7 @@ public override AofAddress ReplayAOF(AofAddress untilAddress)
// When replaying AOF we do not want to write record again to AOF.
// So initialize local AofProcessor with recordToAof: false.
- var aofProcessor = new AofProcessor(StoreWrapper, recordToAof: false, logger: Logger);
+ var aofProcessor = new AofProcessor(StoreWrapper, clusterProvider: StoreWrapper.clusterProvider, recordToAof: false, logger: Logger);
var replicationOffset = AofAddress.Create(StoreWrapper.serverOptions.AofPhysicalSublogCount, 0);
try
diff --git a/libs/server/Databases/SingleDatabaseManager.cs b/libs/server/Databases/SingleDatabaseManager.cs
index bef2926fb4a..9112b88acd0 100644
--- a/libs/server/Databases/SingleDatabaseManager.cs
+++ b/libs/server/Databases/SingleDatabaseManager.cs
@@ -248,7 +248,7 @@ public override AofAddress ReplayAOF(AofAddress untilAddress)
// When replaying AOF we do not want to write record again to AOF.
// So initialize local AofProcessor with recordToAof: false.
- var aofProcessor = new AofProcessor(StoreWrapper, recordToAof: false, logger: Logger);
+ var aofProcessor = new AofProcessor(StoreWrapper, clusterProvider: StoreWrapper.clusterProvider, recordToAof: false, logger: Logger);
try
{
diff --git a/libs/server/Transaction/TransactionManager.cs b/libs/server/Transaction/TransactionManager.cs
index 7d9097fb456..692ea3fd641 100644
--- a/libs/server/Transaction/TransactionManager.cs
+++ b/libs/server/Transaction/TransactionManager.cs
@@ -518,7 +518,7 @@ public void ComputeCustomProcShardedLogAccess(PinnedSpanByte key, CustomTransact
if (proc.customProcKeyHashCollection == null)
{
// Used with parallel replay, this BitVector will track which replay tasks should participate in the parallel replay of this custom proc.
- proc.replayTaskAccessVector ??= [.. Enumerable.Range(0, appendOnlyFile.Log.Size).Select(_ => new BitVector(AofTransactionHeader.ReplayTaskAccessVectorBytes))];
+ proc.replayTaskAccessVector ??= [.. Enumerable.Range(0, appendOnlyFile.Log.Size).Select(_ => new BitVector(AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes))];
var physicalSublogIdx = appendOnlyFile.Log.GetPhysicalSublogIdx(keyHash);
var replayIdx = appendOnlyFile.Log.GetReplayTaskIdx(keyHash);
@@ -552,7 +552,7 @@ void ComputeSublogAccessVector(out ulong physicalSublogAccessVector, out BitVect
return;
// Initialize only for multi-log
- virtualSublogAccessVector = [.. Enumerable.Range(0, appendOnlyFile.Log.Size).Select(_ => new BitVector(AofTransactionHeader.ReplayTaskAccessVectorBytes))];
+ virtualSublogAccessVector = [.. Enumerable.Range(0, appendOnlyFile.Log.Size).Select(_ => new BitVector(AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes))];
// If sharded log is enabled calculate sublog access bitmap
for (var i = 0; i < txnKeysParseState.Count; i++)
diff --git a/test/cluster/Garnet.test.cluster.migrate/ClusterMigrateTests.cs b/test/cluster/Garnet.test.cluster.migrate/ClusterMigrateTests.cs
index f5f903484b9..38f28091663 100644
--- a/test/cluster/Garnet.test.cluster.migrate/ClusterMigrateTests.cs
+++ b/test/cluster/Garnet.test.cluster.migrate/ClusterMigrateTests.cs
@@ -2270,21 +2270,22 @@ public void ClusterMigrateSetSlotRangeResilience()
{
context.logger?.LogDebug("0. ClusterMigrateSetSlotRangeResilience started");
var shards = 2;
+ var sourceNodeIndex = 0;
+ var targetNodeIndex = 1;
context.CreateInstances(shards, useTLS: UseTLS);
context.CreateConnection(useTLS: UseTLS);
// Setup: node 0 owns all slots, node 1 owns none
_ = context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], addslot: true, logger: context.logger);
- context.clusterTestUtils.SetConfigEpoch(0, 1, logger: context.logger);
- context.clusterTestUtils.SetConfigEpoch(1, 2, logger: context.logger);
- context.clusterTestUtils.Meet(0, 1, logger: context.logger);
- context.clusterTestUtils.WaitUntilNodeIsKnown(1, 0, logger: context.logger);
+ context.clusterTestUtils.SetConfigEpoch(sourceNodeIndex, 1, logger: context.logger);
+ context.clusterTestUtils.SetConfigEpoch(targetNodeIndex, 2, logger: context.logger);
+ context.clusterTestUtils.Meet(sourceNodeIndex, targetNodeIndex, logger: context.logger);
+ context.clusterTestUtils.WaitUntilNodeIsKnown(targetNodeIndex, sourceNodeIndex, logger: context.logger);
+ context.clusterTestUtils.WaitUntilNodeIsKnown(sourceNodeIndex, targetNodeIndex, logger: context.logger);
// Create data in a single slot using the standard helper
var keyCount = 50;
var slot = CreateSingleSlotData(keyLen: 16, valueLen: 16, keyTagEnd: 6, keyCount, out var data);
- var sourceNodeIndex = 0;
- var targetNodeIndex = 1;
context.logger?.LogDebug("1. Verifying data insertion into slot {slot}", slot);
var actualKeyCount = context.clusterTestUtils.CountKeysInSlot(sourceNodeIndex, slot, logger: context.logger);
diff --git a/test/cluster/Garnet.test.cluster.multilog.diskless/Garnet.test.cluster.multilog.diskless.csproj b/test/cluster/Garnet.test.cluster.multilog.diskless/Garnet.test.cluster.multilog.diskless.csproj
new file mode 100644
index 00000000000..1dd9a0f4fa4
--- /dev/null
+++ b/test/cluster/Garnet.test.cluster.multilog.diskless/Garnet.test.cluster.multilog.diskless.csproj
@@ -0,0 +1,48 @@
+
+
+
+ true
+ ../../../Garnet.snk
+ false
+
+
+
+ 1701;1702;1591
+
+
+
+
+ PreserveNewest
+
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ false
+
+
diff --git a/test/cluster/Garnet.test.cluster.multilog/MultiLogTests/ClusterReplicationDisklessSyncShardedLog.cs b/test/cluster/Garnet.test.cluster.multilog.diskless/MultiLogTests/ClusterReplicationDisklessSyncShardedLog.cs
similarity index 100%
rename from test/cluster/Garnet.test.cluster.multilog/MultiLogTests/ClusterReplicationDisklessSyncShardedLog.cs
rename to test/cluster/Garnet.test.cluster.multilog.diskless/MultiLogTests/ClusterReplicationDisklessSyncShardedLog.cs
diff --git a/test/cluster/Garnet.test.cluster.multilog/Garnet.test.cluster.multilog.csproj b/test/cluster/Garnet.test.cluster.multilog/Garnet.test.cluster.multilog.csproj
index 354e9d60bda..e29bd3169f8 100644
--- a/test/cluster/Garnet.test.cluster.multilog/Garnet.test.cluster.multilog.csproj
+++ b/test/cluster/Garnet.test.cluster.multilog/Garnet.test.cluster.multilog.csproj
@@ -39,7 +39,7 @@
-
+
diff --git a/test/cluster/Garnet.test.cluster.multilog/MultiLogTests/ClusterReplicationShardedLog.cs b/test/cluster/Garnet.test.cluster.multilog/MultiLogTests/ClusterReplicationShardedLog.cs
index 6d5f6ef2317..e5728fb54bd 100644
--- a/test/cluster/Garnet.test.cluster.multilog/MultiLogTests/ClusterReplicationShardedLog.cs
+++ b/test/cluster/Garnet.test.cluster.multilog/MultiLogTests/ClusterReplicationShardedLog.cs
@@ -8,6 +8,7 @@
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
@@ -27,11 +28,9 @@ public class ClusterReplicationShardedLog : ClusterReplicationBaseTests
{"ClusterSRNoCheckpointRestartSecondary", true},
{"ClusterSRPrimaryCheckpoint", true},
{"ClusterCheckpointRetrieveDisableStorageTier", true},
- {"ClusterCheckpointRetrieveDelta", true},
- {"ClusterSRPrimaryCheckpointRetrieve", true},
{"ClusterSRAddReplicaAfterPrimaryCheckpoint", true},
{"ClusterSRPrimaryRestart", true},
- {"ClusterSRRedirectWrites", true},
+ {"ClusterSRRedirectWrites", false}, // Does not test AOF
{"ClusterSRReplicaOfTest", true},
{"ClusterReplicationSimpleFailover", true},
{"ClusterFailoverAttachReplicas", true},
@@ -44,14 +43,12 @@ public class ClusterReplicationShardedLog : ClusterReplicationBaseTests
{"ClusterDivergentCheckpointMMFastCommitTest", true},
{"ClusterReplicationCheckpointAlignmentTest", true},
{"ClusterReplicationLua", true},
- {"ClusterReplicationStoredProc", true},
+ {"ClusterReplicationStoredProc", false}, // Duplicate test in this class
{"ClusterReplicationManualCheckpointing", true},
{"ReplicaSyncTaskFaultsRecoverAsync", true},
- {"ClusterReplicationMultiRestartRecover", true},
- {"ReplicasRestartAsReplicasAsync", true},
- {"PrimaryUnavailableRecoveryAsync", true},
+ {"ClusterReplicationMultiRestartRecover", false},
{"ClusterReplicationDivergentHistoryWithoutCheckpoint", true},
- {"ClusterReplicationSimpleTransactionTest", true}
+ {"ClusterReplicationSimpleTransactionTest", false} // Duplicate test in this class
};
[OneTimeSetUp]
@@ -60,6 +57,8 @@ public void OneTimeSetUp()
var methods = typeof(ClusterReplicationShardedLog).GetMethods().Where(static mtd => mtd.GetCustomAttribute() != null);
foreach (var method in methods)
enabledTests.TryAdd(method.Name, true);
+
+ monitorTests.Add("ClusterReplicationShardedLogTxnTest", LogLevel.Warning);
}
[SetUp]
@@ -72,6 +71,7 @@ public override void Setup()
}
asyncReplay = false;
sublogCount = TestSublogCount;
+
base.Setup();
}
@@ -131,6 +131,7 @@ public void ClusterReplicationShardedLogTxnTest([Values] bool storedProcedure, [
// Attach replica
resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
ClassicAssert.AreEqual("OK", resp);
+ context.clusterTestUtils.WaitForReplicaRecovery(replicaNodeIndex, logger: context.logger);
string[] keys = ["{_}a", "{_}b", "{_}c", "{_}x", "{_}y", "{_}z"];
string[] values = ["10", "15", "20", "25", "30", "35"];
@@ -257,6 +258,7 @@ public void ClusterReplicationSimpleMultiReplay()
// Attach replica
var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
ClassicAssert.AreEqual("OK", resp);
+ context.clusterTestUtils.WaitForReplicaRecovery(replicaNodeIndex, logger: context.logger);
var keyLength = 16;
var kvpairCount = 2;
@@ -350,6 +352,7 @@ public void ClusterParallelReplicationUpgrade([Values] bool upgradeFromSingleLog
// Add replica
ClassicAssert.AreEqual("OK", context.clusterTestUtils.ClusterReplicate(replicaIndex, targetIndex, logger: context.logger));
+ context.clusterTestUtils.WaitForReplicaRecovery(replicaIndex, logger: context.logger);
var keyLength = 16;
var kvpairCount = keyCount;
@@ -370,5 +373,94 @@ public void ClusterParallelReplicationUpgrade([Values] bool upgradeFromSingleLog
context.SimpleValidateDB(disableObjects, targetIndex);
context.SimpleValidateDB(disableObjects, replicaIndex);
}
+
+ [Test, Order(5)]
+ [Category("REPLICATION")]
+ public async Task ClusterAofUpgradeSLtoSLMRRecoverAsync([Values] bool useStoredProcedure)
+ {
+ var primary_count = 1;
+ var primaryNodeIndex = 0;
+
+ // Phase 1: Start in single-log single-replay mode (SL) and write data
+ context.CreateInstances(
+ primary_count,
+ disableObjects: false,
+ enableAOF: true,
+ useTLS: useTLS,
+ asyncReplay: asyncReplay,
+ sublogCount: 1,
+ replayTaskCount: 1);
+ context.CreateConnection(useTLS: useTLS);
+
+ _ = context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger);
+ context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger);
+
+ var primaryServer = context.clusterTestUtils.GetServer(primaryNodeIndex);
+
+ // Register stored proc if needed
+ if (useStoredProcedure)
+ {
+ _ = context.nodes[primaryNodeIndex].Register.NewTransactionProc(BulkIncrementBy.Name, () => new BulkIncrementBy(), BulkIncrementBy.CommandInfo);
+ _ = context.nodes[primaryNodeIndex].Register.NewTransactionProc(BulkRead.Name, () => new BulkRead(), BulkRead.CommandInfo);
+ }
+
+ // Write standalone ops
+ string[] keys = ["{_}a", "{_}b", "{_}c", "{_}x", "{_}y", "{_}z"];
+ string[] values = ["10", "15", "20", "25", "30", "35"];
+
+ if (useStoredProcedure)
+ {
+ ClusterTestContext.ExecuteStoredProcBulkIncrement(primaryServer, keys, values);
+ }
+ else
+ {
+ context.ExecuteTxnBulkIncrement(keys, values);
+ }
+
+ // Verify at primary before upgrade
+ for (var i = 0; i < keys.Length; i++)
+ {
+ var resp = context.clusterTestUtils.GetKey(primaryNodeIndex, Encoding.ASCII.GetBytes(keys[i]), out _, out _, out _);
+ ClassicAssert.AreEqual(values[i], resp, $"At primary before upgrade, key {keys[i]}");
+ }
+
+ // Commit AOF (no checkpoint) and stop
+ await context.nodes[primaryNodeIndex].Store.CommitAOFAsync(default).ConfigureAwait(false);
+ context.nodes[primaryNodeIndex].Dispose(false);
+
+ // Phase 2: Restart with multi-replay config (SLMR) — AOF replay uses multiple tasks
+ context.nodes[primaryNodeIndex] = context.CreateInstance(
+ context.clusterTestUtils.GetEndPoint(primaryNodeIndex),
+ tryRecover: true,
+ enableAOF: true,
+ useTLS: useTLS,
+ cleanClusterConfig: false,
+ asyncReplay: asyncReplay,
+ sublogCount: 1,
+ replayTaskCount: TestReplayTaskCount);
+
+ if (useStoredProcedure)
+ {
+ _ = context.nodes[primaryNodeIndex].Register.NewTransactionProc(BulkIncrementBy.Name, () => new BulkIncrementBy(), BulkIncrementBy.CommandInfo);
+ _ = context.nodes[primaryNodeIndex].Register.NewTransactionProc(BulkRead.Name, () => new BulkRead(), BulkRead.CommandInfo);
+ }
+
+ context.nodes[primaryNodeIndex].Start();
+ context.CreateConnection(useTLS: useTLS);
+
+ // Verify SL-era data survived the multi-replay recovery
+ for (var i = 0; i < keys.Length; i++)
+ {
+ var resp = context.clusterTestUtils.GetKey(primaryNodeIndex, Encoding.ASCII.GetBytes(keys[i]), out _, out _, out _);
+ ClassicAssert.AreEqual(values[i], resp, $"At primary after SLMR upgrade, key {keys[i]}");
+ }
+
+ if (useStoredProcedure)
+ {
+ primaryServer = context.clusterTestUtils.GetServer(primaryNodeIndex);
+ var result = ClusterTestContext.ExecuteBulkReadStoredProc(primaryServer, keys);
+ ClassicAssert.AreEqual(values, result);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/cluster/Garnet.test.cluster.replication/ReplicationTests/ClusterReplicationBaseTests.cs b/test/cluster/Garnet.test.cluster.replication/ReplicationTests/ClusterReplicationBaseTests.cs
index 647d6667dd7..60fda798b0e 100644
--- a/test/cluster/Garnet.test.cluster.replication/ReplicationTests/ClusterReplicationBaseTests.cs
+++ b/test/cluster/Garnet.test.cluster.replication/ReplicationTests/ClusterReplicationBaseTests.cs
@@ -36,14 +36,6 @@ public class ClusterReplicationBaseTests : TestBase
new(() => AsyncUtils.BlockingWait(ClusterSRPrimaryCheckpointAsync(false, true)), "ClusterSRPrimaryCheckpointAsync(false, true)"),
new(() => AsyncUtils.BlockingWait(ClusterSRPrimaryCheckpointAsync(true, false)), "ClusterSRPrimaryCheckpointAsync(true, false)"),
new(() => AsyncUtils.BlockingWait(ClusterSRPrimaryCheckpointAsync(true, true)), "ClusterSRPrimaryCheckpointAsync(true, true)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(false, false, false), "ClusterSRPrimaryCheckpointRetrieve(false, false, false)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(false, false, true), "ClusterSRPrimaryCheckpointRetrieve(false, false, true)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(false, true, false), "ClusterSRPrimaryCheckpointRetrieve(false, true, false)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(false, true, true), "ClusterSRPrimaryCheckpointRetrieve(false, true, true)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(true, false, false), "ClusterSRPrimaryCheckpointRetrieve(true, false, false)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(true, false, true), "ClusterSRPrimaryCheckpointRetrieve(true, false, true)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(true, true, false), "ClusterSRPrimaryCheckpointRetrieve(true, true, false)"),
- new(() => ClusterSRPrimaryCheckpointRetrieve(true, true, true), "ClusterSRPrimaryCheckpointRetrieve(true, true, true)"),
new(() => ClusterSRAddReplicaAfterPrimaryCheckpoint(false, false, false), "ClusterSRAddReplicaAfterPrimaryCheckpoint(false, false, false)"),
new(() => ClusterSRAddReplicaAfterPrimaryCheckpoint(false, false, true), "ClusterSRAddReplicaAfterPrimaryCheckpoint(false, false, true)"),
new(() => ClusterSRAddReplicaAfterPrimaryCheckpoint(false, true, false), "ClusterSRAddReplicaAfterPrimaryCheckpoint(false, true, false)"),
@@ -142,8 +134,9 @@ public void ClusterSRTest([Values] bool disableObjects)
[Category("REPLICATION")]
public void ClusterSRNoCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects)
{
- if (useTLS)
- context.EnableGarnetLoggingEvents([GarnetTestLoggingEventType.LogPrimaryStreamType, GarnetTestLoggingEventType.LogRunAofSyncTask]);
+ // Disable excessive logging; Leave for future debugging
+ //if (useTLS)
+ // context.EnableGarnetLoggingEvents([GarnetTestLoggingEventType.LogPrimaryStreamType, GarnetTestLoggingEventType.LogRunAofSyncTask]);
var replica_count = 1;// Per primary
var primary_count = 1;
@@ -211,8 +204,9 @@ public void ClusterSRNoCheckpointRestartSecondary([Values] bool performRMW, [Val
[Category("REPLICATION")]
public async Task ClusterSRPrimaryCheckpointAsync([Values] bool performRMW, [Values] bool disableObjects)
{
- if (useTLS)
- context.EnableGarnetLoggingEvents([GarnetTestLoggingEventType.LogPrimaryStreamType, GarnetTestLoggingEventType.LogRunAofSyncTask]);
+ // Disable excessive logging; Leave for future debugging
+ //if (useTLS)
+ // context.EnableGarnetLoggingEvents([GarnetTestLoggingEventType.LogPrimaryStreamType, GarnetTestLoggingEventType.LogRunAofSyncTask]);
var replica_count = 1;// Per primary
var primary_count = 1;
diff --git a/test/cluster/Garnet.test.cluster/ClusterTestContext.cs b/test/cluster/Garnet.test.cluster/ClusterTestContext.cs
index 098baf3bc11..2702f29ff59 100644
--- a/test/cluster/Garnet.test.cluster/ClusterTestContext.cs
+++ b/test/cluster/Garnet.test.cluster/ClusterTestContext.cs
@@ -450,6 +450,7 @@ public GarnetServer CreateInstance(
bool useAcl = false,
bool asyncReplay = false,
int sublogCount = 1,
+ int replayTaskCount = 1,
int vectorSetReplayTaskCount = 0,
EndPoint clusterAnnounceEndpoint = null,
X509CertificateCollection certificates = null,
@@ -484,6 +485,7 @@ public GarnetServer CreateInstance(
useAcl: useAcl,
asyncReplay: asyncReplay,
sublogCount: sublogCount,
+ replayTaskCount: replayTaskCount,
aclFile: credManager.aclFilePath,
authUsername: clusterCreds.user,
authPassword: clusterCreds.password,
diff --git a/test/standalone/Garnet.test/TestUtils.cs b/test/standalone/Garnet.test/TestUtils.cs
index bf51b8a9656..5d5d3564e16 100644
--- a/test/standalone/Garnet.test/TestUtils.cs
+++ b/test/standalone/Garnet.test/TestUtils.cs
@@ -306,7 +306,8 @@ public static GarnetServer CreateGarnetServer(
bool enableRangeIndexPreview = false,
string aofMemorySize = "64m",
string aofPageSize = null,
- bool copyReadsToTail = false
+ bool copyReadsToTail = false,
+ int replayTaskCount = 1
)
{
if (useAzureStorage)
@@ -386,6 +387,7 @@ public static GarnetServer CreateGarnetServer(
EnableModuleCommand = enableModuleCommand,
EnableReadCache = enableReadCache,
ReplicationOffsetMaxLag = asyncReplay ? -1 : 0,
+ AofReplayTaskCount = replayTaskCount,
LuaOptions = enableLua ? new LuaOptions(luaMemoryMode, luaMemoryLimit, luaTimeout ?? Timeout.InfiniteTimeSpan, luaLoggingMode, luaAllowedFunctions ?? [], logger) : null,
UnixSocketPath = unixSocketPath,
UnixSocketPermission = unixSocketPermission,