From 01138ea47c2a492ee2db442e39c16ec445e4cbef Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 18 May 2026 15:09:21 -0700 Subject: [PATCH 01/14] Add RangeIndex file enumeration for disk-based replication Add EnumerateFilesForReplication() to RangeIndexManager that lists all flush.bftree and checkpoint snapshot files needed for a given checkpoint token and hlog address range. Refactor flush file parsing into a shared EnumerateFlushFiles() helper used by both OnTruncateImpl and the new replication enumeration method. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Resp/RangeIndex/RangeIndexManager.cs | 138 ++++++++++++++++-- 1 file changed, 123 insertions(+), 15 deletions(-) diff --git a/libs/server/Resp/RangeIndex/RangeIndexManager.cs b/libs/server/Resp/RangeIndex/RangeIndexManager.cs index ca25e8c6628..932301cdb08 100644 --- a/libs/server/Resp/RangeIndex/RangeIndexManager.cs +++ b/libs/server/Resp/RangeIndex/RangeIndexManager.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Globalization; using System.IO; using System.IO.Hashing; @@ -55,9 +56,20 @@ public sealed partial class RangeIndexManager : IDisposable /// Size of the RangeIndex stub in bytes. internal const int IndexSizeBytes = RangeIndexStub.Size; + // Flush filename layout: ..flush.bftree + private const int HashPrefixLength = 32; + private const int AddrHexLength = 16; + private const string FlushSuffix = ".flush.bftree"; + private const int FlushFileNameLength = HashPrefixLength + 1 + AddrHexLength + 13; // ".flush.bftree".Length == 13 + private const int AddrStartIndex = HashPrefixLength + 1; + /// Gets the number of live (registered) BfTree indexes (activated + pending). internal int LiveIndexCount => liveIndexes.Count; + /// Gets the log-tied root directory for RI files. Used by the cluster replication layer + /// to determine the target directory for received flush files on the replica side. + internal string RiLogRoot => riLogRoot; + /// /// Tracks BfTree entries (activated + pending), keyed by = a Guid /// derived from the key bytes via XxHash128. Same hash scheme used to derive the @@ -860,22 +872,8 @@ internal void OnTruncateImpl(long newBeginAddress) try { - foreach (var path in Directory.EnumerateFiles(riLogRoot)) + foreach (var (path, _, addr) in EnumerateFlushFiles()) { - var name = Path.GetFileName(path); - - if (!name.EndsWith(".flush.bftree", StringComparison.Ordinal)) - continue; - - // Pattern: ..flush.bftree - // hash is 32 hex chars, then '.', then 16 hex chars (addr), then ".flush.bftree". - if (name.Length != 32 + 1 + 16 + ".flush.bftree".Length) - continue; - - var addrSegment = name.AsSpan(33, 16); - if (!long.TryParse(addrSegment, NumberStyles.HexNumber, CultureInfo.InvariantCulture, out var addr)) - continue; - if (addr < newBeginAddress) TryDelete(path); } @@ -891,5 +889,115 @@ void TryDelete(string p) catch (Exception ex) { logger?.LogWarning(ex, "OnTruncate: failed to delete {Path}", p); } } } + + // --------------------------------------------------------------- + // Shared flush-file enumeration helper + // --------------------------------------------------------------- + + /// + /// Enumerates all *.flush.bftree files under riLogRoot, parsing and + /// yielding the embedded logical address from each valid filename. + /// + /// Tuples of (fullPath, fileName, logicalAddress). Skips files that don't + /// match the expected naming pattern. Returns empty if riLogRoot is not configured + /// or does not exist. + private IEnumerable<(string Path, string Name, long Address)> EnumerateFlushFiles() + { + if (string.IsNullOrEmpty(riLogRoot) || !Directory.Exists(riLogRoot)) + yield break; + + foreach (var path in Directory.EnumerateFiles(riLogRoot, "*.flush.bftree")) + { + var name = Path.GetFileName(path); + + // Pattern: ..flush.bftree + if (name.Length != FlushFileNameLength) + continue; + + var addrSegment = name.AsSpan(AddrStartIndex, AddrHexLength); + if (!long.TryParse(addrSegment, NumberStyles.HexNumber, + CultureInfo.InvariantCulture, out var addr)) + continue; + + yield return (path, name, addr); + } + } + + // --------------------------------------------------------------- + // Replication: file enumeration for checkpoint transfer + // --------------------------------------------------------------- + + /// + /// Represents a single RangeIndex file that must be shipped during replication. + /// + internal readonly struct RangeIndexFileEntry + { + /// Full path to the file on the primary. + public readonly string Path; + + /// Target filename to recreate on the replica (relative to its root). + public readonly string FileName; + + /// Whether this is a per-flush file (true) or a per-checkpoint snapshot (false). + public readonly bool IsFlushFile; + + public RangeIndexFileEntry(string path, string fileName, bool isFlushFile) + { + Path = path; + FileName = fileName; + IsFlushFile = isFlushFile; + } + } + + /// + /// Enumerates all RangeIndex files that must be sent during disk-based replication + /// for a given checkpoint token and hlog address range. + /// + /// Two categories of files are collected: + /// + /// Per-flush files in riLogRoot whose embedded address falls within + /// [hlogStartAddress, hlogEndAddress). These correspond to RI stubs in the hlog + /// segments being transferred. + /// Per-checkpoint snapshot files under + /// cpr-checkpoints/<token>/rangeindex/. These correspond to RI stubs in + /// the snapshot region at checkpoint time. + /// + /// + /// Multiple flush files per key hash are possible and all within the range are + /// included — uses the exact source address + /// to locate the specific flush file. + /// + /// The checkpoint token (storeHlogToken) to locate + /// per-checkpoint snapshots. + /// The hybridLogFileStartAddress from + /// LogFileInfo — inclusive lower bound for flush file filtering. + /// The hybridLogFileEndAddress from + /// LogFileInfo — exclusive upper bound for flush file filtering. + /// An enumerable of describing files to + /// transfer. Empty if RangeIndex is not configured or no files match. + internal IEnumerable EnumerateFilesForReplication( + Guid checkpointToken, long hlogStartAddress, long hlogEndAddress) + { + // 1. Per-flush files: ..flush.bftree where addr ∈ [start, end) + foreach (var (path, name, addr) in EnumerateFlushFiles()) + { + if (addr >= hlogStartAddress && addr < hlogEndAddress) + yield return new RangeIndexFileEntry(path, name, isFlushFile: true); + } + + // 2. Per-checkpoint snapshot files: cpr-checkpoints//rangeindex/.bftree + if (!string.IsNullOrEmpty(cprDir)) + { + var snapshotDir = CheckpointSnapshotDir(checkpointToken); + if (Directory.Exists(snapshotDir)) + { + foreach (var path in Directory.EnumerateFiles(snapshotDir, "*.bftree")) + { + var name = System.IO.Path.GetFileName(path); + yield return new RangeIndexFileEntry(path, name, isFlushFile: false); + } + } + } + } } } \ No newline at end of file From 59f04a635d4d2a736fb5f8b89f51d7668b3aa78f Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 18 May 2026 16:20:50 -0700 Subject: [PATCH 02/14] Add primary-side RangeIndex replication: reader, data source, and transmit Implement the primary-side pipeline for shipping RangeIndex files during disk-based replication: - Add STORE_RANGEINDEX_FLUSH (7) and STORE_RANGEINDEX_SNAPSHOT (8) to CheckpointFileType enum - RangeIndexFileDataSource: FileStream-based ISnapshotDataSource for chunked reading of .bftree files - RangeIndexFileTransmitSource: sends metadata header (keyHash + address as binary, startAddress=-1) then streams file content in chunks - RangeIndexSnapshotReader: ISnapshotReader that enumerates RI files via EnumerateFilesForReplication and yields transmit sources - Make RangeIndexFileEntry, EnumerateFilesForReplication, and RiLogRoot public for cross-project access Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Server/Replication/CheckpointFileType.cs | 8 ++ .../RangeIndexFileDataSource.cs | 104 +++++++++++++++++ .../RangeIndexFileTransmitSource.cs | 108 ++++++++++++++++++ .../RangeIndexSnapshotReader.cs | 73 ++++++++++++ .../Resp/RangeIndex/RangeIndexManager.cs | 26 +++-- 5 files changed, 310 insertions(+), 9 deletions(-) create mode 100644 libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs create mode 100644 libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs create mode 100644 libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs diff --git a/libs/cluster/Server/Replication/CheckpointFileType.cs b/libs/cluster/Server/Replication/CheckpointFileType.cs index e5721b6a6e3..bddb0c54e93 100644 --- a/libs/cluster/Server/Replication/CheckpointFileType.cs +++ b/libs/cluster/Server/Replication/CheckpointFileType.cs @@ -33,5 +33,13 @@ enum CheckpointFileType : byte /// Store Snapshot - Object /// STORE_SNAPSHOT_OBJ = 6, + /// + /// RangeIndex per-flush snapshot file (flush.bftree) + /// + STORE_RANGEINDEX_FLUSH = 7, + /// + /// RangeIndex per-checkpoint snapshot file (checkpoint .bftree) + /// + STORE_RANGEINDEX_SNAPSHOT = 8, } } \ No newline at end of file diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs new file mode 100644 index 00000000000..f372f9abb37 --- /dev/null +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Garnet.cluster +{ + /// + /// A checkpoint data source that reads a RangeIndex .bftree file using FileStream. + /// Unlike which uses Tsavorite's IDevice for sector-aligned I/O, + /// this source reads plain files directly since .bftree files are not managed by the device layer. + /// + internal sealed class RangeIndexFileDataSource : ISnapshotDataSource + { + /// + /// Default chunk size for streaming .bftree files (128 KB). + /// + internal const int DefaultChunkSize = 1 << 17; + + private readonly string filePath; + private readonly int chunkSize; + private FileStream stream; + + /// + public CheckpointFileType Type { get; } + + /// + public Guid Token { get; } + + /// + /// The 32-character key hash prefix identifying the RangeIndex tree. + /// + public string KeyHash { get; } + + /// + /// The logical hlog address embedded in the flush filename. + /// Only meaningful for . + /// + public long Address { get; } + + /// + public long StartOffset => 0; + + /// + public long CurrentOffset { get; private set; } + + /// + public long EndOffset { get; } + + /// + public bool HasNextChunk => CurrentOffset < EndOffset; + + /// + /// Creates a new RangeIndexFileDataSource. + /// + /// The checkpoint file type (STORE_RANGEINDEX_FLUSH or STORE_RANGEINDEX_SNAPSHOT). + /// The checkpoint token. + /// Full path to the .bftree file on disk. + /// The 32-character key hash prefix. + /// The hlog logical address (flush files only). + /// Maximum bytes to read per chunk. + public RangeIndexFileDataSource(CheckpointFileType type, Guid token, string filePath, string keyHash, long address, int chunkSize = DefaultChunkSize) + { + Type = type; + Token = token; + KeyHash = keyHash; + Address = address; + this.filePath = filePath; + this.chunkSize = chunkSize; + + var fileInfo = new FileInfo(filePath); + if (!fileInfo.Exists) + throw new FileNotFoundException($"RangeIndex file not found: {filePath}"); + + EndOffset = fileInfo.Length; + } + + /// + public async Task ReadNextChunkAsync(CancellationToken cancellationToken = default) + { + stream ??= new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: chunkSize, useAsync: true); + + var remaining = EndOffset - CurrentOffset; + var bytesToRead = (int)Math.Min(remaining, chunkSize); + var buffer = new byte[bytesToRead]; + + var bytesRead = await stream.ReadAsync(buffer, 0, bytesToRead, cancellationToken).ConfigureAwait(false); + var chunkStart = CurrentOffset; + CurrentOffset += bytesRead; + + return new DataSourceReadResult(buffer, chunkStartAddress: chunkStart); + } + + /// + public void Dispose() + { + stream?.Dispose(); + stream = null; + } + } +} diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs new file mode 100644 index 00000000000..1407373e976 --- /dev/null +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Garnet.client; +using Garnet.common; +using Microsoft.Extensions.Logging; + +namespace Garnet.cluster +{ + /// + /// Transmits a RangeIndex file over the network. Sends a metadata header (with + /// startAddress = -1) containing the serialized key hash and address needed to + /// reconstruct the target path, followed by chunked file content, followed by an empty + /// end-of-transmission marker. + /// + /// Metadata payload layout: + /// + /// STORE_RANGEINDEX_FLUSH: keyHash (32 bytes ASCII) + address (8 bytes little-endian) = 40 bytes + /// STORE_RANGEINDEX_SNAPSHOT: keyHash (32 bytes ASCII) = 32 bytes + /// + /// + internal sealed class RangeIndexFileTransmitSource : ISnapshotTransmitSource + { + private readonly ILogger logger; + + public ISnapshotDataSource DataSource { get; } + + public RangeIndexFileTransmitSource(ISnapshotDataSource dataSource, ILogger logger = null) + { + DataSource = dataSource; + this.logger = logger; + } + + /// + public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, CancellationToken cancellationToken = default) + { + var riDataSource = (RangeIndexFileDataSource)DataSource; + var fileTokenBytes = DataSource.Token.ToByteArray(); + + // Serialize metadata: keyHash + address (flush only) + var metadata = SerializeMetadata(riDataSource); + + // Send header with startAddress = -1 to indicate single-message control payload. + var headerResp = await gcs.ExecuteClusterSnapshotData( + fileTokenBytes, (int)DataSource.Type, -1, metadata) + .WaitAsync(timeout, cancellationToken).ConfigureAwait(false); + + if (!headerResp.Equals("OK")) + ExceptionUtils.ThrowException(new GarnetException( + $"Primary error at RangeIndex header {DataSource.Type} {headerResp} keyHash={riDataSource.KeyHash}")); + + // Stream file content in chunks + while (DataSource.HasNextChunk) + { + var result = await DataSource.ReadNextChunkAsync(cancellationToken).ConfigureAwait(false); + + var resp = await gcs.ExecuteClusterSnapshotData( + fileTokenBytes, + (int)DataSource.Type, + startAddress: result.ChunkStartAddress, + new Span(result.Data, 0, result.BytesRead)).WaitAsync(timeout, cancellationToken).ConfigureAwait(false); + + if (!resp.Equals("OK")) + ExceptionUtils.ThrowException(new GarnetException( + $"Primary error at RangeIndex TransmitAsync {DataSource.Type} {resp} [{DataSource.StartOffset},{DataSource.CurrentOffset},{DataSource.EndOffset}]")); + } + + // Send empty package to indicate end of transmission + var endResp = await gcs.ExecuteClusterSnapshotData( + fileTokenBytes, (int)DataSource.Type, DataSource.CurrentOffset, []) + .WaitAsync(timeout, cancellationToken).ConfigureAwait(false); + + if (!endResp.Equals("OK")) + ExceptionUtils.ThrowException(new GarnetException( + $"Primary error at RangeIndex TransmitAsync Completion {DataSource.Type} {endResp}")); + } + + /// + /// Serializes the metadata payload for the header message. + /// FLUSH: keyHash (32 bytes ASCII) + address (8 bytes LE) = 40 bytes. + /// SNAPSHOT: keyHash (32 bytes ASCII) = 32 bytes. + /// + private static byte[] SerializeMetadata(RangeIndexFileDataSource source) + { + var keyHashBytes = Encoding.ASCII.GetBytes(source.KeyHash); + + if (source.Type == CheckpointFileType.STORE_RANGEINDEX_FLUSH) + { + var metadata = new byte[32 + 8]; + Buffer.BlockCopy(keyHashBytes, 0, metadata, 0, 32); + BitConverter.TryWriteBytes(metadata.AsSpan(32), source.Address); + return metadata; + } + + // Snapshot: keyHash only + return keyHashBytes; + } + + public void Dispose() + { + DataSource?.Dispose(); + } + } +} diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs new file mode 100644 index 00000000000..3d039534828 --- /dev/null +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using Garnet.server; +using Microsoft.Extensions.Logging; + +namespace Garnet.cluster +{ + /// + /// Snapshot reader for RangeIndex files. Enumerates all flush.bftree and checkpoint + /// snapshot.bftree files that must be shipped for a given checkpoint, and yields + /// transmit sources for each. + /// + /// Each sends a filename header + /// (with startAddress = -1) followed by chunked file content, so the replica + /// knows where to write each file. + /// + internal sealed class RangeIndexSnapshotReader : ISnapshotReader + { + private readonly List dataSources = []; + private readonly ILogger logger; + + /// + /// Creates a new RangeIndexSnapshotReader. + /// + /// The RangeIndexManager instance for file enumeration. + /// The checkpoint token (storeHlogToken). + /// hybridLogFileStartAddress from LogFileInfo. + /// hybridLogFileEndAddress from LogFileInfo. + /// Optional logger. + public RangeIndexSnapshotReader( + RangeIndexManager rangeIndexManager, + Guid checkpointToken, + long hlogStartAddress, + long hlogEndAddress, + ILogger logger = null) + { + this.logger = logger; + + foreach (var entry in rangeIndexManager.EnumerateFilesForReplication(checkpointToken, hlogStartAddress, hlogEndAddress)) + { + var type = entry.IsFlushFile + ? CheckpointFileType.STORE_RANGEINDEX_FLUSH + : CheckpointFileType.STORE_RANGEINDEX_SNAPSHOT; + + dataSources.Add(new RangeIndexFileDataSource(type, checkpointToken, entry.Path, entry.KeyHash, entry.Address)); + logger?.LogInformation("RangeIndexSnapshotReader: queued {type} keyHash={keyHash} addr={addr}", type, entry.KeyHash, entry.Address); + } + } + + /// + public IEnumerable GetTransmitSources() + { + foreach (var dataSource in dataSources) + { + yield return new RangeIndexFileTransmitSource(dataSource, logger); + } + } + + /// + public void Dispose() + { + foreach (var ds in dataSources) + { + try { ds.Dispose(); } + catch (Exception ex) { logger?.LogError(ex, "Error disposing RI data source {type} {keyHash}", ds.Type, ds.KeyHash); } + } + dataSources.Clear(); + } + } +} diff --git a/libs/server/Resp/RangeIndex/RangeIndexManager.cs b/libs/server/Resp/RangeIndex/RangeIndexManager.cs index 932301cdb08..6bca4023fcd 100644 --- a/libs/server/Resp/RangeIndex/RangeIndexManager.cs +++ b/libs/server/Resp/RangeIndex/RangeIndexManager.cs @@ -68,7 +68,7 @@ public sealed partial class RangeIndexManager : IDisposable /// Gets the log-tied root directory for RI files. Used by the cluster replication layer /// to determine the target directory for received flush files on the replica side. - internal string RiLogRoot => riLogRoot; + public string RiLogRoot => riLogRoot; /// /// Tracks BfTree entries (activated + pending), keyed by = a Guid @@ -930,21 +930,25 @@ void TryDelete(string p) /// /// Represents a single RangeIndex file that must be shipped during replication. /// - internal readonly struct RangeIndexFileEntry + public readonly struct RangeIndexFileEntry { /// Full path to the file on the primary. public readonly string Path; - /// Target filename to recreate on the replica (relative to its root). - public readonly string FileName; + /// The 32-character key hash prefix identifying the RangeIndex tree. + public readonly string KeyHash; + + /// The hlog logical address (for flush files); 0 for checkpoint snapshots. + public readonly long Address; /// Whether this is a per-flush file (true) or a per-checkpoint snapshot (false). public readonly bool IsFlushFile; - public RangeIndexFileEntry(string path, string fileName, bool isFlushFile) + public RangeIndexFileEntry(string path, string keyHash, long address, bool isFlushFile) { Path = path; - FileName = fileName; + KeyHash = keyHash; + Address = address; IsFlushFile = isFlushFile; } } @@ -975,14 +979,17 @@ public RangeIndexFileEntry(string path, string fileName, bool isFlushFile) /// LogFileInfo — exclusive upper bound for flush file filtering. /// An enumerable of describing files to /// transfer. Empty if RangeIndex is not configured or no files match. - internal IEnumerable EnumerateFilesForReplication( + public IEnumerable EnumerateFilesForReplication( Guid checkpointToken, long hlogStartAddress, long hlogEndAddress) { // 1. Per-flush files: ..flush.bftree where addr ∈ [start, end) foreach (var (path, name, addr) in EnumerateFlushFiles()) { if (addr >= hlogStartAddress && addr < hlogEndAddress) - yield return new RangeIndexFileEntry(path, name, isFlushFile: true); + { + var keyHash = name[..HashPrefixLength]; + yield return new RangeIndexFileEntry(path, keyHash, addr, isFlushFile: true); + } } // 2. Per-checkpoint snapshot files: cpr-checkpoints//rangeindex/.bftree @@ -994,7 +1001,8 @@ internal IEnumerable EnumerateFilesForReplication( foreach (var path in Directory.EnumerateFiles(snapshotDir, "*.bftree")) { var name = System.IO.Path.GetFileName(path); - yield return new RangeIndexFileEntry(path, name, isFlushFile: false); + var keyHash = name[..HashPrefixLength]; + yield return new RangeIndexFileEntry(path, keyHash, address: 0, isFlushFile: false); } } } From ee2ecec602bf4cd86b98ef4dcd134263f2e1740e Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 18 May 2026 16:59:28 -0700 Subject: [PATCH 03/14] Add receiver-side RangeIndex replication and session integration - Add RangeIndexFileDataSink with FromMetadata factory for zero-copy deserialization of metadata header directly from ReadOnlySpan - Integrate RangeIndexSnapshotReader into ReplicaSyncSession.SendCheckpointAsync conditioned on serverOptions.EnableRangeIndexPreview - Move GetMetadata serialization into RangeIndexFileDataSource (not interface) - Make LogFlushPath/CheckpointSnapshotPath public, rangeIndexManager public - Remove unused Garnet.server/System.Text usings from handler Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../RangeIndexFileDataSource.cs | 17 +++ .../RangeIndexFileTransmitSource.cs | 26 +---- .../ReplicaSyncSession.cs | 12 ++ .../RangeIndexFileDataSink.cs | 107 ++++++++++++++++++ .../ReceiveCheckpointHandler.cs | 7 ++ .../Resp/RangeIndex/RangeIndexManager.cs | 4 +- libs/server/StoreWrapper.cs | 2 +- 7 files changed, 148 insertions(+), 27 deletions(-) create mode 100644 libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs index f372f9abb37..0460761cdd6 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs @@ -53,6 +53,23 @@ internal sealed class RangeIndexFileDataSource : ISnapshotDataSource /// public bool HasNextChunk => CurrentOffset < EndOffset; + /// + public byte[] GetMetadata() + { + var keyHashBytes = System.Text.Encoding.ASCII.GetBytes(KeyHash); + + if (Type == CheckpointFileType.STORE_RANGEINDEX_FLUSH) + { + var metadata = new byte[32 + 8]; + Buffer.BlockCopy(keyHashBytes, 0, metadata, 0, 32); + BitConverter.TryWriteBytes(metadata.AsSpan(32), Address); + return metadata; + } + + // Snapshot: keyHash only + return keyHashBytes; + } + /// /// Creates a new RangeIndexFileDataSource. /// diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs index 1407373e976..52124c36a1a 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. using System; -using System.Text; using System.Threading; using System.Threading.Tasks; using Garnet.client; @@ -41,8 +40,8 @@ public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, Cance var riDataSource = (RangeIndexFileDataSource)DataSource; var fileTokenBytes = DataSource.Token.ToByteArray(); - // Serialize metadata: keyHash + address (flush only) - var metadata = SerializeMetadata(riDataSource); + // Get metadata from data source: keyHash + address (flush only) + var metadata = riDataSource.GetMetadata(); // Send header with startAddress = -1 to indicate single-message control payload. var headerResp = await gcs.ExecuteClusterSnapshotData( @@ -79,27 +78,6 @@ public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, Cance $"Primary error at RangeIndex TransmitAsync Completion {DataSource.Type} {endResp}")); } - /// - /// Serializes the metadata payload for the header message. - /// FLUSH: keyHash (32 bytes ASCII) + address (8 bytes LE) = 40 bytes. - /// SNAPSHOT: keyHash (32 bytes ASCII) = 32 bytes. - /// - private static byte[] SerializeMetadata(RangeIndexFileDataSource source) - { - var keyHashBytes = Encoding.ASCII.GetBytes(source.KeyHash); - - if (source.Type == CheckpointFileType.STORE_RANGEINDEX_FLUSH) - { - var metadata = new byte[32 + 8]; - Buffer.BlockCopy(keyHashBytes, 0, metadata, 0, 32); - BitConverter.TryWriteBytes(metadata.AsSpan(32), source.Address); - return metadata; - } - - // Snapshot: keyHash only - return keyHashBytes; - } - public void Dispose() { DataSource?.Dispose(); diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs index 6154e9e7ec5..471c7fd9200 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs @@ -139,6 +139,18 @@ public async Task SendCheckpointAsync() using var checkpointTransmissionDriver = new SnapshotTransmissionDriver(gcs, storeWrapper.serverOptions.ReplicaSyncTimeout, logger); checkpointTransmissionDriver.AddReader(new TsavoriteSnapshotReader(clusterProvider, localEntry, hlog_size, index_size, storeWrapper.serverOptions.ReplicaSyncTimeout, logger)); + + // Add RangeIndex files if RI is enabled + if (storeWrapper.serverOptions.EnableRangeIndexPreview) + { + checkpointTransmissionDriver.AddReader(new RangeIndexSnapshotReader( + storeWrapper.rangeIndexManager, + localEntry.metadata.storeHlogToken, + hlog_size.hybridLogFileStartAddress, + hlog_size.hybridLogFileEndAddress, + logger)); + } + await checkpointTransmissionDriver.SendCheckpointAsync(cts.Token).ConfigureAwait(false); } #endregion diff --git a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs new file mode 100644 index 00000000000..e761182e27e --- /dev/null +++ b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs @@ -0,0 +1,107 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.IO; +using System.Text; +using Garnet.common; +using Garnet.server; +using Microsoft.Extensions.Logging; + +namespace Garnet.cluster +{ + /// + /// FileStream-backed implementation of that writes + /// RangeIndex .bftree file segments received from the primary during replication. + /// + internal sealed class RangeIndexFileDataSink : ISnapshotDataSink + { + private readonly string filePath; + private readonly ILogger logger; + private FileStream stream; + + public CheckpointFileType Type { get; } + public Guid Token { get; } + + /// + /// Creates a new RangeIndexFileDataSink. + /// + /// The checkpoint file type. + /// The checkpoint token. + /// Full path to write the received file. + /// Optional logger. + private RangeIndexFileDataSink(CheckpointFileType type, Guid token, string filePath, ILogger logger = null) + { + Type = type; + Token = token; + this.filePath = filePath; + this.logger = logger; + + // Ensure the target directory exists + var dir = Path.GetDirectoryName(filePath); + if (!string.IsNullOrEmpty(dir)) + Directory.CreateDirectory(dir); + + // Open for writing (overwrite any partial file from a prior failed attempt) + stream = new FileStream(filePath, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize: 1 << 17, useAsync: false); + } + + /// + /// Deserializes the metadata payload and creates a sink targeting the correct file path. + /// Metadata layout: + /// + /// STORE_RANGEINDEX_FLUSH: keyHash (32 bytes ASCII) + address (8 bytes LE) = 40 bytes + /// STORE_RANGEINDEX_SNAPSHOT: keyHash (32 bytes ASCII) = 32 bytes + /// + /// + /// The checkpoint file type. + /// The checkpoint token. + /// The raw metadata bytes from the header message. + /// The RangeIndex manager for path derivation. + /// Optional logger. + public static RangeIndexFileDataSink FromMetadata(CheckpointFileType type, Guid token, ReadOnlySpan metadata, RangeIndexManager riManager, ILogger logger = null) + { + if (metadata.Length < 32) + ExceptionUtils.ThrowException(new GarnetException($"RangeIndex metadata too short ({metadata.Length} bytes) for type {type}")); + + var keyHash = Encoding.ASCII.GetString(metadata[..32]); + string filePath; + + if (type == CheckpointFileType.STORE_RANGEINDEX_FLUSH) + { + if (metadata.Length < 40) + ExceptionUtils.ThrowException(new GarnetException($"RangeIndex flush metadata too short ({metadata.Length} bytes), expected 40")); + var address = BitConverter.ToInt64(metadata[32..40]); + filePath = riManager.LogFlushPath(keyHash, address); + } + else + { + filePath = riManager.CheckpointSnapshotPath(keyHash, token); + } + + return new RangeIndexFileDataSink(type, token, filePath, logger); + } + + /// + public void WriteChunk(long startAddress, ReadOnlySpan data) + { + stream.Write(data); + } + + /// + public void Complete() + { + stream?.Flush(); + stream?.Dispose(); + stream = null; + logger?.LogInformation("RangeIndexFileDataSink: completed writing {type} to {path}", Type, filePath); + } + + /// + public void Dispose() + { + stream?.Dispose(); + stream = null; + } + } +} diff --git a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs index 830f467e693..470a9034bf7 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs @@ -112,6 +112,13 @@ public void ProcessSnapshotData(Guid token, CheckpointFileType type, long startA { switch (type) { + case CheckpointFileType.STORE_RANGEINDEX_FLUSH: + case CheckpointFileType.STORE_RANGEINDEX_SNAPSHOT: + // Create sink immediately from metadata; data chunks will follow + if (!clusterProvider.serverOptions.EnableRangeIndexPreview) + ExceptionUtils.ThrowException(new GarnetException("RangeIndex not enabled but received RI checkpoint data")); + activeSink = RangeIndexFileDataSink.FromMetadata(type, token, data, clusterProvider.storeWrapper.rangeIndexManager, logger); + return; case CheckpointFileType.STORE_INDEX: case CheckpointFileType.STORE_SNAPSHOT: var sink = new MetadataDataSink(type, token, clusterProvider); diff --git a/libs/server/Resp/RangeIndex/RangeIndexManager.cs b/libs/server/Resp/RangeIndex/RangeIndexManager.cs index 6bca4023fcd..3d2d54311b5 100644 --- a/libs/server/Resp/RangeIndex/RangeIndexManager.cs +++ b/libs/server/Resp/RangeIndex/RangeIndexManager.cs @@ -253,11 +253,11 @@ internal string LogScratchPath(string hashPrefix) => Path.Combine(riLogRoot ?? string.Empty, hashPrefix + ".scratch.cpr"); /// {logRoot}/<hash>.<addr:x16>.flush.bftree - internal string LogFlushPath(string hashPrefix, long logicalAddress) + public string LogFlushPath(string hashPrefix, long logicalAddress) => Path.Combine(riLogRoot ?? string.Empty, $"{hashPrefix}.{logicalAddress:x16}.flush.bftree"); /// {cprDir}/<token>/rangeindex/<hash>.bftree - internal string CheckpointSnapshotPath(string hashPrefix, Guid checkpointToken) + public string CheckpointSnapshotPath(string hashPrefix, Guid checkpointToken) => Path.Combine(cprDir ?? string.Empty, checkpointToken.ToString(), "rangeindex", hashPrefix + ".bftree"); /// The directory holding per-checkpoint RI snapshots for a given token. diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index c1938427fa4..0d4fe333d66 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -121,7 +121,7 @@ public sealed class StoreWrapper /// /// RangeIndex (BfTree) manager shared across sessions /// - internal readonly RangeIndexManager rangeIndexManager; + public readonly RangeIndexManager rangeIndexManager; /// /// Definition for delegate creating a new logical database From 34b7b670d35095385d6d90f1bf538555a3bc9553 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 11:08:43 -0700 Subject: [PATCH 04/14] fix formatting --- .../PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs | 2 +- .../DiskbasedReplication/RangeIndexFileTransmitSource.cs | 2 +- .../PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs | 2 +- .../ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs index 0460761cdd6..129110809b3 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs @@ -118,4 +118,4 @@ public void Dispose() stream = null; } } -} +} \ No newline at end of file diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs index 52124c36a1a..8b3373762e1 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs @@ -83,4 +83,4 @@ public void Dispose() DataSource?.Dispose(); } } -} +} \ No newline at end of file diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs index 3d039534828..6fadf0a09e8 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexSnapshotReader.cs @@ -70,4 +70,4 @@ public void Dispose() dataSources.Clear(); } } -} +} \ No newline at end of file diff --git a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs index e761182e27e..5b254dac6a7 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs @@ -104,4 +104,4 @@ public void Dispose() stream = null; } } -} +} \ No newline at end of file From dd4024d0d74f7e83720437f87a12651247954e4c Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 14:34:00 -0700 Subject: [PATCH 05/14] add tests for cluster diskbased replication --- Garnet.slnx | 1 + libs/host/GarnetServer.cs | 4 +- .../ClusterRangeIndexReplicationTests.cs | 328 ++++++++++++++++++ ...test.cluster.replication.rangeindex.csproj | 46 +++ .../Garnet.test.cluster/ClusterTestContext.cs | 13 +- test/standalone/Garnet.test/TestUtils.cs | 6 +- 6 files changed, 389 insertions(+), 9 deletions(-) create mode 100644 test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs create mode 100644 test/cluster/Garnet.test.cluster.replication.rangeindex/Garnet.test.cluster.replication.rangeindex.csproj diff --git a/Garnet.slnx b/Garnet.slnx index 9208610c2ed..c4076e99b04 100644 --- a/Garnet.slnx +++ b/Garnet.slnx @@ -93,5 +93,6 @@ + diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index 68651184064..0cd4ac1feb7 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -188,8 +188,8 @@ private void InitializeServer() var clusterFactory = opts.EnableCluster ? new ClusterFactory() : null; - if (opts.EnableCluster && opts.EnableRangeIndexPreview) - throw new GarnetException("Range Index (preview) is not supported in cluster mode."); + if (opts.ReplicaDisklessSync && opts.EnableRangeIndexPreview) + throw new GarnetException("Range Index (preview) is not supported in cluster diskless mode."); this.logger = this.loggerFactory?.CreateLogger("GarnetServer"); logger?.LogInformation("Garnet {version} {bits} bit; {clusterMode} mode; Endpoint: [{endpoint}]", diff --git a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs new file mode 100644 index 00000000000..6522f04ceae --- /dev/null +++ b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs @@ -0,0 +1,328 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.Net; +using Microsoft.Extensions.Logging; +using NUnit.Framework; +using NUnit.Framework.Legacy; + +namespace Garnet.test.cluster +{ + [TestFixture] + [NonParallelizable] + public class ClusterRangeIndexReplicationTests : TestBase + { + ClusterTestContext context; + + readonly int timeout = 60; + readonly int testTimeout = (int)TimeSpan.FromSeconds(120).TotalSeconds; + + public Dictionary monitorTests = new() + { + { "ClusterRangeIndexCheckpointSync", LogLevel.Trace }, + { "ClusterRangeIndexReplicationFailover", LogLevel.Trace }, + }; + + [SetUp] + public virtual void Setup() + { + context = new ClusterTestContext(); + context.Setup(monitorTests, testTimeoutSeconds: testTimeout); + } + + [TearDown] + public virtual void TearDown() + { + context?.TearDown(); + } + + void PopulateRangeIndex(int nodeIndex, string indexName, int fieldCount) + { + var endpoint = (IPEndPoint)context.endpoints[nodeIndex]; + var result = context.clusterTestUtils.Execute(endpoint, "RI.CREATE", + [indexName, "MEMORY", "CACHESIZE", "65536", "MINRECORD", "8"], + logger: context.logger); + ClassicAssert.AreEqual("OK", (string)result); + + for (var i = 0; i < fieldCount; i++) + { + result = context.clusterTestUtils.Execute(endpoint, "RI.SET", + [indexName, $"field{i:D4}", $"value{i:D4}"], + logger: context.logger); + ClassicAssert.AreEqual("OK", (string)result); + } + } + + void ValidateRangeIndex(int nodeIndex, string indexName, int fieldCount) + { + var endpoint = (IPEndPoint)context.endpoints[nodeIndex]; + for (var i = 0; i < fieldCount; i++) + { + var result = context.clusterTestUtils.Execute(endpoint, "RI.GET", + [indexName, $"field{i:D4}"], + logger: context.logger); + ClassicAssert.AreEqual($"value{i:D4}", (string)result, + $"Mismatch at field{i:D4} on node {nodeIndex}"); + } + } + + /// + /// Create RI keys on primary, take checkpoint, attach replica via checkpoint-based sync. + /// Verify RI.GET returns correct values on replica. + /// + [Test, Order(1)] + [Category("REPLICATION")] + public void ClusterRangeIndexCheckpointSync() + { + var primaryIndex = 0; + var replicaIndex = 1; + var nodes_count = 2; + + context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + enableRangeIndexPreview: true); + context.CreateConnection(); + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); + + // Populate range index on primary + PopulateRangeIndex(primaryIndex, "idx1", 10); + + // Wait for replica to sync via AOF + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Validate on replica + ValidateRangeIndex(replicaIndex, "idx1", 10); + } + + /// + /// Multiple RI keys with MEMORY backend, checkpoint, replicate. + /// Verify all keys and their fields are accessible on replica. + /// + [Test, Order(2)] + [Category("REPLICATION")] + public void ClusterRangeIndexCheckpointSyncMultipleKeys() + { + var primaryIndex = 0; + var replicaIndex = 1; + var nodes_count = 2; + + context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + enableRangeIndexPreview: true); + context.CreateConnection(); + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); + + // Populate multiple range indexes on primary + PopulateRangeIndex(primaryIndex, "idx1", 10); + PopulateRangeIndex(primaryIndex, "idx2", 20); + PopulateRangeIndex(primaryIndex, "idx3", 5); + + // Take checkpoint + context.clusterTestUtils.Checkpoint(primaryIndex, logger: context.logger); + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryIndex, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(primaryIndex, primaryLastSaveTime, logger: context.logger); + + // Wait for replica to sync + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Validate all indexes on replica + ValidateRangeIndex(replicaIndex, "idx1", 10); + ValidateRangeIndex(replicaIndex, "idx2", 20); + ValidateRangeIndex(replicaIndex, "idx3", 5); + } + + /// + /// Low memory forces RI key eviction (flush.bftree files created). + /// Take checkpoint, attach replica. Verify flush snapshot files are transferred + /// and RI data is restored on replica. + /// + [Test, Order(3)] + [Category("REPLICATION")] + public void ClusterRangeIndexCheckpointSyncWithEviction() + { + var primaryIndex = 0; + var replicaIndex = 1; + var nodes_count = 2; + + context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + lowMemory: true, enableRangeIndexPreview: true); + context.CreateConnection(); + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); + + // Populate range indexes — low memory should trigger eviction + PopulateRangeIndex(primaryIndex, "idx1", 10); + PopulateRangeIndex(primaryIndex, "idx2", 10); + + // Take checkpoint + context.clusterTestUtils.Checkpoint(primaryIndex, logger: context.logger); + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryIndex, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(primaryIndex, primaryLastSaveTime, logger: context.logger); + + // Wait for replica to sync + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Validate on replica — even evicted keys should be accessible + ValidateRangeIndex(replicaIndex, "idx1", 10); + ValidateRangeIndex(replicaIndex, "idx2", 10); + } + + /// + /// Populate RI → checkpoint → more RI.SET → attach replica. + /// Verify both checkpointed data and AOF-replayed data arrive. + /// + [Test, Order(4)] + [Category("REPLICATION")] + public void ClusterRangeIndexAddReplicaAfterCheckpoint() + { + var primaryIndex = 0; + var replicaIndex = 1; + var nodes_count = 2; + + context.CreateInstances(nodes_count, tryRecover: true, disableObjects: true, + enableAOF: true, enableRangeIndexPreview: true); + context.CreateConnection(); + + // Setup primary only — do not attach replica yet + ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, + new List<(int, int)> { (0, 16383) }, true, context.logger)); + context.clusterTestUtils.BumpEpoch(0, logger: context.logger); + + // Populate and checkpoint + PopulateRangeIndex(primaryIndex, "idx1", 10); + context.clusterTestUtils.Checkpoint(primaryIndex, logger: context.logger); + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryIndex, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(primaryIndex, primaryLastSaveTime, logger: context.logger); + + // Add more data after checkpoint (will be in AOF) + var endpoint = (IPEndPoint)context.endpoints[primaryIndex]; + for (var i = 10; i < 20; i++) + { + context.clusterTestUtils.Execute(endpoint, "RI.SET", + ["idx1", $"field{i:D4}", $"value{i:D4}"], + logger: context.logger); + } + + // Now attach replica + var primaryId = context.clusterTestUtils.GetNodeIdFromNode(0, context.logger); + context.clusterTestUtils.Meet(1, 0, logger: context.logger); + context.clusterTestUtils.WaitAll(context.logger); + _ = context.clusterTestUtils.ClusterReplicate(1, primaryId, async: false, logger: context.logger); + context.clusterTestUtils.BumpEpoch(1, logger: context.logger); + + // Wait for sync + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Validate all 20 fields — 10 from checkpoint, 10 from AOF + ValidateRangeIndex(replicaIndex, "idx1", 20); + } + + /// + /// Checkpoint sync → shutdown replica → more RI.SET on primary → restart replica. + /// Verify replica catches up. + /// + [Test, Order(5)] + [Category("REPLICATION")] + public void ClusterRangeIndexReplicationRestartReplica() + { + var primaryIndex = 0; + var replicaIndex = 1; + var nodes_count = 2; + + context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + enableRangeIndexPreview: true); + context.CreateConnection(); + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); + + // Populate and sync + PopulateRangeIndex(primaryIndex, "idx1", 10); + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + ValidateRangeIndex(replicaIndex, "idx1", 10); + + // Take checkpoint so replica can recover + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryIndex, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger); + context.clusterTestUtils.Checkpoint(primaryIndex, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(primaryIndex, primaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Shutdown replica + context.nodes[replicaIndex].Dispose(false); + context.clusterTestUtils.WaitForAofSyncDriverDipose(primaryIndex); + + // Add more data while replica is down + var endpoint = (IPEndPoint)context.endpoints[primaryIndex]; + for (var i = 10; i < 20; i++) + { + context.clusterTestUtils.Execute(endpoint, "RI.SET", + ["idx1", $"field{i:D4}", $"value{i:D4}"], + logger: context.logger); + } + + // Restart replica + context.nodes[replicaIndex] = context.CreateInstance( + context.clusterTestUtils.GetEndPoint(replicaIndex), + disableObjects: true, + tryRecover: true, + enableAOF: true, + timeout: timeout, + cleanClusterConfig: false, + enableRangeIndexPreview: true); + context.nodes[replicaIndex].Start(); + context.CreateConnection(); + + // Wait for sync + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Validate all 20 fields + ValidateRangeIndex(replicaIndex, "idx1", 20); + } + + /// + /// Populate RI on primary → replicate via checkpoint → failover replica to primary. + /// Verify RI data accessible on new primary. + /// + [Test, Order(6)] + [Category("REPLICATION")] + public void ClusterRangeIndexReplicationFailover() + { + var primaryIndex = 0; + var replicaIndex = 1; + var nodes_count = 2; + + context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + enableRangeIndexPreview: true); + context.CreateConnection(); + var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); + + // Populate and sync + PopulateRangeIndex(primaryIndex, "idx1", 10); + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + ValidateRangeIndex(replicaIndex, "idx1", 10); + + // Take checkpoint + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryIndex, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger); + context.clusterTestUtils.Checkpoint(primaryIndex, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(primaryIndex, primaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger); + + // Failover + _ = context.clusterTestUtils.ClusterFailover(replicaIndex, logger: context.logger); + context.clusterTestUtils.WaitForNoFailover(replicaIndex, logger: context.logger); + context.clusterTestUtils.WaitForFailoverCompleted(replicaIndex, logger: context.logger); + context.clusterTestUtils.WaitForReplicaRecovery(primaryIndex, logger: context.logger); + + // Validate RI data on new primary (was replicaIndex) + ValidateRangeIndex(replicaIndex, "idx1", 10); + + // Verify we can write to new primary + var newPrimaryEndpoint = (IPEndPoint)context.endpoints[replicaIndex]; + var result = context.clusterTestUtils.Execute(newPrimaryEndpoint, "RI.SET", + ["idx1", "field0010", "value0010"], + logger: context.logger); + ClassicAssert.AreEqual("OK", (string)result); + } + } +} diff --git a/test/cluster/Garnet.test.cluster.replication.rangeindex/Garnet.test.cluster.replication.rangeindex.csproj b/test/cluster/Garnet.test.cluster.replication.rangeindex/Garnet.test.cluster.replication.rangeindex.csproj new file mode 100644 index 00000000000..0db6c2b85b3 --- /dev/null +++ b/test/cluster/Garnet.test.cluster.replication.rangeindex/Garnet.test.cluster.replication.rangeindex.csproj @@ -0,0 +1,46 @@ + + + + true + ../../../Garnet.snk + false + + + + 1701;1702;1591 + + + + + PreserveNewest + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + false + + diff --git a/test/cluster/Garnet.test.cluster/ClusterTestContext.cs b/test/cluster/Garnet.test.cluster/ClusterTestContext.cs index 098baf3bc11..703b5e8a7ea 100644 --- a/test/cluster/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/cluster/Garnet.test.cluster/ClusterTestContext.cs @@ -326,7 +326,8 @@ public void CreateInstances( ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, bool useClusterAnnounceHostname = false, int vectorSetReplayTaskCount = 0, - int threadPoolMinIOCompletionThreads = 0) + int threadPoolMinIOCompletionThreads = 0, + bool enableRangeIndexPreview = false) { var ipAddress = IPAddress.Loopback; TestUtils.EndPoint = new IPEndPoint(ipAddress, Port); @@ -386,7 +387,8 @@ public void CreateInstances( clusterPreferredEndpointType: clusterPreferredEndpointType, clusterAnnounceHostname: useClusterAnnounceHostname ? "localhost" : null, vectorSetReplayTaskCount: vectorSetReplayTaskCount, - threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads, + enableRangeIndexPreview: enableRangeIndexPreview); foreach (var node in nodes) node.Start(); @@ -454,9 +456,9 @@ public GarnetServer CreateInstance( EndPoint clusterAnnounceEndpoint = null, X509CertificateCollection certificates = null, ServerCredential clusterCreds = new ServerCredential(), - int threadPoolMinIOCompletionThreads = 0) + int threadPoolMinIOCompletionThreads = 0, + bool enableRangeIndexPreview = false) { - var opts = TestUtils.GetGarnetServerOptions( TestFolder, TestFolder, @@ -490,7 +492,8 @@ public GarnetServer CreateInstance( certificates: certificates, clusterAnnounceEndpoint: clusterAnnounceEndpoint, vectorSetReplayTaskCount: vectorSetReplayTaskCount, - threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads, + enableRangeIndexPreview: enableRangeIndexPreview); return new GarnetServer(opts, loggerFactory); } diff --git a/test/standalone/Garnet.test/TestUtils.cs b/test/standalone/Garnet.test/TestUtils.cs index bf51b8a9656..6ecef8b2fd8 100644 --- a/test/standalone/Garnet.test/TestUtils.cs +++ b/test/standalone/Garnet.test/TestUtils.cs @@ -565,7 +565,8 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, string clusterAnnounceHostname = null, int vectorSetReplayTaskCount = 0, - int threadPoolMinIOCompletionThreads = 0) + int threadPoolMinIOCompletionThreads = 0, + bool enableRangeIndexPreview = false) { if (UseAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -633,7 +634,8 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet clusterPreferredEndpointType: clusterPreferredEndpointType, clusterAnnounceHostname: clusterAnnounceHostname, vectorSetReplayTaskCount: vectorSetReplayTaskCount, - threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads, + enableRangeIndexPreview: enableRangeIndexPreview); ClassicAssert.IsNotNull(opts); From 3cb2ce2b5c8e143b98493b6ae537ad480be22451 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:00:51 -0700 Subject: [PATCH 06/14] fix formatting --- .../ClusterRangeIndexReplicationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs index 6522f04ceae..770ffdac7f8 100644 --- a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs +++ b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs @@ -325,4 +325,4 @@ public void ClusterRangeIndexReplicationFailover() ClassicAssert.AreEqual("OK", (string)result); } } -} +} \ No newline at end of file From 8e95283e281387d2ab2639d7eb46c0577f9e7413 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:06:44 -0700 Subject: [PATCH 07/14] addressing comments round 1 --- .../RangeIndexFileTransmitSource.cs | 6 ++--- .../ClusterRangeIndexReplicationTests.cs | 26 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs index 8b3373762e1..989854ad08b 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileTransmitSource.cs @@ -50,7 +50,7 @@ public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, Cance if (!headerResp.Equals("OK")) ExceptionUtils.ThrowException(new GarnetException( - $"Primary error at RangeIndex header {DataSource.Type} {headerResp} keyHash={riDataSource.KeyHash}")); + $"Replica error at RangeIndex header {DataSource.Type} {headerResp} keyHash={riDataSource.KeyHash}")); // Stream file content in chunks while (DataSource.HasNextChunk) @@ -65,7 +65,7 @@ public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, Cance if (!resp.Equals("OK")) ExceptionUtils.ThrowException(new GarnetException( - $"Primary error at RangeIndex TransmitAsync {DataSource.Type} {resp} [{DataSource.StartOffset},{DataSource.CurrentOffset},{DataSource.EndOffset}]")); + $"Replica error at RangeIndex TransmitAsync {DataSource.Type} {resp} [{DataSource.StartOffset},{DataSource.CurrentOffset},{DataSource.EndOffset}]")); } // Send empty package to indicate end of transmission @@ -75,7 +75,7 @@ public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, Cance if (!endResp.Equals("OK")) ExceptionUtils.ThrowException(new GarnetException( - $"Primary error at RangeIndex TransmitAsync Completion {DataSource.Type} {endResp}")); + $"Replica error at RangeIndex TransmitAsync Completion {DataSource.Type} {endResp}")); } public void Dispose() diff --git a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs index 770ffdac7f8..b4dd9fd7994 100644 --- a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs +++ b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs @@ -78,9 +78,9 @@ public void ClusterRangeIndexCheckpointSync() { var primaryIndex = 0; var replicaIndex = 1; - var nodes_count = 2; + var nodesCount = 2; - context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + context.CreateInstances(nodesCount, disableObjects: true, enableAOF: true, enableRangeIndexPreview: true); context.CreateConnection(); var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); @@ -105,9 +105,9 @@ public void ClusterRangeIndexCheckpointSyncMultipleKeys() { var primaryIndex = 0; var replicaIndex = 1; - var nodes_count = 2; + var nodesCount = 2; - context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + context.CreateInstances(nodesCount, disableObjects: true, enableAOF: true, enableRangeIndexPreview: true); context.CreateConnection(); var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); @@ -142,9 +142,9 @@ public void ClusterRangeIndexCheckpointSyncWithEviction() { var primaryIndex = 0; var replicaIndex = 1; - var nodes_count = 2; + var nodesCount = 2; - context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + context.CreateInstances(nodesCount, disableObjects: true, enableAOF: true, lowMemory: true, enableRangeIndexPreview: true); context.CreateConnection(); var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); @@ -176,9 +176,9 @@ public void ClusterRangeIndexAddReplicaAfterCheckpoint() { var primaryIndex = 0; var replicaIndex = 1; - var nodes_count = 2; + var nodesCount = 2; - context.CreateInstances(nodes_count, tryRecover: true, disableObjects: true, + context.CreateInstances(nodesCount, tryRecover: true, disableObjects: true, enableAOF: true, enableRangeIndexPreview: true); context.CreateConnection(); @@ -226,9 +226,9 @@ public void ClusterRangeIndexReplicationRestartReplica() { var primaryIndex = 0; var replicaIndex = 1; - var nodes_count = 2; + var nodesCount = 2; - context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + context.CreateInstances(nodesCount, disableObjects: true, enableAOF: true, enableRangeIndexPreview: true); context.CreateConnection(); var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); @@ -288,9 +288,9 @@ public void ClusterRangeIndexReplicationFailover() { var primaryIndex = 0; var replicaIndex = 1; - var nodes_count = 2; + var nodesCount = 2; - context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, + context.CreateInstances(nodesCount, disableObjects: true, enableAOF: true, enableRangeIndexPreview: true); context.CreateConnection(); var (_, _) = context.clusterTestUtils.SimpleSetupCluster(1, 1, logger: context.logger); @@ -325,4 +325,4 @@ public void ClusterRangeIndexReplicationFailover() ClassicAssert.AreEqual("OK", (string)result); } } -} \ No newline at end of file +} From 4308a0b508db0c1f065113d5ea0fa16c0f442060 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:09:44 -0700 Subject: [PATCH 08/14] enforce consistent little endian read/write for rangeindex metadata --- .../RangeIndexFileDataSource.cs | 22 ++++++++++++++++--- .../RangeIndexFileDataSink.cs | 17 +++++++------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs index 129110809b3..9b3b82ef930 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Buffers.Binary; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -20,6 +21,21 @@ internal sealed class RangeIndexFileDataSource : ISnapshotDataSource /// internal const int DefaultChunkSize = 1 << 17; + /// + /// Length in bytes of the ASCII-encoded key hash in the metadata payload. + /// + internal const int KeyHashLength = 32; + + /// + /// Length in bytes of the little-endian encoded logical address in the metadata payload. + /// + internal const int AddressLength = sizeof(long); + + /// + /// Total metadata length for flush files (key hash + address). + /// + internal const int FlushMetadataLength = KeyHashLength + AddressLength; + private readonly string filePath; private readonly int chunkSize; private FileStream stream; @@ -60,9 +76,9 @@ public byte[] GetMetadata() if (Type == CheckpointFileType.STORE_RANGEINDEX_FLUSH) { - var metadata = new byte[32 + 8]; - Buffer.BlockCopy(keyHashBytes, 0, metadata, 0, 32); - BitConverter.TryWriteBytes(metadata.AsSpan(32), Address); + var metadata = new byte[FlushMetadataLength]; + Buffer.BlockCopy(keyHashBytes, 0, metadata, 0, KeyHashLength); + BinaryPrimitives.WriteInt64LittleEndian(metadata.AsSpan(KeyHashLength), Address); return metadata; } diff --git a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs index 5b254dac6a7..d226cd4917d 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Buffers.Binary; using System.IO; using System.Text; using Garnet.common; @@ -48,10 +49,10 @@ private RangeIndexFileDataSink(CheckpointFileType type, Guid token, string fileP /// /// Deserializes the metadata payload and creates a sink targeting the correct file path. - /// Metadata layout: + /// Metadata layout (sizes defined in ): /// - /// STORE_RANGEINDEX_FLUSH: keyHash (32 bytes ASCII) + address (8 bytes LE) = 40 bytes - /// STORE_RANGEINDEX_SNAPSHOT: keyHash (32 bytes ASCII) = 32 bytes + /// STORE_RANGEINDEX_FLUSH: keyHash (KeyHashLength bytes ASCII) + address (AddressLength bytes LE) = FlushMetadataLength bytes + /// STORE_RANGEINDEX_SNAPSHOT: keyHash (KeyHashLength bytes ASCII) /// /// /// The checkpoint file type. @@ -61,17 +62,17 @@ private RangeIndexFileDataSink(CheckpointFileType type, Guid token, string fileP /// Optional logger. public static RangeIndexFileDataSink FromMetadata(CheckpointFileType type, Guid token, ReadOnlySpan metadata, RangeIndexManager riManager, ILogger logger = null) { - if (metadata.Length < 32) + if (metadata.Length < RangeIndexFileDataSource.KeyHashLength) ExceptionUtils.ThrowException(new GarnetException($"RangeIndex metadata too short ({metadata.Length} bytes) for type {type}")); - var keyHash = Encoding.ASCII.GetString(metadata[..32]); + var keyHash = Encoding.ASCII.GetString(metadata[..RangeIndexFileDataSource.KeyHashLength]); string filePath; if (type == CheckpointFileType.STORE_RANGEINDEX_FLUSH) { - if (metadata.Length < 40) - ExceptionUtils.ThrowException(new GarnetException($"RangeIndex flush metadata too short ({metadata.Length} bytes), expected 40")); - var address = BitConverter.ToInt64(metadata[32..40]); + if (metadata.Length < RangeIndexFileDataSource.FlushMetadataLength) + ExceptionUtils.ThrowException(new GarnetException($"RangeIndex flush metadata too short ({metadata.Length} bytes), expected {RangeIndexFileDataSource.FlushMetadataLength}")); + var address = BinaryPrimitives.ReadInt64LittleEndian(metadata[RangeIndexFileDataSource.KeyHashLength..RangeIndexFileDataSource.FlushMetadataLength]); filePath = riManager.LogFlushPath(keyHash, address); } else From 55433fb249ac6f9d91c8a6a3e8558467cd9badd3 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:15:03 -0700 Subject: [PATCH 09/14] expose rangeIndexManager through the ClusterProvider --- libs/cluster/Server/ClusterProvider.cs | 2 ++ .../PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs | 2 +- .../ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index 42a0421b207..627fecdcadb 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -27,6 +27,7 @@ public sealed partial class ClusterProvider : IClusterProvider internal readonly ILoggerFactory loggerFactory; internal readonly StoreWrapper storeWrapper; internal readonly GarnetServerOptions serverOptions; + internal readonly RangeIndexManager rangeIndexManager; internal long GarnetCurrentEpoch = 1; ClusterAuthContainer authContainer; @@ -47,6 +48,7 @@ public ClusterProvider(StoreWrapper storeWrapper) { this.storeWrapper = storeWrapper; this.serverOptions = storeWrapper.serverOptions; + this.rangeIndexManager = storeWrapper.rangeIndexManager; this.loggerFactory = storeWrapper.loggerFactory; authContainer = new ClusterAuthContainer diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs index 471c7fd9200..456f16777c7 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/ReplicaSyncSession.cs @@ -144,7 +144,7 @@ public async Task SendCheckpointAsync() if (storeWrapper.serverOptions.EnableRangeIndexPreview) { checkpointTransmissionDriver.AddReader(new RangeIndexSnapshotReader( - storeWrapper.rangeIndexManager, + clusterProvider.rangeIndexManager, localEntry.metadata.storeHlogToken, hlog_size.hybridLogFileStartAddress, hlog_size.hybridLogFileEndAddress, diff --git a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs index 470a9034bf7..4d5e6777ee3 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/ReceiveCheckpointHandler.cs @@ -117,7 +117,7 @@ public void ProcessSnapshotData(Guid token, CheckpointFileType type, long startA // Create sink immediately from metadata; data chunks will follow if (!clusterProvider.serverOptions.EnableRangeIndexPreview) ExceptionUtils.ThrowException(new GarnetException("RangeIndex not enabled but received RI checkpoint data")); - activeSink = RangeIndexFileDataSink.FromMetadata(type, token, data, clusterProvider.storeWrapper.rangeIndexManager, logger); + activeSink = RangeIndexFileDataSink.FromMetadata(type, token, data, clusterProvider.rangeIndexManager, logger); return; case CheckpointFileType.STORE_INDEX: case CheckpointFileType.STORE_SNAPSHOT: From 08dad68d9abd0b2e907d643512d25e9a56cfce0b Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:20:02 -0700 Subject: [PATCH 10/14] validate stream position --- .../ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs | 2 ++ libs/server/Resp/RangeIndex/RangeIndexManager.cs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs index d226cd4917d..2abd4738ee1 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/DiskbasedReplication/RangeIndexFileDataSink.cs @@ -86,6 +86,8 @@ public static RangeIndexFileDataSink FromMetadata(CheckpointFileType type, Guid /// public void WriteChunk(long startAddress, ReadOnlySpan data) { + if (stream.Position != startAddress) + ExceptionUtils.ThrowException(new GarnetException($"RangeIndexFileDataSink: expected stream position {startAddress} but found {stream.Position} for {filePath}")); stream.Write(data); } diff --git a/libs/server/Resp/RangeIndex/RangeIndexManager.cs b/libs/server/Resp/RangeIndex/RangeIndexManager.cs index 3d2d54311b5..17d709c525a 100644 --- a/libs/server/Resp/RangeIndex/RangeIndexManager.cs +++ b/libs/server/Resp/RangeIndex/RangeIndexManager.cs @@ -1000,7 +1000,7 @@ public IEnumerable EnumerateFilesForReplication( { foreach (var path in Directory.EnumerateFiles(snapshotDir, "*.bftree")) { - var name = System.IO.Path.GetFileName(path); + var name = Path.GetFileName(path); var keyHash = name[..HashPrefixLength]; yield return new RangeIndexFileEntry(path, keyHash, address: 0, isFlushFile: false); } From 1332479e1259113acc22be674da4113b36a00bf3 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:25:20 -0700 Subject: [PATCH 11/14] ensure break out when readAsync reads zero bytes --- .../RangeIndexFileDataSource.cs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs index 9b3b82ef930..37c767f0374 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DiskbasedReplication/RangeIndexFileDataSource.cs @@ -6,6 +6,7 @@ using System.IO; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace Garnet.cluster { @@ -38,6 +39,7 @@ internal sealed class RangeIndexFileDataSource : ISnapshotDataSource private readonly string filePath; private readonly int chunkSize; + private readonly ILogger logger; private FileStream stream; /// @@ -95,7 +97,8 @@ public byte[] GetMetadata() /// The 32-character key hash prefix. /// The hlog logical address (flush files only). /// Maximum bytes to read per chunk. - public RangeIndexFileDataSource(CheckpointFileType type, Guid token, string filePath, string keyHash, long address, int chunkSize = DefaultChunkSize) + /// Optional logger. + public RangeIndexFileDataSource(CheckpointFileType type, Guid token, string filePath, string keyHash, long address, int chunkSize = DefaultChunkSize, ILogger logger = null) { Type = type; Token = token; @@ -103,6 +106,7 @@ public RangeIndexFileDataSource(CheckpointFileType type, Guid token, string file Address = address; this.filePath = filePath; this.chunkSize = chunkSize; + this.logger = logger; var fileInfo = new FileInfo(filePath); if (!fileInfo.Exists) @@ -121,6 +125,14 @@ public async Task ReadNextChunkAsync(CancellationToken can var buffer = new byte[bytesToRead]; var bytesRead = await stream.ReadAsync(buffer, 0, bytesToRead, cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + logger?.LogWarning("RangeIndexFileDataSource: unexpected EOF at offset {currentOffset}, expected {endOffset} for {filePath}", CurrentOffset, EndOffset, filePath); + CurrentOffset = EndOffset; + return new DataSourceReadResult([], chunkStartAddress: CurrentOffset); + } + var chunkStart = CurrentOffset; CurrentOffset += bytesRead; From 6f3c70430b4d6003eeb4e9b932d4a5389147b349 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:29:12 -0700 Subject: [PATCH 12/14] add validation for snapshot file names --- libs/server/Resp/RangeIndex/RangeIndexManager.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libs/server/Resp/RangeIndex/RangeIndexManager.cs b/libs/server/Resp/RangeIndex/RangeIndexManager.cs index 17d709c525a..bc0f846a0ee 100644 --- a/libs/server/Resp/RangeIndex/RangeIndexManager.cs +++ b/libs/server/Resp/RangeIndex/RangeIndexManager.cs @@ -1001,6 +1001,11 @@ public IEnumerable EnumerateFilesForReplication( foreach (var path in Directory.EnumerateFiles(snapshotDir, "*.bftree")) { var name = Path.GetFileName(path); + + // Skip files that don't match the expected format: <32-hex-hash>.bftree + if (name.Length != HashPrefixLength + ".bftree".Length) + continue; + var keyHash = name[..HashPrefixLength]; yield return new RangeIndexFileEntry(path, keyHash, address: 0, isFlushFile: false); } From 32a770acd7e1a6b3e7f0861c7644aad61591690a Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 19 May 2026 15:43:41 -0700 Subject: [PATCH 13/14] fix formatting --- .../ClusterRangeIndexReplicationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs index b4dd9fd7994..1713846f1c0 100644 --- a/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs +++ b/test/cluster/Garnet.test.cluster.replication.rangeindex/ClusterRangeIndexReplicationTests.cs @@ -325,4 +325,4 @@ public void ClusterRangeIndexReplicationFailover() ClassicAssert.AreEqual("OK", (string)result); } } -} +} \ No newline at end of file From 8d9e5a478d74e7e0598b40807fd357cf210eb876 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Fri, 22 May 2026 17:04:09 -0700 Subject: [PATCH 14/14] pass RangeIndexManager to ClusterProvider CreateFactory --- libs/cluster/ClusterFactory.cs | 4 ++-- libs/cluster/Server/ClusterProvider.cs | 4 ++-- libs/server/Cluster/IClusterFactory.cs | 2 +- libs/server/StoreWrapper.cs | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/libs/cluster/ClusterFactory.cs b/libs/cluster/ClusterFactory.cs index 753d896def7..189e76e237e 100644 --- a/libs/cluster/ClusterFactory.cs +++ b/libs/cluster/ClusterFactory.cs @@ -17,7 +17,7 @@ public DeviceLogCommitCheckpointManager CreateCheckpointManager(int aofPhysicalS => new GarnetClusterCheckpointManager(aofPhysicalSublogCount, deviceFactoryCreator, checkpointNamingScheme, isMainStore, logger: logger); /// - public IClusterProvider CreateClusterProvider(StoreWrapper store) - => new ClusterProvider(store); + public IClusterProvider CreateClusterProvider(StoreWrapper store, RangeIndexManager rangeIndexManager) + => new ClusterProvider(store, rangeIndexManager); } } \ No newline at end of file diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index 627fecdcadb..9188382a888 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -44,11 +44,11 @@ public sealed partial class ClusterProvider : IClusterProvider /// /// Create new cluster provider /// - public ClusterProvider(StoreWrapper storeWrapper) + public ClusterProvider(StoreWrapper storeWrapper, RangeIndexManager rangeIndexManager) { this.storeWrapper = storeWrapper; this.serverOptions = storeWrapper.serverOptions; - this.rangeIndexManager = storeWrapper.rangeIndexManager; + this.rangeIndexManager = rangeIndexManager; this.loggerFactory = storeWrapper.loggerFactory; authContainer = new ClusterAuthContainer diff --git a/libs/server/Cluster/IClusterFactory.cs b/libs/server/Cluster/IClusterFactory.cs index 6e39c11e222..2568ca71ce0 100644 --- a/libs/server/Cluster/IClusterFactory.cs +++ b/libs/server/Cluster/IClusterFactory.cs @@ -19,6 +19,6 @@ public interface IClusterFactory /// /// Create cluster provider /// - IClusterProvider CreateClusterProvider(StoreWrapper store); + IClusterProvider CreateClusterProvider(StoreWrapper store, RangeIndexManager rangeIndexManager); } } \ No newline at end of file diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index 0d4fe333d66..f6047c82870 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -121,7 +121,7 @@ public sealed class StoreWrapper /// /// RangeIndex (BfTree) manager shared across sessions /// - public readonly RangeIndexManager rangeIndexManager; + internal readonly RangeIndexManager rangeIndexManager; /// /// Definition for delegate creating a new logical database @@ -280,7 +280,7 @@ public StoreWrapper( } if (clusterFactory != null) - clusterProvider = clusterFactory.CreateClusterProvider(this); + clusterProvider = clusterFactory.CreateClusterProvider(this, rangeIndexManager); ctsCommit = new(); if (!serverOptions.EnableCluster)