Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Garnet.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,6 @@
<Project Path="test/cluster/Garnet.test.cluster.replication.disklesssync/Garnet.test.cluster.replication.disklesssync.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.replication.tls/Garnet.test.cluster.replication.tls.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.vectorsets/Garnet.test.cluster.vectorsets.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.replication.rangeindex/Garnet.test.cluster.replication.rangeindex.csproj" />
</Folder>
</Solution>
4 changes: 2 additions & 2 deletions libs/cluster/ClusterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public DeviceLogCommitCheckpointManager CreateCheckpointManager(int aofPhysicalS
=> new GarnetClusterCheckpointManager(aofPhysicalSublogCount, deviceFactoryCreator, checkpointNamingScheme, isMainStore, logger: logger);

/// <inheritdoc />
public IClusterProvider CreateClusterProvider(StoreWrapper store)
=> new ClusterProvider(store);
public IClusterProvider CreateClusterProvider(StoreWrapper store, RangeIndexManager rangeIndexManager)
=> new ClusterProvider(store, rangeIndexManager);
}
}
4 changes: 3 additions & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public sealed partial class ClusterProvider : IClusterProvider
internal readonly ILoggerFactory loggerFactory;
internal readonly StoreWrapper storeWrapper;
internal readonly GarnetServerOptions serverOptions;
internal readonly RangeIndexManager rangeIndexManager;
internal long GarnetCurrentEpoch = 1;
ClusterAuthContainer authContainer;

