Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
44f61ba
Fix batch read consistency: propagate batchReadContext back to replic…
vazois May 7, 2026
e37f1f3
Replace sequence numbers with log addresses for single-physical-log AOF
vazois May 8, 2026
b98c82e
fix post rebase break
vazois May 13, 2026
c23634f
Add acquire-barrier for transaction lock coordination in AofReplayCoo…
vazois May 13, 2026
04fb58c
Consolidate broadcast enqueue logic and fix EnqueueSafeFlushAOF multi…
vazois May 14, 2026
0ebf8c0
Document transaction replay consistency invariant for sequence number…
vazois May 14, 2026
68c2e84
Use explicit switch cases in GetSynchronizedOperationParams for heade…
vazois May 14, 2026
e4d1562
rename AofHeader to be consistent and add description
vazois May 14, 2026
0921001
Fix SL-to-SLMR AOF multi-replay recovery bugs
vazois May 18, 2026
3ad1e5b
Optimize: early exit from replay when sequence threshold exceeded
vazois May 18, 2026
e7d27d6
Fix cluster AOF stored proc recovery and move upgrade tests
vazois May 18, 2026
4d0877f
fix bug calculating replay task indices for non-power of two replay t…
vazois May 19, 2026
6db4ea5
cleanup and renaming
vazois May 19, 2026
2a46a5b
restructure mlog tests into base and diskless
vazois May 19, 2026
608e243
fix formatting
vazois May 19, 2026
a7759de
cleanup and renaming
vazois May 19, 2026
865f507
add monitoring for failing tests and wait for recovery
vazois May 19, 2026
68dd71e
test cleanup and rename
vazois May 20, 2026
7dc664f
fix migrate test
vazois May 20, 2026
f484a0f
handle cancelation at parallel replay
vazois May 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Garnet.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<Project Path="test/cluster/Garnet.test.cluster/Garnet.test.cluster.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.migrate/Garnet.test.cluster.migrate.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.multilog/Garnet.test.cluster.multilog.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.multilog.diskless/Garnet.test.cluster.multilog.diskless.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.replication/Garnet.test.cluster.replication.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.replication.asyncreplay/Garnet.test.cluster.replication.asyncreplay.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.replication.disklesssync/Garnet.test.cluster.replication.disklesssync.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ private unsafe void ConsumeSchedulePage(byte* record, int recordLength, long cur
replicationManager.SetSublogReplicationOffset(physicalSublogIdx, currentAddress);

// Wait for replay to complete.
replayBatchContext.LeaderFollowerBarrier.WaitCompleted(serverOptions.ReplicaSyncTimeout, cts.Token);
if (!replayBatchContext.LeaderFollowerBarrier.WaitCompleted(serverOptions.ReplicaSyncTimeout, cts.Token))
ExceptionUtils.ThrowException(new GarnetException("Timed out waiting for parallel replay tasks to complete", LogLevel.Warning, clientResponse: false));
// Release participants for next cycle
replayBatchContext.LeaderFollowerBarrier.Release();

