Skip to content

Commit dc78c02

Browse files
Tiago NapoliCopilot
andcommitted
Add RangeIndex cluster migration with sketch protection
Migration plumbing: - Chunked serializer (pure state machine, no I/O) + async MigrationReader wrapper - Chunked deserializer with FileStream I/O for receiving migration data - TransmitRangeIndexAsync source-side driver with configurable chunk size - RangeIndexMigrationReceiveState receiver with state machine - Remove redundant content-length prefix in migration wire format - SnapshotRangeIndexAndCreateReader factory on RangeIndexManager Bug fixes: - Remove RI+cluster startup guard (GarnetServer.cs) - Fix round-trip migration: Publish deletes existing data file - Fix Publish registration: accept InPlaceUpdated/CopyUpdated - Fix serializer empty-buffer bug in FileData phase - Add missing buffer-too-small guard for KeyHeader phase - TransmitRangeIndexAsync catches all exceptions (never throws) Sketch protection (SLOTS path): - All RI keys added to sketch in one batch - TRANSMITTING epoch barrier blocks writes during snapshot+transmit - DELETING epoch barrier blocks reads+writes during delete - try/finally ensures sketch cleanup on failure - TODO: Vector Sets have the same unprotected pattern Sketch protection (KEYS path): - RI keys already in sketch from user enumeration - Mark transmitted keys for DeleteKeysAsync() Tests: - 26 unit tests for serializer/deserializer - 11 cluster migration integration tests (SingleBySlot, ByKeys, ManyBySlot, WhileModifying, MigrateBack, LargeTree, ChunkSize, StressAsync) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent ddd711b commit dc78c02

23 files changed

Lines changed: 3241 additions & 157 deletions

libs/client/ClientSession/GarnetClientSessionIncremental.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public enum MigrationRecordSpanType : byte
4242
/// Bespoke encoding for Vector Set indexes.
4343
/// </summary>
4444
VectorSetIndex = 3,
45+
46+
/// <summary>
47+
/// Chunked serialization stream for a RangeIndex key during migration.
48+
/// The receiver uses a state machine to track the in-progress stream.
49+
/// </summary>
50+
SerializedRangeIndexStream = 4,
4551
}
4652

4753
public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer

libs/cluster/Server/Migration/MigrateOperation.cs

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,20 @@ internal sealed partial class MigrateOperation
2222
public StoreScan storeScan;
2323

2424
private readonly ConcurrentDictionary<byte[], byte[]> vectorSetsIndexKeysToMigrate;
25+
private readonly ConcurrentDictionary<byte[], byte[]> rangeIndexKeysToMigrate;
2526

2627
readonly MigrateSession session;
2728
readonly GarnetClientSession gcs;
2829
readonly LocalServerSession localServerSession;
2930

3031
public GarnetClientSession Client => gcs;
3132

33+
public LocalServerSession LocalSession => localServerSession;
34+
3235
public IEnumerable<KeyValuePair<byte[], byte[]>> VectorSets => vectorSetsIndexKeysToMigrate;
3336

37+
public IEnumerable<KeyValuePair<byte[], byte[]>> RangeIndexes => rangeIndexKeysToMigrate;
38+
3439
public void ThrowIfCancelled() => session._cts.Token.ThrowIfCancellationRequested();
3540

3641
public bool Contains(int slot) => session._sslots.Contains(slot);
@@ -46,6 +51,8 @@ public bool ContainsNamespace(ReadOnlySpan<byte> namespaceBytes)
4651
public void EncounteredVectorSet(byte[] key, byte[] value)
4752
=> vectorSetsIndexKeysToMigrate.TryAdd(key, value);
4853

54+
public void EncounteredRangeIndex(byte[] key, byte[] value) => rangeIndexKeysToMigrate.TryAdd(key, value);
55+
4956
public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchSize = 1 << 18)
5057
{
5158
this.session = session;
@@ -55,6 +62,7 @@ public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchS
5562
storeScan = new StoreScan(this);
5663
keysToDelete = [];
5764
vectorSetsIndexKeysToMigrate = new(ByteArrayComparer.Instance);
65+
rangeIndexKeysToMigrate = new(ByteArrayComparer.Instance);
5866
}
5967

