From ef28029d3487466f0166fd24115bf46ee04817e4 Mon Sep 17 00:00:00 2001 From: Kiran Muthabatulla Date: Fri, 3 Apr 2026 10:27:23 -0700 Subject: [PATCH] Fix transaction handling and slot validation logic --- libs/server/Transaction/TransactionManager.cs | 9 +- .../server/Transaction/TxnClusterSlotCheck.cs | 29 ++++- libs/server/Transaction/TxnRespCommands.cs | 11 +- .../PerfProbe/PerfProbe.csproj | 10 ++ .../_perf_probe_txn/PerfProbe/Program.cs | 115 ++++++++++++++++++ 5 files changed, 160 insertions(+), 14 deletions(-) create mode 100644 playground/_perf_probe_txn/PerfProbe/PerfProbe.csproj create mode 100644 playground/_perf_probe_txn/PerfProbe/Program.cs diff --git a/libs/server/Transaction/TransactionManager.cs b/libs/server/Transaction/TransactionManager.cs index f53e21d7f33..fb822ba8799 100644 --- a/libs/server/Transaction/TransactionManager.cs +++ b/libs/server/Transaction/TransactionManager.cs @@ -195,6 +195,7 @@ internal void Reset(bool isRunning) { txnKeysParseState.Count = 0; saveKeyRecvBufferPtr = null; + firstKeyInCurrentRecvBuffer = 0; txnScratchBufferAllocator.Reset(); } } @@ -339,12 +340,8 @@ internal void AddTransactionStoreType(StoreType storeType) internal void GetSlotVerificationInput(byte* recvBufferPtr, byte sessionAsking, out ClusterSlotVerificationInput clusterSlotVerificationInput) { - // Copy keys if buffer changed since last queued command - if (recvBufferPtr != saveKeyRecvBufferPtr) - { - CopyExistingKeysToScratchBuffer(); - saveKeyRecvBufferPtr = recvBufferPtr; - } + // Materialize only keys captured from the previous receive buffer when it changes. + OnRecvBufferChanged(recvBufferPtr); watchContainer.SaveKeysToKeyList(this); clusterSlotVerificationInput = new ClusterSlotVerificationInput diff --git a/libs/server/Transaction/TxnClusterSlotCheck.cs b/libs/server/Transaction/TxnClusterSlotCheck.cs index 21d7f2d00f4..9bdb11f4b11 100644 --- a/libs/server/Transaction/TxnClusterSlotCheck.cs +++ b/libs/server/Transaction/TxnClusterSlotCheck.cs @@ -10,6 +10,27 @@ sealed unsafe partial class TransactionManager { readonly bool clusterEnabled; internal byte* saveKeyRecvBufferPtr; + int firstKeyInCurrentRecvBuffer; + + public void BeginKeyTrackingForCurrentBuffer(byte* recvBufferPtr) + { + if (!clusterEnabled) return; + + saveKeyRecvBufferPtr = recvBufferPtr; + firstKeyInCurrentRecvBuffer = txnKeysParseState.Count; + } + + public void OnRecvBufferChanged(byte* recvBufferPtr) + { + if (!clusterEnabled || recvBufferPtr == saveKeyRecvBufferPtr) + return; + + Debug.Assert(firstKeyInCurrentRecvBuffer <= txnKeysParseState.Count); + + CopyKeysToScratchBuffer(firstKeyInCurrentRecvBuffer); + firstKeyInCurrentRecvBuffer = txnKeysParseState.Count; + saveKeyRecvBufferPtr = recvBufferPtr; + } /// /// Keep track of actual key accessed by command @@ -37,7 +58,13 @@ public void CopyExistingKeysToScratchBuffer() { Debug.Assert(clusterEnabled); - for (var i = 0; i < txnKeysParseState.Count; i++) + CopyKeysToScratchBuffer(0); + firstKeyInCurrentRecvBuffer = txnKeysParseState.Count; + } + + void CopyKeysToScratchBuffer(int startIndex) + { + for (var i = startIndex; i < txnKeysParseState.Count; i++) { ref var key = ref txnKeysParseState.GetArgSliceByRef(i); key = txnScratchBufferAllocator.CreateArgSlice(key.ReadOnlySpan); diff --git a/libs/server/Transaction/TxnRespCommands.cs b/libs/server/Transaction/TxnRespCommands.cs index 1483b4ec303..35a730e93cf 100644 --- a/libs/server/Transaction/TxnRespCommands.cs +++ b/libs/server/Transaction/TxnRespCommands.cs @@ -29,8 +29,8 @@ private bool NetworkMULTI() txnManager.txnStartHead = readHead; txnManager.state = TxnState.Started; txnManager.operationCntTxn = 0; - // Track receive buffer ptr for key pointer adjustment at EXEC time - txnManager.saveKeyRecvBufferPtr = recvBufferPtr; + // Track receive buffer for incremental key materialization during queuing + txnManager.BeginKeyTrackingForCurrentBuffer(recvBufferPtr); while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) SendAndReset(); @@ -184,11 +184,8 @@ private bool NetworkSKIP(RespCommand cmd) return true; } - if (clusterSession != null && recvBufferPtr != txnManager.saveKeyRecvBufferPtr) - { - txnManager.CopyExistingKeysToScratchBuffer(); - txnManager.saveKeyRecvBufferPtr = recvBufferPtr; - } + if (clusterSession != null) + txnManager.OnRecvBufferChanged(recvBufferPtr); txnManager.LockKeys(commandInfo); diff --git a/playground/_perf_probe_txn/PerfProbe/PerfProbe.csproj b/playground/_perf_probe_txn/PerfProbe/PerfProbe.csproj new file mode 100644 index 00000000000..2f6f13cf038 --- /dev/null +++ b/playground/_perf_probe_txn/PerfProbe/PerfProbe.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + disable + enable + + + diff --git a/playground/_perf_probe_txn/PerfProbe/Program.cs b/playground/_perf_probe_txn/PerfProbe/Program.cs new file mode 100644 index 00000000000..b5b37c4c374 --- /dev/null +++ b/playground/_perf_probe_txn/PerfProbe/Program.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; + +const int WarmupIterations = 5; +const int MeasureIterations = 15; + +var scenarios = new[] +{ + new Scenario("Typical", 5000, 32, 32), + new Scenario("Stress", 20000, 48, 8), +}; + +Console.WriteLine("Txn key-copy perf probe (old vs new)"); +Console.WriteLine(); + +foreach (var scenario in scenarios) +{ + var oldStats = Measure(scenario, RunOld); + var newStats = Measure(scenario, RunNew); + + var speedup = oldStats.MeanMs / newStats.MeanMs; + var allocReduction = 100.0 * (oldStats.MeanAllocBytes - newStats.MeanAllocBytes) / oldStats.MeanAllocBytes; + + Console.WriteLine($"Scenario: {scenario.Name}"); + Console.WriteLine($" Old : {oldStats.MeanMs,8:F2} ms, alloc {oldStats.MeanAllocBytes / (1024.0 * 1024.0),8:F2} MB"); + Console.WriteLine($" New : {newStats.MeanMs,8:F2} ms, alloc {newStats.MeanAllocBytes / (1024.0 * 1024.0),8:F2} MB"); + Console.WriteLine($" Delta: {speedup,8:F2}x faster, alloc {allocReduction,8:F2}% lower"); + Console.WriteLine(); +} + +static Stats Measure(Scenario scenario, Action run) +{ + for (var i = 0; i < WarmupIterations; i++) + run(scenario); + + var totalMs = 0.0; + var totalAlloc = 0L; + + for (var i = 0; i < MeasureIterations; i++) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var allocStart = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + run(scenario); + sw.Stop(); + var allocEnd = GC.GetAllocatedBytesForCurrentThread(); + + totalMs += sw.Elapsed.TotalMilliseconds; + totalAlloc += allocEnd - allocStart; + } + + return new Stats(totalMs / MeasureIterations, totalAlloc / MeasureIterations); +} + +static void RunOld(Scenario s) +{ + var txnKeys = new List(s.TotalKeys); + var scratch = new List(s.TotalKeys * 4); + + for (var i = 0; i < s.TotalKeys; i++) + { + if (i > 0 && i % s.BufferSwitchEvery == 0) + { + // Old behavior: copy all tracked keys each time receive buffer changes. + for (var k = 0; k < txnKeys.Count; k++) + { + var src = txnKeys[k]; + var dst = new byte[src.Length]; + Buffer.BlockCopy(src, 0, dst, 0, src.Length); + scratch.Add(dst); + txnKeys[k] = dst; + } + } + + var key = new byte[s.KeySize]; + key[0] = (byte)i; + txnKeys.Add(key); + } +} + +static void RunNew(Scenario s) +{ + var txnKeys = new List(s.TotalKeys); + var scratch = new List(s.TotalKeys * 2); + var firstKeyInCurrentBuffer = 0; + + for (var i = 0; i < s.TotalKeys; i++) + { + if (i > 0 && i % s.BufferSwitchEvery == 0) + { + // New behavior: copy only keys from the previous receive buffer segment. + for (var k = firstKeyInCurrentBuffer; k < txnKeys.Count; k++) + { + var src = txnKeys[k]; + var dst = new byte[src.Length]; + Buffer.BlockCopy(src, 0, dst, 0, src.Length); + scratch.Add(dst); + txnKeys[k] = dst; + } + + firstKeyInCurrentBuffer = txnKeys.Count; + } + + var key = new byte[s.KeySize]; + key[0] = (byte)i; + txnKeys.Add(key); + } +} + +readonly record struct Scenario(string Name, int TotalKeys, int KeySize, int BufferSwitchEvery); +readonly record struct Stats(double MeanMs, long MeanAllocBytes);