Skip to content

Commit 9e45701

Browse files
Tiago NapoliCopilot
andcommitted
Align Migration.cs with current main APIs and update migration docs
- Update RangeIndexManager.Migration.cs for 7 API changes from main: dataDir->riLogRoot, liveIndexes key nint->Guid, SnapshotToFile->CPR pattern, evicted tree paths, RecoverFromCprSnapshot, LogDataPathFor, stub.ResetFlags() - Document TRYAGAIN behavior for RI commands during migration instead of spin-wait, with rationale (pipeline stall, client timeout, and connection reset risks) - Add Redis protocol context and SE.Redis client handling details Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 72f03d3 commit 9e45701

2 files changed

Lines changed: 108 additions & 41 deletions

File tree

libs/server/Resp/RangeIndex/RangeIndexManager.Migration.cs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.IO;
77
using System.Runtime.CompilerServices;
88
using System.Runtime.InteropServices;
9+
using System.Threading;
910
using Garnet.common;
1011
using Garnet.server.BfTreeInterop;
1112
using Microsoft.Extensions.Logging;
@@ -82,11 +83,11 @@ internal RangeIndexMigrationReader SnapshotRangeIndexAndCreateReader(LocalServer
8283

8384
/// <summary>
8485
/// Derive a temporary file path for an in-progress inbound migration.
85-
/// Format: {dataDir}/rangeindex/.migration-tmp/{guid}.bftree
86+
/// Format: {riLogRoot}/.migration-tmp/{guid}.bftree
8687
/// </summary>
8788
internal string DeriveTempMigrationPath()
8889
{
89-
var tmpDir = Path.Combine(dataDir ?? string.Empty, "rangeindex", ".migration-tmp");
90+
var tmpDir = Path.Combine(riLogRoot ?? string.Empty, ".migration-tmp");
9091
Directory.CreateDirectory(tmpDir);
9192
return Path.Combine(tmpDir, $"{Guid.NewGuid():N}.bftree");
9293
}
@@ -99,7 +100,7 @@ internal unsafe bool PublishMigratedIndex(ReadOnlySpan<byte> keyBytes, ReadOnlyS
99100
{
100101
try
101102
{
102-
var workingPath = DeriveWorkingPath(keyBytes);
103+
var workingPath = LogDataPathFor(keyBytes);
103104
Directory.CreateDirectory(Path.GetDirectoryName(workingPath)!);
104105

105106
// If a data file already exists (e.g., from a previous migration of the same key
@@ -111,21 +112,18 @@ internal unsafe bool PublishMigratedIndex(ReadOnlySpan<byte> keyBytes, ReadOnlyS
111112

112113
ref readonly var srcStub = ref ReadIndex(stubBytes);
113114

114-
var bfTree = BfTreeService.RecoverFromSnapshot(
115+
var scratchPath = LogScratchPathFor(keyBytes);
116+
var bfTree = BfTreeService.RecoverFromCprSnapshot(
115117
workingPath,
116-
(StorageBackendType)srcStub.StorageBackend,
117-
srcStub.CacheSize,
118-
srcStub.MinRecordSize,
119-
srcStub.MaxRecordSize,
120-
srcStub.MaxKeyLen,
121-
srcStub.LeafPageSize);
118+
scratchPath,
119+
(StorageBackendType)srcStub.StorageBackend);
122120

123121
Span<byte> newStubBytes = stackalloc byte[IndexSizeBytes];
124122
stubBytes.CopyTo(newStubBytes);
125123
ref var newStub = ref Unsafe.As<byte, RangeIndexStub>(
126124
ref MemoryMarshal.GetReference(newStubBytes));
127125
newStub.TreeHandle = bfTree.NativePtr;
128-
newStub.Flags = 0;
126+
newStub.ResetFlags();
129127
newStub.SerializationPhase = 0;
130128

131129
var parseState = new SessionParseState();
@@ -164,10 +162,10 @@ internal unsafe bool PublishMigratedIndex(ReadOnlySpan<byte> keyBytes, ReadOnlyS
164162
/// <summary>
165163
/// Source side: snapshot a BfTree for migration under an exclusive lock.
166164
/// Acquires the exclusive lock, re-reads the stub from the store to get a fresh
167-
/// <c>TreeHandle</c>. If the tree is live, snapshots it to a temporary migration
168-
/// file. If evicted, copies the existing flush/checkpoint snapshot file to a
169-
/// temporary migration file. The temp file is safe from concurrent overwrites
170-
/// by <see cref="SnapshotTreeForFlush"/> or the native BfTree.
165+
/// <c>TreeHandle</c>. If the tree is live, takes a CPR snapshot and copies
166+
/// the scratch file to a temporary migration file (following the same pattern as
167+
/// <see cref="SnapshotForFlushViaCpr"/>). If evicted, copies the working
168+
/// <c>data.bftree</c> file (same source as checkpoint pending entries).
171169
/// </summary>
172170
internal bool SnapshotForMigration(StorageSession session, ReadOnlySpan<byte> keyBytes, out string path, out long totalBytes)
173171
{
@@ -176,6 +174,7 @@ internal bool SnapshotForMigration(StorageSession session, ReadOnlySpan<byte> ke
176174

177175
Span<byte> stubSpan = stackalloc byte[IndexSizeBytes];
178176
var keyHash = session.stringBasicContext.GetKeyHash((FixedSpanByteKey)PinnedSpanByte.FromPinnedSpan(keyBytes));
177+
var hashPrefix = HashKeyToPrefix(keyBytes);
179178
var migrationPath = DeriveTempMigrationPath();
180179

181180
rangeIndexLocks.AcquireExclusiveLock(keyHash, out var lockToken);
@@ -201,26 +200,33 @@ internal bool SnapshotForMigration(StorageSession session, ReadOnlySpan<byte> ke
201200
return false;
202201
}
203202

204-
if (stub.TreeHandle != nint.Zero && liveIndexes.TryGetValue(stub.TreeHandle, out var treeEntry))
203+
var keyId = KeyId(keyBytes);
204+
if (stub.TreeHandle != nint.Zero && liveIndexes.TryGetValue(keyId, out var treeEntry) && treeEntry.Tree != null)
205205
{
206-
// Tree is live — snapshot to a temp migration file (safe from concurrent OnFlush overwrites)
207-
treeEntry.Tree.SnapshotToFile(migrationPath);
206+
// Tree is live — CPR snapshot + copy from scratch path (same pattern as SnapshotForFlushViaCpr)
207+
while (!treeEntry.TryClaimSnapshot()) Thread.Yield();
208+
try
209+
{
210+
BfTreeService.CprSnapshotByPtr(treeEntry.Tree.NativePtr);
211+
File.Copy(LogScratchPath(hashPrefix), migrationPath, overwrite: true);
212+
}
213+
finally
214+
{
215+
treeEntry.ReleaseSnapshot();
216+
}
208217
}
209218
else
210219
{
211-
// Tree was evicted — copy the existing flush/checkpoint file to a temp migration file.
212-
// Tsavorite guarantees OnFlush runs before OnEvict, so a snapshot file must exist.
213-
var existingPath = stub.IsRecovered && recoveredCheckpointToken != Guid.Empty
214-
? DeriveCheckpointPath(keyBytes, recoveredCheckpointToken)
215-
: DeriveFlushPath(keyBytes);
220+
// Tree was evicted — copy the working data.bftree file (same source as checkpoint pending entries)
221+
var dataPath = LogDataPath(hashPrefix);
216222

217-
if (!File.Exists(existingPath))
223+
if (!File.Exists(dataPath))
218224
{
219-
logger?.LogWarning("SnapshotForMigration: expected snapshot file not found: {Path}", existingPath);
225+
logger?.LogWarning("SnapshotForMigration: data.bftree not found: {Path}", dataPath);
220226
return false;
221227
}
222228

223-
File.Copy(existingPath, migrationPath, overwrite: true);
229+
File.Copy(dataPath, migrationPath, overwrite: true);
224230
}
225231
}
226232
finally

website/docs/dev/range-index-resp-api.md

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2673,7 +2673,7 @@ and restores from the snapshot files at the expected paths.
26732673
26742674
**Problem.** During slot migration, a stub-only transfer is insufficient: the 51-byte stub
26752675
points to a process-local native `BfTree` whose on-disk data file lives outside Tsavorite
2676-
(`{dataDir}/rangeindex/{key_hash}/data.bftree`). The target node has no access to that
2676+
(`{riLogRoot}/{key_hash}/data.bftree`). The target node has no access to that
26772677
file. We must ship the entire tree-data file alongside the stub, and the target must
26782678
rebuild the native `BfTree` from it before publishing a usable stub into its main store.
26792679

@@ -2695,7 +2695,7 @@ Source side (async):
26952695
TransmitRangeIndexAsync loop:
26962696
reader.ReadNextChunkAsync(buffer)
26972697
├→ fileStream.ReadAsync() — async file read
2698-
└→ serializer.MoveNext(dest, fileData, out consumed) — sync framing
2698+
└→ serializer.SupplyFileData(buffer) + serializer.MoveNext(dest) — sync framing
26992699
WriteOrSendRecordSpanAsync() — send over network
27002700
27012701
Destination side (sync):
@@ -2757,21 +2757,24 @@ The stream format across one or more chunks:
27572757
3. **Snapshot details.** `SnapshotForMigration` (in `RangeIndexManager.Migration.cs`):
27582758
- Acquires the per-key exclusive lock.
27592759
- Re-reads the stub from the main store (`RIGET`) under the lock to get a fresh `TreeHandle`.
2760-
- If the tree is live (`TreeHandle ≠ 0`, in `liveIndexes`): snapshots to
2761-
`{dataDir}/rangeindex/.migration-tmp/{guid}.bftree`.
2762-
- If the tree was evicted: copies `flush.bftree` or `snapshot.{token}.bftree` to the
2763-
same temp directory.
2760+
- If the tree is live (`TreeHandle ≠ 0`, in `liveIndexes`): takes a CPR snapshot
2761+
(`CprSnapshotByPtr`) and copies the scratch file to
2762+
`{riLogRoot}/.migration-tmp/{guid}.bftree`, using `TryClaimSnapshot`/`ReleaseSnapshot`
2763+
to serialize against concurrent flush/checkpoint snapshots (same pattern as
2764+
`SnapshotForFlushViaCpr`).
2765+
- If the tree was evicted: copies the working `data.bftree` file (same source as
2766+
checkpoint pending entries) to the same temp directory.
27642767
- Releases the exclusive lock.
27652768

27662769
The temp file has a unique GUID name, protecting it from concurrent `OnFlush` overwrites,
27672770
checkpoint operations, and other migrations.
27682771

27692772
4. **Serializer architecture.** `RangeIndexChunkedSerializer` is a pure state machine with
2770-
phases: `KeyHeader → KeyData → FileHeader → FileData → Trailer → Done`. It takes file
2771-
data as input via `MoveNext(dest, fileData, out consumed)` rather than reading files
2772-
itself. Properties `NeedsFileData` and `FileDataRemaining` tell the caller when to supply
2773-
file bytes. The `RangeIndexMigrationReader` wrapper handles async file reads and loops
2774-
to handle phase transitions within a single call.
2773+
phases: `KeyHeader → KeyData → FileHeader → FileData → Trailer → Done`. File data is
2774+
supplied via `SupplyFileData(Memory<byte>)` rather than passed directly to `MoveNext`.
2775+
The `NeedsFileData` property tells the caller when to supply more file bytes. The
2776+
`RangeIndexMigrationReader` wrapper handles async file reads and loops to handle phase
2777+
transitions within a single call.
27752778

27762779
#### Source side — KEYS path
27772780

@@ -2797,21 +2800,79 @@ receiving a stream, only `SerializedRangeIndexStream` records are accepted.
27972800
`RangeIndexChunkedDeserializer` is a state machine that:
27982801
1. Parses the key length header and accumulates key bytes.
27992802
2. Parses the file size header.
2800-
3. Writes file bytes to a temp file (`{dataDir}/rangeindex/.migration-tmp/{guid}.bftree`),
2803+
3. Writes file bytes to a temp file (`{riLogRoot}/.migration-tmp/{guid}.bftree`),
28012804
updating an xxHash64 incrementally.
28022805
4. Parses the trailer: validates checksum, extracts stub.
28032806

28042807
On completion, `Publish()`:
2805-
1. Moves the temp file to the working path (`{dataDir}/rangeindex/{key_hash}/data.bftree`).
2808+
1. Moves the temp file to the working path (`{riLogRoot}/{key_hash}/data.bftree`).
28062809
If a data file already exists (e.g., from a previous migration of the same key), it is
28072810
deleted first — this enables round-trip migration (P0→P1→P0).
2808-
2. Calls `BfTreeService.RecoverFromSnapshot()` to load the native tree.
2809-
3. Rewrites the stub: `TreeHandle = bfTree.NativePtr`, clears `Flags` and `SerializationPhase`.
2811+
2. Calls `BfTreeService.RecoverFromCprSnapshot()` to load the native tree.
2812+
3. Rewrites the stub: `TreeHandle = bfTree.NativePtr`, calls `ResetFlags()` and clears `SerializationPhase`.
28102813
4. Publishes via `RICREATE` RMW into the main store.
28112814
5. Registers the tree via `RegisterIndex()`.
28122815

28132816
On disposal (error or abort), the temp file is deleted.
28142817

2818+
#### Write protection during migration
2819+
2820+
RI commands (`RI.SET`, `RI.CREATE`, `RI.DEL`, `RI.GET`, `RI.SCAN`, etc.) are classified as
2821+
data commands and have `KeySpecifications` in the command metadata JSON. This means the
2822+
generic slot verification layer (`CanServeSlot``NetworkMultiKeySlotVerify`) automatically
2823+
extracts the key, hashes it to a slot, and checks migration state — no special code is
2824+
needed inside the RI command handlers.
2825+
2826+
When a slot is in `MIGRATING` state, `CanOperateOnKey` calls
2827+
`migrationManager.CanAccessKey(key, slot, readOnly)`, which probes the sketch. Behavior
2828+
depends on the sketch status:
2829+
2830+
| Sketch Status | Read commands (RI.GET, RI.SCAN…) | Write commands (RI.SET, RI.CREATE…) |
2831+
|---|---|---|
2832+
| `INITIALIZING` | ✅ Allowed | ✅ Allowed |
2833+
| `TRANSMITTING` | ✅ Allowed |**Returns `-TRYAGAIN`** |
2834+
| `DELETING` | ❌ Returns `-TRYAGAIN` | ❌ Returns `-TRYAGAIN` |
2835+
| `MIGRATED` | ✅ Allowed | ✅ Allowed |
2836+
2837+
During the `TRANSMITTING` phase — while the CPR snapshot is being taken and streamed —
2838+
write commands return a `-TRYAGAIN` error immediately, signaling the client to retry later.
2839+
Read commands remain unblocked during transmission. During the `DELETING` phase, both reads
2840+
and writes return `-TRYAGAIN`.
2841+
2842+
**Why TRYAGAIN instead of spin-wait.** The default Garnet behavior for regular keys during
2843+
migration is to spin-wait (`Thread.Yield()` in a loop). This is acceptable for regular keys
2844+
because individual key transmission is fast. However, RangeIndex migration can take
2845+
significantly longer — the BfTree must be snapshotted via CPR and streamed as chunked data.
2846+
A long spin-wait has cascading effects on multiplexed clients like StackExchange.Redis:
2847+
2848+
- SE.Redis uses a single TCP connection with pipelined commands in FIFO order.
2849+
- If the server spin-waits on command A, **all subsequent pipelined commands on that
2850+
connection also stall** — even for unrelated keys.
2851+
- SE.Redis has a per-connection `SyncTimeout` (default 5s) / `AsyncTimeout`. Commands
2852+
exceeding the timeout are faulted with `TimeoutException` on the client side, but the
2853+
server-side command **continues executing** (fire-and-forget after timeout).
2854+
- If nothing is read from the pipe for 4× the timeout (~20s), SE.Redis detects a "dead
2855+
socket" and **kills the connection entirely**, triggering a reconnect.
2856+
2857+
Returning `-TRYAGAIN` avoids these problems: the response is immediate, the pipeline stays
2858+
unblocked, and the client can implement retry logic with backoff.
2859+
2860+
> **TRYAGAIN in the Redis protocol.** `-TRYAGAIN` is an official Redis cluster error
2861+
> (see `cluster.c:1466`). Redis uses it for multi-key commands during migration when some
2862+
> keys exist locally and some have already migrated — the command cannot be served atomically.
2863+
> Our use extends this semantics: the key exists, but it cannot be served right now because
2864+
> it is being serialized for migration. The error message and retry semantics are the same.
2865+
>
2866+
> **Client handling.** SE.Redis does not auto-retry on TRYAGAIN — it surfaces as a
2867+
> `RedisServerException("TRYAGAIN ...")` that the application must catch and retry manually.
2868+
> This is consistent with Redis behavior for TRYAGAIN (no client auto-retries it).
2869+
>
2870+
> **Comparison with Redis.** Redis is single-threaded, so migration and client commands are
2871+
> naturally serialized — no spin-wait or TRYAGAIN is needed for single-key operations.
2872+
> When a slot is `MIGRATING`, Redis serves the request if the key exists locally and returns
2873+
> `-ASK` redirect if it doesn't. Redis only returns `-TRYAGAIN` for multi-key commands where
2874+
> some keys have already migrated.
2875+
28152876
#### Failure modes
28162877

28172878
- **Transmit failure.** `TransmitRangeIndexAsync` catches all exceptions and returns `false`.

0 commit comments

Comments
 (0)