6068
public async ValueTask<bool> InitializeAsync()
@@ -127,7 +135,7 @@ public async Task<bool> TransmitSlotsAsync()
127135
return true;
128136
}
129137

130-
public async Task<bool> TransmitKeysAsync(Dictionary<byte[], byte[]> vectorSetKeysToIgnore)
138+
public async Task<bool> TransmitKeysAsync(Dictionary<byte[], byte[]> vectorSetKeysToIgnore, Dictionary<byte[], byte[]> rangeIndexKeysToIgnore)
131139
{
132140
// Use this for both stores; main store will just use the SpanByteAndMemory directly. We want it to be outside iterations
133141
// so we can reuse the SpanByteAndMemory.Memory across iterations.
@@ -136,7 +144,8 @@ public async Task<bool> TransmitKeysAsync(Dictionary<byte[], byte[]> vectorSetKe
136144
var output = new UnifiedOutput();
137145

138146
#if NET9_0_OR_GREATER
139-
var ignoreLookup = vectorSetKeysToIgnore.GetAlternateLookup<ReadOnlySpan<byte>>();
147+
var vectorSetIgnoreLookup = vectorSetKeysToIgnore.GetAlternateLookup<ReadOnlySpan<byte>>();
148+
var rangeIndexIgnoreLookup = rangeIndexKeysToIgnore.GetAlternateLookup<ReadOnlySpan<byte>>();
140149
#endif
141150

142151
try
@@ -154,19 +163,23 @@ public async Task<bool> TransmitKeysAsync(Dictionary<byte[], byte[]> vectorSetKe
154163

155164
var spanByte = keys[i].Item1;
156165

157-
// Don't transmit if a Vector Set
158-
var isVectorSet =
159-
vectorSetKeysToIgnore.Count > 0 &&
166+
// Don't transmit VectorSet or RangeIndex keys — these are handled out-of-band
167+
var shouldSkip =
168+
(vectorSetKeysToIgnore.Count > 0 &&
160169
#if NET9_0_OR_GREATER
161-
ignoreLookup.ContainsKey(spanByte.ReadOnlySpan);
170+
vectorSetIgnoreLookup.ContainsKey(spanByte.ReadOnlySpan)) ||
162171
#else
163-
vectorSetKeysToIgnore.ContainsKey(spanByte.ToArray());
172+
vectorSetKeysToIgnore.ContainsKey(spanByte.ToArray())) ||
173+
#endif
174+
(rangeIndexKeysToIgnore.Count > 0 &&
175+
#if NET9_0_OR_GREATER
176+
rangeIndexIgnoreLookup.ContainsKey(spanByte.ReadOnlySpan));
177+
#else
178+
rangeIndexKeysToIgnore.ContainsKey(spanByte.ToArray()));
164179
#endif
165180

166-
if (isVectorSet)
167-
{
181+
if (shouldSkip)
168182
continue;
169-
}
170183

171184
if (!await session.WriteOrSendRecordAsync(gcs, localServerSession, keys[i].Item1, ref input, ref output, out var status).ConfigureAwait(false))
172185
return false;
@@ -285,6 +298,19 @@ public void DeleteVectorSet(PinnedSpanByte key)
285298

286299
session.logger?.LogDebug("Deleting Vector Set {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), delRes);
287300
}
301+
302+
/// <summary>
303+
/// Delete a RangeIndex after migration if _copyOption is not set.
304+
/// </summary>
305+
public void DeleteRangeIndex(PinnedSpanByte key)
306+
{
307+
if (session._copyOption)
308+
return;
309+
310+
var delRes = localServerSession.BasicGarnetApi.DELETE(key);
311+
312+
session.logger?.LogDebug("Deleting RangeIndex {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), delRes);
313+
}
288314
}
289315
}
290316
}