Expand All @@ -159,11 +160,13 @@ private unsafe void ConsumeScheduleChannel(byte* record, int recordLength, long
if (payloadLength > 0)
{
var entryPtr = ptr + entryLength;
var logAddressSequenceNumber = currentAddress + (ptr - record);
var replayTaskIdx = replicationManager.AofProcessor.GetReplayTaskIdx(entryPtr);
replayTasks[replayTaskIdx].AddRecord(new ReplayRecord()
{
entryPtr = entryPtr,
payloadLength = payloadLength
payloadLength = payloadLength,
logAddressSequenceNumber = logAddressSequenceNumber
});
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
Expand Down Expand Up @@ -229,7 +232,8 @@ private unsafe void ConsumeDirect(byte* record, int recordLength, long currentAd
var payloadLength = physicalSublog.UnsafeGetLength(ptr);
if (payloadLength > 0)
{
replicationManager.AofProcessor.ProcessAofRecordInternal(physicalSublogIdx, ptr + entryLength, payloadLength, true, out var isCheckpointStart);
var logAddressSequenceNumber = currentAddress + (ptr - record);
replicationManager.AofProcessor.ProcessAofRecordInternal(physicalSublogIdx, ptr + entryLength, payloadLength, true, out var isCheckpointStart, logAddressSequenceNumber);
// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
if (isCheckpointStart)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal unsafe struct ReplayRecord
{
public byte* entryPtr;
public int payloadLength;
public long logAddressSequenceNumber;
}

internal sealed class ReplicaReplayTask(
Expand Down Expand Up @@ -62,13 +63,22 @@ internal async Task FullPageBasedBackgroundReplayAsync()
{
await replayBatchContext.LeaderFollowerBarrier.WaitReadyWorkAsync(cancellationToken: cts.Token).ConfigureAwait(false);
}
catch (TaskCanceledException) when (cts.Token.IsCancellationRequested)
{
// Suppress the exception if the task was cancelled because of store wrapper disposal
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed at WaitAsync", nameof(FullPageBasedBackgroundReplayAsync));
await cts.CancelAsync().ConfigureAwait(false);
break;
}

// Guard: if cancellation happened during WaitReadyWorkAsync, exit cleanly
// without falling through to the processing block (which would issue a spurious SignalCompleted)
if (cts.Token.IsCancellationRequested)
break;

try
{
unsafe
Expand All @@ -80,9 +90,6 @@ internal async Task FullPageBasedBackgroundReplayAsync()
var isProtected = replayBatchContext.IsProtected;
var ptr = record;

var maxSequenceNumber = 0L;

// logger?.LogError("[{sublogIdx},{replayIdx}] = {currentAddress} -> {nextAddress}", sublogIdx, replayIdx, currentAddress, nextAddress);
while (ptr < record + recordLength)
{
cts.Token.ThrowIfCancellationRequested();
Expand All @@ -91,24 +98,23 @@ internal async Task FullPageBasedBackgroundReplayAsync()
if (payloadLength > 0)
{
var entryPtr = ptr + entryLength;
if (replicationManager.AofProcessor.CanReplay(entryPtr, replayTaskIdx, out var sequenceNumber))
var logAddressSequenceNumber = currentAddress + (ptr - record);
if (replicationManager.AofProcessor.CanReplay(entryPtr, replayTaskIdx, logAddressSequenceNumber, out _))
{
replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, entryPtr, payloadLength, true, out var isCheckpointStart);
replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, entryPtr, payloadLength, true, out var isCheckpointStart, logAddressSequenceNumber);
// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
if (isCheckpointStart)
{
// logger?.LogError("[{sublogIdx}] CheckpointStart {address}", sublogIdx, clusterProvider.replicationManager.GetSublogReplicationOffset(sublogIdx));
replicationManager.ReplicationCheckpointStartOffset[physicalSublogIdx] = replicationManager.GetSublogReplicationOffset(physicalSublogIdx);
}
}
maxSequenceNumber = Math.Max(sequenceNumber, maxSequenceNumber);
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
else if (payloadLength < 0)
{
if (!clusterProvider.serverOptions.EnableFastCommit)
throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
ExceptionUtils.ThrowException(new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false));

// Only a single thread should commit metadata
if (replayTaskIdx == 0)
Expand All @@ -122,10 +128,16 @@ internal async Task FullPageBasedBackgroundReplayAsync()
ptr += entryLength;
}

// Update max sequence number for this virtual sublog which is mapped
appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, maxSequenceNumber);
// Advance frontier to nextAddress (past all entries in this page).
// This ensures the read consistency protocol (which waits for frontier > sessionSeq)
// can proceed once all writes in the page are complete.
appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, nextAddress);
}
}
catch (TaskCanceledException) when (cts.Token.IsCancellationRequested)
{
// Suppress the exception if the task was cancelled because of store wrapper disposal
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed at replaying", nameof(FullPageBasedBackgroundReplayAsync));
Expand Down Expand Up @@ -156,7 +168,7 @@ internal async Task ChannelBasedBackgroundReplayAsync()
{
unsafe
{
replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, record.entryPtr, record.payloadLength, true, out var isCheckpointStart);
replicationManager.AofProcessor.ProcessAofRecordInternal(virtualSublogIdx, record.entryPtr, record.payloadLength, true, out var isCheckpointStart, record.logAddressSequenceNumber);

// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
Expand Down
2 changes: 1 addition & 1 deletion libs/common/Synchronization/LeaderFollowerBarrier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public bool WaitCompleted(TimeSpan timeout = default, CancellationToken cancella
var waitTimeout = ProcessTimeSpan(timeout);
for (var i = 0; i < participantCount; i++)
{
if (!workCompleted.Wait(waitTimeout, cancellationToken))
if (!AsyncUtils.BlockingWait(workCompleted.WaitAsync(waitTimeout, cancellationToken)))
return false;
}

Expand Down
22 changes: 22 additions & 0 deletions libs/server/AOF/AofEntryType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,26 @@ internal enum AofStoreType : byte
CheckpointType = 0x4,
FlushDbType = 0x5,
}

internal static class AofEntryTypeExtensions
{
/// <summary>
/// Returns true if the entry type carries a key payload after the header.
/// Keyless entries (transactions, checkpoints, flush, stored procedures) have no key.
/// </summary>
internal static bool HasKey(this AofEntryType opType) => opType switch
{
AofEntryType.StoreUpsert or
AofEntryType.StoreRMW or
AofEntryType.StoreDelete or
AofEntryType.ObjectStoreUpsert or
AofEntryType.ObjectStoreRMW or
AofEntryType.ObjectStoreDelete or
AofEntryType.UnifiedStoreStringUpsert or
AofEntryType.UnifiedStoreObjectUpsert or
AofEntryType.UnifiedStoreRMW or
AofEntryType.UnifiedStoreDelete => true,
_ => false,
};
}
}
60 changes: 57 additions & 3 deletions libs/server/AOF/AofHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,42 @@

namespace Garnet.server
{
// AOF Header Hierarchy
//
// The header type determines the wire format of each AOF entry based on the log topology:
//
// AofHeader (16B) — Base header for all entries. Used standalone with single-log mode.
// │
// ├── AofShardedHeader (24B) = AofHeader + sequenceNumber
// │ Used for per-key entries in multi-physical-log (sharded) mode.
// │ The sequence number enables cross-sublog ordering.
// │
// ├── AofSingleLogTransactionHeader (50B) = AofHeader + participantCount + replayTaskAccessVector
// │ Used for coordinated/broadcast operations (transactions, checkpoints, flush)
// │ in single-physical-log + multi-replay mode. Uses log addresses for ordering
// │ instead of embedded sequence numbers, saving 8B per entry.
// │
// └── AofShardedLogTransactionHeader (58B) = AofShardedHeader + participantCount + replayTaskAccessVector
// Used for coordinated/broadcast operations in multi-physical-log (sharded) mode.
// Embeds a sequence number (via AofShardedHeader) for cross-sublog ordering.
//
// Selection logic:
// Single log (1 physical, 1 replay task) → BasicHeader
// Single physical log, multi-replay → BasicHeader (per-key), SingleLogTransactionHeader (broadcast)
// Multi physical log, multi-replay → ShardedHeader (per-key), ShardedLogTransactionHeader (broadcast)
internal enum AofHeaderType : byte
{
BasicHeader = 0,
ShardedHeader = 1,
TransactionHeader = 2
SingleLogTransactionHeader = 2,
ShardedLogTransactionHeader = 3,
}

/// <summary>
/// Used for coordinated operations
/// </summary>
[StructLayout(LayoutKind.Explicit, Size = TotalSize)]
unsafe struct AofTransactionHeader
unsafe struct AofShardedLogTransactionHeader
{
public const int TotalSize = AofShardedHeader.TotalSize + 2 + 32;
// maximum 256 replay tasks per physical sublog, hence 32 bytes bitmap
Expand All @@ -44,6 +68,35 @@ unsafe struct AofTransactionHeader
public fixed byte replayTaskAccessVector[ReplayTaskAccessVectorBytes];
}

/// <summary>
/// Used for single-physical-log with multi-replay to carry transaction participant info
/// without embedding a sequence number (log addresses are used instead).
/// </summary>
[StructLayout(LayoutKind.Explicit, Size = TotalSize)]
unsafe struct AofSingleLogTransactionHeader
{
public const int TotalSize = AofHeader.TotalSize + 2 + AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes;

/// <summary>
/// Basic AOF header
/// </summary>
[FieldOffset(0)]
public AofHeader basicHeader;

/// <summary>
/// Used for synchronizing virtual sublog replay
/// NOTE: This stores the total number of replay tasks that participate in a given transaction.
/// </summary>
[FieldOffset(AofHeader.TotalSize)]
public short participantCount;

/// <summary>
/// Used to track replay task participating in the txn
/// </summary>
[FieldOffset(AofHeader.TotalSize + 2)]
public fixed byte replayTaskAccessVector[AofShardedLogTransactionHeader.ReplayTaskAccessVectorBytes];
}

/// <summary>
/// Used for sharded log to add a k
/// </summary>
Expand Down Expand Up @@ -79,7 +132,8 @@ struct AofHeader
{
AofHeaderType.BasicHeader => entryPtr + TotalSize,
AofHeaderType.ShardedHeader => entryPtr + AofShardedHeader.TotalSize,
AofHeaderType.TransactionHeader => entryPtr + AofTransactionHeader.TotalSize,
AofHeaderType.ShardedLogTransactionHeader => entryPtr + AofShardedLogTransactionHeader.TotalSize,
AofHeaderType.SingleLogTransactionHeader => entryPtr + AofSingleLogTransactionHeader.TotalSize,
_ => throw new GarnetException($"Type not supported {headerType}"),
};
}
Expand Down
Loading
Loading