Expand All @@ -43,10 +44,11 @@ public sealed partial class ClusterProvider : IClusterProvider
/// <summary>
/// Create new cluster provider
/// </summary>
public ClusterProvider(StoreWrapper storeWrapper)
public ClusterProvider(StoreWrapper storeWrapper, RangeIndexManager rangeIndexManager)
{
this.storeWrapper = storeWrapper;
this.serverOptions = storeWrapper.serverOptions;
this.rangeIndexManager = rangeIndexManager;
this.loggerFactory = storeWrapper.loggerFactory;

authContainer = new ClusterAuthContainer
Expand Down
8 changes: 8 additions & 0 deletions libs/cluster/Server/Replication/CheckpointFileType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,13 @@ enum CheckpointFileType : byte
/// Store Snapshot - Object
/// </summary>
STORE_SNAPSHOT_OBJ = 6,
/// <summary>
/// RangeIndex per-flush snapshot file (flush.bftree)
/// </summary>
STORE_RANGEINDEX_FLUSH = 7,
/// <summary>
/// RangeIndex per-checkpoint snapshot file (checkpoint .bftree)
/// </summary>
STORE_RANGEINDEX_SNAPSHOT = 8,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Buffers.Binary;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Garnet.cluster
{
/// <summary>
/// A checkpoint data source that reads a RangeIndex .bftree file using FileStream.
/// Unlike <see cref="FileDataSource"/> which uses Tsavorite's IDevice for sector-aligned I/O,
/// this source reads plain files directly since .bftree files are not managed by the device layer.
/// </summary>
internal sealed class RangeIndexFileDataSource : ISnapshotDataSource
{
/// <summary>
/// Default chunk size for streaming .bftree files (128 KB).
/// </summary>
internal const int DefaultChunkSize = 1 << 17;

/// <summary>
/// Length in bytes of the ASCII-encoded key hash in the metadata payload.
/// </summary>
internal const int KeyHashLength = 32;

/// <summary>
/// Length in bytes of the little-endian encoded logical address in the metadata payload.
/// </summary>
internal const int AddressLength = sizeof(long);

/// <summary>
/// Total metadata length for flush files (key hash + address).
/// </summary>
internal const int FlushMetadataLength = KeyHashLength + AddressLength;

private readonly string filePath;
private readonly int chunkSize;
private readonly ILogger logger;
private FileStream stream;

/// <inheritdoc/>
public CheckpointFileType Type { get; }

/// <inheritdoc/>
public Guid Token { get; }

/// <summary>
/// The 32-character key hash prefix identifying the RangeIndex tree.
/// </summary>
public string KeyHash { get; }

/// <summary>
/// The logical hlog address embedded in the flush filename.
/// Only meaningful for <see cref="CheckpointFileType.STORE_RANGEINDEX_FLUSH"/>.
/// </summary>
public long Address { get; }

/// <inheritdoc/>
public long StartOffset => 0;

/// <inheritdoc/>
public long CurrentOffset { get; private set; }

/// <inheritdoc/>
public long EndOffset { get; }

/// <inheritdoc/>
public bool HasNextChunk => CurrentOffset < EndOffset;

/// <inheritdoc/>
public byte[] GetMetadata()
{
var keyHashBytes = System.Text.Encoding.ASCII.GetBytes(KeyHash);

if (Type == CheckpointFileType.STORE_RANGEINDEX_FLUSH)
{
var metadata = new byte[FlushMetadataLength];
Buffer.BlockCopy(keyHashBytes, 0, metadata, 0, KeyHashLength);
BinaryPrimitives.WriteInt64LittleEndian(metadata.AsSpan(KeyHashLength), Address);
return metadata;
}
Comment thread
vazois marked this conversation as resolved.

// Snapshot: keyHash only
return keyHashBytes;
}

/// <summary>
/// Creates a new RangeIndexFileDataSource.
/// </summary>
/// <param name="type">The checkpoint file type (STORE_RANGEINDEX_FLUSH or STORE_RANGEINDEX_SNAPSHOT).</param>
/// <param name="token">The checkpoint token.</param>
/// <param name="filePath">Full path to the .bftree file on disk.</param>
/// <param name="keyHash">The 32-character key hash prefix.</param>
/// <param name="address">The hlog logical address (flush files only).</param>
/// <param name="chunkSize">Maximum bytes to read per chunk.</param>
/// <param name="logger">Optional logger.</param>
public RangeIndexFileDataSource(CheckpointFileType type, Guid token, string filePath, string keyHash, long address, int chunkSize = DefaultChunkSize, ILogger logger = null)
{
Type = type;
Token = token;
KeyHash = keyHash;
Address = address;
this.filePath = filePath;
this.chunkSize = chunkSize;
this.logger = logger;

var fileInfo = new FileInfo(filePath);
if (!fileInfo.Exists)
throw new FileNotFoundException($"RangeIndex file not found: {filePath}");

EndOffset = fileInfo.Length;
}

/// <inheritdoc/>
public async Task<DataSourceReadResult> ReadNextChunkAsync(CancellationToken cancellationToken = default)
{
stream ??= new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: chunkSize, useAsync: true);

var remaining = EndOffset - CurrentOffset;
var bytesToRead = (int)Math.Min(remaining, chunkSize);
var buffer = new byte[bytesToRead];
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Since BFTree files can get big, maybe for follow up PR we could optimize these allocations?
We could use ArrayPool<byte>.Shared, but the problem is DataSourceReadResult currently doesn't have a way to custome Buffer.Return - Maybe we would need to implement a IDataSourceReadResult and make a ReturnBuffer interface method


var bytesRead = await stream.ReadAsync(buffer, 0, bytesToRead, cancellationToken).ConfigureAwait(false);

if (bytesRead == 0)
{
logger?.LogWarning("RangeIndexFileDataSource: unexpected EOF at offset {currentOffset}, expected {endOffset} for {filePath}", CurrentOffset, EndOffset, filePath);
CurrentOffset = EndOffset;
return new DataSourceReadResult([], chunkStartAddress: CurrentOffset);
}

var chunkStart = CurrentOffset;
CurrentOffset += bytesRead;

return new DataSourceReadResult(buffer, chunkStartAddress: chunkStart);
Comment thread
vazois marked this conversation as resolved.
}

/// <inheritdoc/>
public void Dispose()
{
stream?.Dispose();
stream = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Threading;
using System.Threading.Tasks;
using Garnet.client;
using Garnet.common;
using Microsoft.Extensions.Logging;

namespace Garnet.cluster
{
/// <summary>
/// Transmits a RangeIndex file over the network. Sends a metadata header (with
/// <c>startAddress = -1</c>) containing the serialized key hash and address needed to
/// reconstruct the target path, followed by chunked file content, followed by an empty
/// end-of-transmission marker.
///
/// <para>Metadata payload layout:</para>
/// <list type="bullet">
/// <item><b>STORE_RANGEINDEX_FLUSH</b>: keyHash (32 bytes ASCII) + address (8 bytes little-endian) = 40 bytes</item>
/// <item><b>STORE_RANGEINDEX_SNAPSHOT</b>: keyHash (32 bytes ASCII) = 32 bytes</item>
/// </list>
/// </summary>
internal sealed class RangeIndexFileTransmitSource : ISnapshotTransmitSource
{
private readonly ILogger logger;

public ISnapshotDataSource DataSource { get; }

public RangeIndexFileTransmitSource(ISnapshotDataSource dataSource, ILogger logger = null)
{
DataSource = dataSource;
this.logger = logger;
}

/// <inheritdoc/>
public async Task TransmitAsync(GarnetClientSession gcs, TimeSpan timeout, CancellationToken cancellationToken = default)
{
var riDataSource = (RangeIndexFileDataSource)DataSource;
var fileTokenBytes = DataSource.Token.ToByteArray();

// Get metadata from data source: keyHash + address (flush only)
var metadata = riDataSource.GetMetadata();

// Send header with startAddress = -1 to indicate single-message control payload.
var headerResp = await gcs.ExecuteClusterSnapshotData(
fileTokenBytes, (int)DataSource.Type, -1, metadata)
.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);

if (!headerResp.Equals("OK"))
ExceptionUtils.ThrowException(new GarnetException(
$"Replica error at RangeIndex header {DataSource.Type} {headerResp} keyHash={riDataSource.KeyHash}"));

// Stream file content in chunks
while (DataSource.HasNextChunk)
{
var result = await DataSource.ReadNextChunkAsync(cancellationToken).ConfigureAwait(false);

var resp = await gcs.ExecuteClusterSnapshotData(
fileTokenBytes,
(int)DataSource.Type,
startAddress: result.ChunkStartAddress,
new Span<byte>(result.Data, 0, result.BytesRead)).WaitAsync(timeout, cancellationToken).ConfigureAwait(false);

if (!resp.Equals("OK"))
ExceptionUtils.ThrowException(new GarnetException(
$"Replica error at RangeIndex TransmitAsync {DataSource.Type} {resp} [{DataSource.StartOffset},{DataSource.CurrentOffset},{DataSource.EndOffset}]"));
}

// Send empty package to indicate end of transmission
var endResp = await gcs.ExecuteClusterSnapshotData(
fileTokenBytes, (int)DataSource.Type, DataSource.CurrentOffset, [])
.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);

if (!endResp.Equals("OK"))
ExceptionUtils.ThrowException(new GarnetException(
$"Replica error at RangeIndex TransmitAsync Completion {DataSource.Type} {endResp}"));
}

public void Dispose()
{
DataSource?.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using Garnet.server;
using Microsoft.Extensions.Logging;

namespace Garnet.cluster
{
/// <summary>
/// Snapshot reader for RangeIndex files. Enumerates all flush.bftree and checkpoint
/// snapshot.bftree files that must be shipped for a given checkpoint, and yields
/// transmit sources for each.
///
/// <para>Each <see cref="RangeIndexFileTransmitSource"/> sends a filename header
/// (with <c>startAddress = -1</c>) followed by chunked file content, so the replica
/// knows where to write each file.</para>
/// </summary>
internal sealed class RangeIndexSnapshotReader : ISnapshotReader
{
private readonly List<RangeIndexFileDataSource> dataSources = [];
private readonly ILogger logger;

/// <summary>
/// Creates a new RangeIndexSnapshotReader.
/// </summary>
/// <param name="rangeIndexManager">The RangeIndexManager instance for file enumeration.</param>
/// <param name="checkpointToken">The checkpoint token (storeHlogToken).</param>
/// <param name="hlogStartAddress">hybridLogFileStartAddress from LogFileInfo.</param>
/// <param name="hlogEndAddress">hybridLogFileEndAddress from LogFileInfo.</param>
/// <param name="logger">Optional logger.</param>
public RangeIndexSnapshotReader(
RangeIndexManager rangeIndexManager,
Guid checkpointToken,
long hlogStartAddress,
long hlogEndAddress,
ILogger logger = null)
{
this.logger = logger;

foreach (var entry in rangeIndexManager.EnumerateFilesForReplication(checkpointToken, hlogStartAddress, hlogEndAddress))
{
var type = entry.IsFlushFile
? CheckpointFileType.STORE_RANGEINDEX_FLUSH
: CheckpointFileType.STORE_RANGEINDEX_SNAPSHOT;

dataSources.Add(new RangeIndexFileDataSource(type, checkpointToken, entry.Path, entry.KeyHash, entry.Address));
logger?.LogInformation("RangeIndexSnapshotReader: queued {type} keyHash={keyHash} addr={addr}", type, entry.KeyHash, entry.Address);
}
}

/// <inheritdoc/>
public IEnumerable<ISnapshotTransmitSource> GetTransmitSources()
{
foreach (var dataSource in dataSources)
{
yield return new RangeIndexFileTransmitSource(dataSource, logger);
}
}

/// <inheritdoc/>
public void Dispose()
{
foreach (var ds in dataSources)
{
try { ds.Dispose(); }
catch (Exception ex) { logger?.LogError(ex, "Error disposing RI data source {type} {keyHash}", ds.Type, ds.KeyHash); }
}
dataSources.Clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ public async Task<bool> SendCheckpointAsync()

using var checkpointTransmissionDriver = new SnapshotTransmissionDriver(gcs, storeWrapper.serverOptions.ReplicaSyncTimeout, logger);
checkpointTransmissionDriver.AddReader(new TsavoriteSnapshotReader(clusterProvider, localEntry, hlog_size, index_size, storeWrapper.serverOptions.ReplicaSyncTimeout, logger));

// Add RangeIndex files if RI is enabled
if (storeWrapper.serverOptions.EnableRangeIndexPreview)
{
checkpointTransmissionDriver.AddReader(new RangeIndexSnapshotReader(
clusterProvider.rangeIndexManager,
localEntry.metadata.storeHlogToken,
hlog_size.hybridLogFileStartAddress,
hlog_size.hybridLogFileEndAddress,
logger));
}

await checkpointTransmissionDriver.SendCheckpointAsync(cts.Token).ConfigureAwait(false);
}
#endregion
Expand Down
Loading
Loading