libs/cluster/Server/Migration/MigrateScanFunctions.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ public bool Reader<TSourceLogRecord>(in TSourceLogRecord srcLogRecord, RecordMet
5050
// Check if key belongs to slot that is being migrated and if it can be added to our buffer
5151
if (migrateOperation.Contains(slot))
5252
{
53-
if (srcLogRecord.RecordType == VectorManager.RecordType)
53+
if (srcLogRecord.RecordType == RangeIndexManager.RangeIndexRecordType)
54+
{
55+
// RangeIndex keys need out-of-band migration (snapshot + chunks)
56+
migrateOperation.EncounteredRangeIndex(key.ToArray(), srcLogRecord.ValueSpan.ToArray());
57+
}
58+
else if (srcLogRecord.RecordType == VectorManager.RecordType)
5459
{
5560
// We can't delete the vector set _yet_ nor can we migrate it,
5661
// we just need to remember it to migrate once the associated namespaces are all moved over
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.Collections.Generic;
7+
using System.Threading.Tasks;
8+
using Garnet.client;
9+
using Garnet.server;
10+
using Microsoft.Extensions.Logging;
11+
using Tsavorite.core;
12+
13+
namespace Garnet.cluster
14+
{
15+
/// <summary>
16+
/// RangeIndex migration support: source-side transmit driver.
17+
/// </summary>
18+
internal sealed partial class MigrateSession : IDisposable
19+
{
20+
/// <summary>
21+
/// Transmit a single RangeIndex key to the destination node.
22+
/// Uses <see cref="RangeIndexManager.SnapshotRangeIndexAndCreateReader"/> to obtain an async
23+
/// migration reader that snapshots and streams the BfTree data.
24+
/// Forces a flush and awaits ACK. Does not delete the source key — the caller
25+
/// is responsible for deletion in the appropriate sketch status phase.
26+
/// </summary>
27+
private async Task<bool> TransmitRangeIndexAsync(MigrateOperation mo, byte[] keyBytes, byte[] stubBytes, int chunkSize = RangeIndexManager.DefaultMigrationChunkSize)
28+
{
29+
var rangeIndexManager = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager;
30+
var gcs = mo.Client;
31+
32+
RangeIndexMigrationReader reader;
33+
try
34+
{
35+
reader = rangeIndexManager.SnapshotRangeIndexAndCreateReader(mo.LocalSession, keyBytes, stubBytes, chunkSize);
36+
}
37+
catch (Exception ex)
38+
{
39+
logger?.LogError(ex, "TransmitRangeIndex: failed to snapshot BfTree for key");
40+
return false;
41+
}
42+
43+
var buffer = ArrayPool<byte>.Shared.Rent(chunkSize);
44+
try
45+
{
46+
using (reader)
47+
{
48+
while (!reader.IsComplete)
49+
{
50+
var payloadLen = await reader.ReadNextChunkAsync(buffer).ConfigureAwait(false);
51+
if (payloadLen == 0)
52+
{
53+
logger?.LogError("TransmitRangeIndex: reader returned zero-length payload with a {Size}-byte buffer", chunkSize);
54+
return false;
55+
}
56+
57+
if (!await WriteOrSendRecordSpanAsync(gcs, MigrationRecordSpanType.SerializedRangeIndexStream, buffer.AsSpan(0, payloadLen)).ConfigureAwait(false))
58+
{
59+
logger?.LogError("TransmitRangeIndex: failed to write chunk");
60+
return false;
61+
}
62+
}
63+
64+
// Force flush and await ACK
65+
if (!await HandleMigrateTaskResponseAsync(gcs.SendAndResetIterationBuffer()).ConfigureAwait(false))
66+
{
67+
logger?.LogError("TransmitRangeIndex: flush failed");
68+
return false;
69+
}
70+
71+
return true;
72+
}
73+
}
74+
catch (Exception ex)
75+
{
76+
logger?.LogError(ex, "TransmitRangeIndex: unexpected error during transmission");
77+
return false;
78+
}
79+
finally
80+
{
81+
ArrayPool<byte>.Shared.Return(buffer);
82+
}
83+
}
84+
85+
/// <summary>
86+
/// Migrate a batch of RangeIndex keys with sketch protection.
87+
/// Adds all keys to the sketch, transitions through TRANSMITTING → DELETING → MIGRATED
88+
/// with epoch barriers, ensuring concurrent operations are properly gated.
89+
/// </summary>
90+
private async Task<bool> MigrateRangeIndexKeysAsync(MigrateOperation mo, Dictionary<byte[], byte[]> rangeIndexKeys)
91+
{
92+
logger?.LogWarning("Migrating {count} RangeIndex keys", rangeIndexKeys.Count);
93+
94+
// Add all RI keys to sketch during INITIALIZING (no gating yet)
95+
mo.sketch.Clear();
96+
mo.sketch.SetStatus(SketchStatus.INITIALIZING);
97+
foreach (var (key, _) in rangeIndexKeys)
98+
mo.sketch.TryHashAndStore(key);
99+
100+
// Block writes during snapshot + transmit
101+
mo.sketch.SetStatus(SketchStatus.TRANSMITTING);
102+
await WaitForConfigPropagationAsync().ConfigureAwait(false);
103+
104+
try
105+
{
106+
foreach (var (key, stubBytes) in rangeIndexKeys)
107+
{
108+
if (!await TransmitRangeIndexAsync(mo, key, stubBytes).ConfigureAwait(false))
109+
{
110+
logger?.LogError("Failed to migrate RangeIndex key");
111+
return false;
112+
}
113+
}
114+
115+
// Block reads + writes during delete
116+
mo.sketch.SetStatus(SketchStatus.DELETING);
117+
await WaitForConfigPropagationAsync().ConfigureAwait(false);
118+
119+
foreach (var (key, _) in rangeIndexKeys)
120+
{
121+
var pinnedKey = PinnedSpanByte.FromPinnedSpan(key);
122+
mo.DeleteRangeIndex(pinnedKey);
123+
}
124+
125+
return true;
126+
}
127+
finally
128+
{
129+
// Always clean up the sketch, even on failure, to unblock client operations
130+
mo.sketch.Clear();
131+
}
132+
}
133+
}
134+
}

libs/cluster/Server/Migration/MigrateSessionKeys.cs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,13 @@ private async Task<bool> MigrateKeysFromStoreAsync()
3535
await WaitForConfigPropagationAsync().ConfigureAwait(false);
3636

3737
// Discover Vector Sets linked namespaces
38+
var allKeys = migrateTask.sketch.Keys.Select(t => t.Item1.ToArray());
3839
var indexesToMigrate = new Dictionary<byte[], byte[]>(ByteArrayComparer.Instance);
39-
_namespaces = clusterProvider.storeWrapper.DefaultDatabase.VectorManager.GetNamespacesForKeys(clusterProvider.storeWrapper, migrateTask.sketch.Keys.Select(t => t.Item1.ToArray()), indexesToMigrate);
40+
_namespaces = clusterProvider.storeWrapper.DefaultDatabase.VectorManager.GetNamespacesForKeys(clusterProvider.storeWrapper, allKeys, indexesToMigrate);
41+
42+
// Discover RangeIndex keys upfront
43+
var rangeIndexKeysToMigrate = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager?.GetRangeIndexKeysForMigration(clusterProvider.storeWrapper, allKeys)
44+
?? new Dictionary<byte[], byte[]>(ByteArrayComparer.Instance);
4045

4146
// If we have any namespaces, that implies Vector Sets, and if we have any of THOSE
4247
// we need to reserve destination sets on the other side
@@ -46,8 +51,8 @@ private async Task<bool> MigrateKeysFromStoreAsync()
4651
return false;
4752
}
4853

49-
// Transmit keys from store
50-
if (!await migrateTask.TransmitKeysAsync(indexesToMigrate).ConfigureAwait(false))
54+
// Transmit keys from store (skipping VectorSet and RangeIndex keys, which are handled out-of-band)
55+
if (!await migrateTask.TransmitKeysAsync(indexesToMigrate, rangeIndexKeysToMigrate).ConfigureAwait(false))
5156
{
5257
logger?.LogError("Failed transmitting keys from store");
5358
return false;
@@ -122,6 +127,33 @@ private async Task<bool> MigrateKeysFromStoreAsync()
122127
return false;
123128
}
124129
}
130+
131+
// Migrate RangeIndex keys (snapshot + chunk stream).
132+
// Keys are already in the sketch (added by caller during key enumeration),
133+
// so they're protected by the TRANSMITTING status. Mark for deletion so
134+
// DeleteKeysAsync() handles them in the DELETING sketch status sequence.
135+
if (rangeIndexKeysToMigrate.Count > 0)
136+
{
137+
logger?.LogWarning("Migrating {count} RangeIndex keys via KEYS path", rangeIndexKeysToMigrate.Count);
138+
139+
foreach (var (key, stubBytes) in rangeIndexKeysToMigrate)
140+
{
141+
if (!await TransmitRangeIndexAsync(migrateTask, key, stubBytes).ConfigureAwait(false))
142+
{
143+
logger?.LogError("Failed to migrate RangeIndex key via KEYS path");
144+
return false;
145+
}
146+
}
147+
148+
// Mark all transmitted RI keys in the sketch for deletion by DeleteKeysAsync()
149+
var keys = migrateTask.sketch.Keys;
150+
for (var i = 0; i < keys.Count; i++)
151+
{
152+
if (rangeIndexKeysToMigrate.ContainsKey(keys[i].Item1.ToArray()))
153+
keys[i] = (keys[i].Item1, true);
154+
}
155+
}
156+
125157
// Final cleanup, which will also delete Vector Sets
126158
await DeleteKeysAsync().ConfigureAwait(false);
127159
}

libs/cluster/Server/Migration/MigrateSessionSlots.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,20 @@ async Task<bool> CreateAndRunMigrateTasksAsync(long beginAddress, long tailAddre
114114
return false;
115115
}
116116

117+
// Handle migration of discovered RangeIndex keys with sketch protection.
118+
// All RI keys are added to the sketch in one batch, then a single epoch
119+
// barrier per phase gates concurrent operations.
120+
var rangeIndexKeys = migrateOperation.SelectMany(static mo => mo.RangeIndexes).GroupBy(static g => g.Key, ByteArrayComparer.Instance).ToDictionary(static g => g.Key, g => g.First().Value, ByteArrayComparer.Instance);
121+
122+
if (rangeIndexKeys.Count > 0)
123+
{
124+
if (!await MigrateRangeIndexKeysAsync(migrateOperation[0], rangeIndexKeys).ConfigureAwait(false))
125+
return false;
126+
}
127+
128+
// TODO: Vector Set index keys are excluded from the sketch during scan (MigrateScanFunctions.cs),
129+
// so concurrent writes are not blocked during transmit and reads are not blocked during delete.
130+
// This should be fixed with the same sketch-protected batch pattern used for RangeIndex keys above.
117131
// Handle migration of discovered Vector Set keys now that they're namespaces have been moved
118132
var vectorSets = migrateOperation.SelectMany(static mo => mo.VectorSets).GroupBy(static g => g.Key, ByteArrayComparer.Instance).ToDictionary(static g => g.Key, g => g.First().Value, ByteArrayComparer.Instance);
119133

libs/cluster/Session/ClusterSession.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ internal sealed partial class ClusterSession : IClusterSession
5252

5353
private StringBasicContext stringBasicContext;
5454
private VectorBasicContext vectorBasicContext;
55+
private readonly RangeIndexMigrationReceiveState rangeIndexMigrationState;
5556

5657
public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnManager, IGarnetAuthenticator authenticator, UserHandle userHandle, GarnetSessionMetrics sessionMetrics, BasicGarnetApi basicGarnetApi, StringBasicContext stringBasicContext, VectorBasicContext vectorBasicContext, INetworkSender networkSender, ILogger logger = null)
5758
{
@@ -65,6 +66,8 @@ public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnMan
6566
this.vectorBasicContext = vectorBasicContext;
6667
this.networkSender = networkSender;
6768
this.logger = logger;
69+
var riManager = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager;
70+
rangeIndexMigrationState = riManager != null ? new RangeIndexMigrationReceiveState(riManager) : null;
6871
}
6972

7073
public unsafe void ProcessClusterCommands(RespCommand command, VectorManager vectorManager, ref SessionParseState parseState, ref byte* dcurr, ref byte* dend)
@@ -197,6 +200,8 @@ public void UnsafeSetConfig(string replicaOf = null)
197200

198201
public void Dispose()
199202
{
203+
rangeIndexMigrationState?.Dispose();
204+
200205
// Call dispose on ref of this session if this session is a replication task
201206
if (IsReplicating)
202207
replicaReplayDriverStore?.Dispose();

0 commit comments

Comments
 (0)