diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json index bf42982faa7..9978e660d44 100644 --- a/libs/resources/RespCommandsDocs.json +++ b/libs/resources/RespCommandsDocs.json @@ -1216,6 +1216,43 @@ ] } ] + }, + { + "Command": "CLIENT_REPLY", + "Name": "CLIENT|REPLY", + "Summary": "Instructs the server whether to reply to commands.", + "Group": "Connection", + "Complexity": "O(1)", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandContainerArgument", + "Name": "ON-OFF-SKIP", + "Type": "OneOf", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "ON", + "DisplayText": "on", + "Type": "PureToken", + "Token": "ON" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "OFF", + "DisplayText": "off", + "Type": "PureToken", + "Token": "OFF" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "SKIP", + "DisplayText": "skip", + "Type": "PureToken", + "Token": "SKIP" + } + ] + } + ] } ] }, diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json index 9d52db69325..172758269ee 100644 --- a/libs/resources/RespCommandsInfo.json +++ b/libs/resources/RespCommandsInfo.json @@ -590,6 +590,13 @@ "Arity": -3, "Flags": "Admin, Loading, NoScript, Stale", "AclCategories": "Admin, Connection, Dangerous, Slow" + }, + { + "Command": "CLIENT_REPLY", + "Name": "CLIENT|REPLY", + "Arity": 3, + "Flags": "Loading, NoScript, Stale", + "AclCategories": "Connection, Slow" } ] }, diff --git a/libs/server/Resp/ClientCommands.cs b/libs/server/Resp/ClientCommands.cs index 82ebcc966cc..d76cd436af0 100644 --- a/libs/server/Resp/ClientCommands.cs +++ b/libs/server/Resp/ClientCommands.cs @@ -11,6 +11,19 @@ namespace Garnet.server { + /// + /// Reply suppression mode controlled by the CLIENT REPLY subcommand. + /// + internal enum ClientReplyMode : byte + { + /// Normal replies are sent (default). + On = 0, + /// All replies are suppressed until a CLIENT REPLY ON is received. + Off, + /// The reply for the next command is suppressed; mode returns to after. + Skip, + } + /// /// Server session for RESP protocol - client commands are in this file /// @@ -640,5 +653,49 @@ private bool NetworkCLIENTUNBLOCK() return true; } + + /// + /// CLIENT REPLY ON|OFF|SKIP — controls per-connection reply suppression. + /// OFF suppresses all replies until ON; SKIP suppresses only the next command's reply. + /// The OFF and SKIP commands themselves produce no reply; ON replies with +OK. + /// + private bool NetworkCLIENTREPLY() + { + if (parseState.Count != 1) + { + return AbortWithWrongNumberOfArguments("client|reply"); + } + + var modeSpan = parseState.GetArgSliceByRef(0).ReadOnlySpan; + + if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.ON)) + { + clientReplyMode = ClientReplyMode.On; + // The ON command itself must reply +OK even if we just transitioned out of OFF/SKIP. + // Clear the suppression flag set at command-start so the +OK actually flushes. + suppressCurrentReply = false; + while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + else if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.OFF)) + { + clientReplyMode = ClientReplyMode.Off; + // No reply. + } + else if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.SKIP)) + { + // SKIP only arms when we're currently On. If already Off it stays Off; if already Skip it stays Skip + // (a second SKIP just re-arms — it does not stack). + if (clientReplyMode == ClientReplyMode.On) + clientReplyMode = ClientReplyMode.Skip; + // No reply. + } + else + { + return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR); + } + + return true; + } } } \ No newline at end of file diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index c782589528a..a0a8516a545 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -429,6 +429,8 @@ static partial class CmdStrings public static ReadOnlySpan GETNAME => "GETNAME"u8; public static ReadOnlySpan SETINFO => "SETINFO"u8; public static ReadOnlySpan UNBLOCK => "UNBLOCK"u8; + public static ReadOnlySpan REPLY => "REPLY"u8; + public static ReadOnlySpan SKIP => "SKIP"u8; public static ReadOnlySpan USER => "USER"u8; public static ReadOnlySpan ADDR => "ADDR"u8; public static ReadOnlySpan LADDR => "LADDR"u8; diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index 313866cbe06..08bd9905541 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -282,6 +282,7 @@ public enum RespCommand : ushort CLIENT_SETNAME, CLIENT_SETINFO, CLIENT_UNBLOCK, + CLIENT_REPLY, MONITOR, MODULE, @@ -465,6 +466,7 @@ public static class RespCommandExtensions RespCommand.CLIENT_SETNAME, RespCommand.CLIENT_SETINFO, RespCommand.CLIENT_UNBLOCK, + RespCommand.CLIENT_REPLY, // Command RespCommand.COMMAND, RespCommand.COMMAND_COUNT, @@ -2125,6 +2127,10 @@ private RespCommand SlowParseCommand(ReadOnlySpan command, ref int count, { return RespCommand.CLIENT_UNBLOCK; } + else if (subCommand.SequenceEqual(CmdStrings.REPLY)) + { + return RespCommand.CLIENT_REPLY; + } string errMsg = string.Format(CmdStrings.GenericErrUnknownSubCommandNoHelp, Encoding.UTF8.GetString(subCommand), diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index 93b5cd9a85d..2364872e8cb 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -207,6 +207,26 @@ public IGarnetServer Server /// string clientLibVersion = null; + /// + /// Current CLIENT REPLY mode for this connection (controls reply suppression). + /// + internal ClientReplyMode clientReplyMode = ClientReplyMode.On; + + /// + /// True while the current command's output should be discarded (i.e. CLIENT REPLY OFF/SKIP is active for this command). + /// Set at the start of each command in ProcessMessages and may be cleared by NetworkCLIENTREPLY when it processes ON + /// (so the +OK reply for `CLIENT REPLY ON` is allowed through). + /// + bool suppressCurrentReply = false; + + /// + /// Snapshot of dcurr at the start of the current command in ProcessMessages. + /// When is set this is the rewind floor — bytes from earlier (non-suppressed) + /// commands in the same batch live at [head, cmdReplyFloor) and must not be discarded. + /// uses this floor so a mid-command flush triggered by a suppressed write does not corrupt pipelined responses. + /// + byte* cmdReplyFloor; + /// /// Flag indicating whether any of the commands in one message /// requires us to block on AOF before sending response over the network @@ -623,6 +643,18 @@ private void ProcessMessages(ref TBasicApi basicApi, ref TTx while (bytesRead - readHead >= 4) { + // CLIENT REPLY: snapshot reply-suppression state for this command BEFORE parsing. + // - modeAtStart drives whether the buffer position we capture now (cmdStartPtr) gets + // rewound after the command runs (discarding the reply bytes). + // - suppressCurrentReply also gates SendAndReset() so any mid-command flush is + // discarded instead of being sent to the network. We set it before ParseCommand so + // parse-time error replies (e.g. "unknown command") are also gated. + // CLIENT REPLY ON clears suppressCurrentReply inside its handler so the +OK reply + // for the ON command itself is allowed through. + var modeAtStart = clientReplyMode; + cmdReplyFloor = dcurr; + suppressCurrentReply = modeAtStart != ClientReplyMode.On; + // First, parse the command, making sure we have the entire command available // We use endReadHead to track the end of the current command // On success, readHead is left at the start of the command payload for legacy operators @@ -632,6 +664,7 @@ private void ProcessMessages(ref TBasicApi basicApi, ref TTx if (!commandReceived) { endReadHead = readHead = _origReadHead; + suppressCurrentReply = false; break; } @@ -695,6 +728,30 @@ private void ProcessMessages(ref TBasicApi basicApi, ref TTx containsSlowCommand = true; } + // CLIENT REPLY: end-of-command suppression handling. Applies to INVALID/parse-error + // commands as well so things like "GET" with no args don't leak "-ERR unknown command". + // suppressCurrentReply may have been cleared by NetworkCLIENTREPLY when it + // executed `CLIENT REPLY ON` (so the +OK reply is preserved). + if (suppressCurrentReply) + { + // Discard anything this command wrote since cmdReplyFloor. SendAndReset has + // also been gated mid-command (it may have rotated the buffer if prior + // non-suppressed replies needed flushing, in which case cmdReplyFloor was + // updated to point at the new buffer head). + dcurr = cmdReplyFloor; + } + suppressCurrentReply = false; + + // Burn off a one-shot SKIP. The CLIENT REPLY command itself never burns the skip — + // a SKIP issued while already in Skip mode just re-arms (does not stack). Every + // other fully-received command — including parse-error / INVALID commands and + // unknown commands — consumes the SKIP, matching Redis semantics. Partial input + // is already handled by the !commandReceived early-break above. + if (modeAtStart == ClientReplyMode.Skip && cmd != RespCommand.CLIENT_REPLY) + { + clientReplyMode = ClientReplyMode.On; + } + // Advance read head variables to process the next command _origReadHead = readHead = endReadHead; @@ -1050,6 +1107,7 @@ private bool ProcessOtherCommands(RespCommand command, ref TGarnetAp RespCommand.CLIENT_SETNAME => NetworkCLIENTSETNAME(), RespCommand.CLIENT_SETINFO => NetworkCLIENTSETINFO(), RespCommand.CLIENT_UNBLOCK => NetworkCLIENTUNBLOCK(), + RespCommand.CLIENT_REPLY => NetworkCLIENTREPLY(), RespCommand.COMMAND => NetworkCOMMAND(), RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(), RespCommand.COMMAND_DOCS => NetworkCOMMAND_DOCS(), @@ -1318,6 +1376,37 @@ private unsafe bool Write(int seqNo, ref byte* dst, int length) [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void SendAndReset() { + // CLIENT REPLY OFF/SKIP: discard bytes for the currently-suppressed command without + // touching any prior (non-suppressed) replies queued earlier in the same batch. + if (suppressCurrentReply) + { + byte* head = networkSender.GetResponseObjectHead(); + if (dcurr > cmdReplyFloor) + { + // Suppressed write made progress for the current command — drop just those + // bytes (the floor onward) and keep [head, cmdReplyFloor) intact. + dcurr = cmdReplyFloor; + return; + } + if (cmdReplyFloor > head) + { + // No progress on the suppressed write yet, but earlier commands' replies sit + // at [head, cmdReplyFloor). Flush those and rotate to a fresh buffer so the + // retry has full space — then continue suppressing in the new buffer. + Send(head); + networkSender.GetResponseObject(); + cmdReplyFloor = networkSender.GetResponseObjectHead(); + dcurr = cmdReplyFloor; + dend = networkSender.GetResponseObjectTail(); + return; + } + // No prior bytes and no progress: the single write is larger than the entire + // response buffer. Surface the same fatal condition as the non-suppressed path + // rather than spinning forever. + GarnetException.Throw("Failed to write to response buffer", LogLevel.Critical); + return; + } + byte* d = networkSender.GetResponseObjectHead(); if ((int)(dcurr - d) > 0) { @@ -1338,6 +1427,15 @@ internal void SendAndReset() [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void SendAndReset(IMemoryOwner memory, int length) { + // CLIENT REPLY OFF/SKIP: drop the payload entirely without flushing the buffer. The + // buffer may legitimately contain earlier non-suppressed replies sitting at + // [head, cmdReplyFloor); those get flushed at end-of-batch by the normal path. + if (suppressCurrentReply) + { + memory.Dispose(); + return; + } + // Copy allocated memory to main buffer and send fixed (byte* _src = memory.Memory.Span) { diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs index 2ec48386586..9db3d2ea31a 100644 --- a/playground/CommandInfoUpdater/SupportedCommand.cs +++ b/playground/CommandInfoUpdater/SupportedCommand.cs @@ -54,6 +54,7 @@ public class SupportedCommand new("CLIENT|SETNAME", RespCommand.CLIENT_SETNAME), new("CLIENT|SETINFO", RespCommand.CLIENT_SETINFO), new("CLIENT|UNBLOCK", RespCommand.CLIENT_UNBLOCK), + new("CLIENT|REPLY", RespCommand.CLIENT_REPLY), ]), new("CLUSTER", RespCommand.CLUSTER, StoreType.None, [ diff --git a/test/Garnet.test/ClientReplyTests.cs b/test/Garnet.test/ClientReplyTests.cs new file mode 100644 index 00000000000..bcce010dd12 --- /dev/null +++ b/test/Garnet.test/ClientReplyTests.cs @@ -0,0 +1,392 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.IO; +using System.Net.Sockets; +using System.Text; +using Allure.NUnit; +using NUnit.Framework; +using NUnit.Framework.Legacy; +using StackExchange.Redis; + +namespace Garnet.test +{ + /// + /// Tests for the CLIENT REPLY ON|OFF|SKIP subcommand. + /// Uses a raw TCP socket because StackExchange.Redis cannot tolerate reply suppression. + /// + [AllureNUnit] + [TestFixture] + public class ClientReplyTests : AllureTestBase + { + GarnetServer server; + + [SetUp] + public void Setup() + { + TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir); + server.Start(); + } + + [TearDown] + public void TearDown() + { + server.Dispose(); + TestUtils.OnTearDown(); + } + + /// + /// Small helper that wraps a raw TCP connection to the test Garnet server and exposes + /// Send/TryRead so tests can verify byte-level reply suppression behaviour. + /// + sealed class RawConn : IDisposable + { + readonly TcpClient client; + readonly NetworkStream stream; + + public RawConn(int readTimeoutMs = 500) + { + var ep = (System.Net.IPEndPoint)TestUtils.EndPoint; + client = new TcpClient(); + client.Connect(ep.Address, ep.Port); + stream = client.GetStream(); + stream.ReadTimeout = readTimeoutMs; + } + + public void SendRaw(string s) + { + var bytes = Encoding.ASCII.GetBytes(s); + stream.Write(bytes, 0, bytes.Length); + stream.Flush(); + } + + /// Encode args as a RESP array of bulk strings and send. + public void SendCommand(params string[] args) + { + var sb = new StringBuilder(); + sb.Append('*').Append(args.Length).Append("\r\n"); + foreach (var a in args) + { + sb.Append('$').Append(Encoding.ASCII.GetByteCount(a)).Append("\r\n").Append(a).Append("\r\n"); + } + SendRaw(sb.ToString()); + } + + /// Returns whatever bytes are available within the read timeout, or empty string on timeout. + public string TryRead() + { + var buf = new byte[8192]; + try + { + int n = stream.Read(buf, 0, buf.Length); + return n <= 0 ? string.Empty : Encoding.ASCII.GetString(buf, 0, n); + } + catch (IOException) + { + return string.Empty; + } + } + + /// + /// Read exactly .Length bytes from the socket, one byte + /// at a time, so we never over-read and leave leftovers buffered for later assertions. + /// Returns whatever was accumulated before a read timeout or EOF. + /// + public string ReadExpected(string expected) + { + var sb = new StringBuilder(); + var one = new byte[1]; + while (sb.Length < expected.Length) + { + try + { + int n = stream.Read(one, 0, 1); + if (n <= 0) break; + sb.Append((char)one[0]); + } + catch (IOException) + { + break; + } + } + return sb.ToString(); + } + + /// + /// Read a single RESP simple-string (+...) or error (-...) reply, + /// up to and including the trailing CRLF. One byte at a time so subsequent reads + /// don't pick up any stray bytes from this reply. + /// + public string ReadSimpleReply() + { + var sb = new StringBuilder(); + var one = new byte[1]; + int prev = -1; + while (true) + { + try + { + int n = stream.Read(one, 0, 1); + if (n <= 0) break; + } + catch (IOException) + { + break; + } + sb.Append((char)one[0]); + if (prev == '\r' && one[0] == '\n') break; + prev = one[0]; + } + return sb.ToString(); + } + + public void Dispose() + { + stream?.Dispose(); + client?.Dispose(); + } + } + + [Test] + public void ClientReplyOnReturnsOK() + { + using var c = new RawConn(); + c.SendCommand("CLIENT", "REPLY", "ON"); + ClassicAssert.AreEqual("+OK\r\n", c.ReadExpected("+OK\r\n")); + } + + [Test] + public void ClientReplyOffSuppressesResponses() + { + using var c = new RawConn(); + + // OFF itself produces no reply. + c.SendCommand("CLIENT", "REPLY", "OFF"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Subsequent commands produce no replies. + c.SendCommand("PING"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + c.SendCommand("SET", "k", "v"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + c.SendCommand("GET", "k"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // ON returns +OK and replies resume. + c.SendCommand("CLIENT", "REPLY", "ON"); + ClassicAssert.AreEqual("+OK\r\n", c.ReadExpected("+OK\r\n")); + + c.SendCommand("PING"); + ClassicAssert.AreEqual("+PONG\r\n", c.ReadExpected("+PONG\r\n")); + } + + [Test] + public void ClientReplySkipSuppressesOnlyNextCommand() + { + using var c = new RawConn(); + + // SKIP itself produces no reply. + c.SendCommand("CLIENT", "REPLY", "SKIP"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Next command's reply is suppressed. + c.SendCommand("PING"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // The command after that replies normally. + c.SendCommand("PING"); + ClassicAssert.AreEqual("+PONG\r\n", c.ReadExpected("+PONG\r\n")); + } + + [Test] + public void ClientReplyWrongArityReturnsError() + { + using var c = new RawConn(); + + // Zero mode args. + c.SendCommand("CLIENT", "REPLY"); + var reply = c.ReadSimpleReply(); + StringAssert.StartsWith("-ERR", reply); + StringAssert.EndsWith("\r\n", reply); + + // Two mode args. + c.SendCommand("CLIENT", "REPLY", "ON", "EXTRA"); + reply = c.ReadSimpleReply(); + StringAssert.StartsWith("-ERR", reply); + StringAssert.EndsWith("\r\n", reply); + } + + [Test] + public void ClientReplyInvalidModeReturnsSyntaxError() + { + using var c = new RawConn(); + c.SendCommand("CLIENT", "REPLY", "GARBAGE"); + var reply = c.ReadExpected("-ERR syntax error\r\n"); + ClassicAssert.AreEqual("-ERR syntax error\r\n", reply); + } + + [Test] + public void ClientReplyCaseInsensitive() + { + using var c = new RawConn(); + + // lower case off + c.SendCommand("client", "reply", "off"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + c.SendCommand("PING"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Mixed-case On + c.SendCommand("CLIENT", "REPLY", "On"); + ClassicAssert.AreEqual("+OK\r\n", c.ReadExpected("+OK\r\n")); + + // Mixed-case Off + c.SendCommand("CLIENT", "REPLY", "Off"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + c.SendCommand("CLIENT", "REPLY", "ON"); + ClassicAssert.AreEqual("+OK\r\n", c.ReadExpected("+OK\r\n")); + } + + [Test] + public void ClientReplyOffPersistsAcrossManyCommands() + { + const int N = 50; + + using var writer = new RawConn(); + writer.SendCommand("CLIENT", "REPLY", "OFF"); + ClassicAssert.AreEqual(string.Empty, writer.TryRead()); + + for (int i = 0; i < N; i++) + { + writer.SendCommand("SET", "k" + i, "v" + i); + } + // No bytes should have been emitted for any of the 50 SETs. + ClassicAssert.AreEqual(string.Empty, writer.TryRead()); + + writer.SendCommand("CLIENT", "REPLY", "ON"); + ClassicAssert.AreEqual("+OK\r\n", writer.ReadExpected("+OK\r\n")); + + // Verify on a separate connection that the SETs actually executed. + using var reader = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = reader.GetDatabase(0); + for (int i = 0; i < N; i++) + { + ClassicAssert.AreEqual("v" + i, (string)db.StringGet("k" + i)); + } + } + + [Test] + public void ClientReplyOffErrorAlsoSuppressed() + { + using var c = new RawConn(); + + c.SendCommand("CLIENT", "REPLY", "OFF"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Invalid args to GET would normally produce an error reply. + c.SendCommand("GET"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // CLIENT REPLY GARBAGE while OFF — syntax error normally, must also be suppressed. + c.SendCommand("CLIENT", "REPLY", "GARBAGE"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // ON restores; connection is still healthy. + c.SendCommand("CLIENT", "REPLY", "ON"); + ClassicAssert.AreEqual("+OK\r\n", c.ReadExpected("+OK\r\n")); + + c.SendCommand("PING"); + ClassicAssert.AreEqual("+PONG\r\n", c.ReadExpected("+PONG\r\n")); + } + + [Test] + public void ClientReplySkipDoesNotStack() + { + using var c = new RawConn(); + + // Two SKIPs in a row: only ONE following normal command should be suppressed. + c.SendCommand("CLIENT", "REPLY", "SKIP"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + c.SendCommand("CLIENT", "REPLY", "SKIP"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // First PING after the two SKIPs: suppressed. + c.SendCommand("PING"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Second PING: replied to normally. + c.SendCommand("PING"); + ClassicAssert.AreEqual("+PONG\r\n", c.ReadExpected("+PONG\r\n")); + } + + /// + /// Unknown commands must burn a pending SKIP (the SKIP target was that command, + /// even though the server can't dispatch it). Matches Redis semantics. + /// + [Test] + public void ClientReplySkipBurnedByUnknownCommand() + { + using var c = new RawConn(); + + c.SendCommand("CLIENT", "REPLY", "SKIP"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Unknown command — its error reply is suppressed AND it burns the SKIP. + c.SendCommand("NOSUCHCMD"); + ClassicAssert.AreEqual(string.Empty, c.TryRead()); + + // Next command replies normally (SKIP was burned, not still pending). + c.SendCommand("PING"); + ClassicAssert.AreEqual("+PONG\r\n", c.ReadExpected("+PONG\r\n")); + } + + /// + /// Pipeline four commands in a single TCP write where SKIP is interleaved. + /// Verifies the reply stream ordering is preserved: the prior PING's reply must + /// not be dropped when the next command runs under suppression. + /// + [Test] + public void ClientReplyPipelinedSkipBetweenReplies() + { + using var c = new RawConn(); + + var sb = new StringBuilder(); + sb.Append("*1\r\n$4\r\nPING\r\n"); // -> +PONG + sb.Append("*3\r\n$6\r\nCLIENT\r\n$5\r\nREPLY\r\n$4\r\nSKIP\r\n"); // -> (none) + sb.Append("*1\r\n$4\r\nPING\r\n"); // -> suppressed + sb.Append("*1\r\n$4\r\nPING\r\n"); // -> +PONG + c.SendRaw(sb.ToString()); + + const string expected = "+PONG\r\n+PONG\r\n"; + ClassicAssert.AreEqual(expected, c.ReadExpected(expected)); + } + + /// + /// Pipeline OFF/ON transitions inside a single batch. Prior PING reply must survive + /// the buffer-rewind that the suppressed PING triggers, and the trailing CLIENT REPLY + /// ON must produce its +OK. + /// + [Test] + public void ClientReplyPipelinedOffOnOrdering() + { + using var c = new RawConn(); + + var sb = new StringBuilder(); + sb.Append("*1\r\n$4\r\nPING\r\n"); // -> +PONG + sb.Append("*3\r\n$6\r\nCLIENT\r\n$5\r\nREPLY\r\n$3\r\nOFF\r\n"); // -> (none) + sb.Append("*1\r\n$4\r\nPING\r\n"); // -> suppressed + sb.Append("*3\r\n$6\r\nCLIENT\r\n$5\r\nREPLY\r\n$2\r\nON\r\n"); // -> +OK + c.SendRaw(sb.ToString()); + + const string expected = "+PONG\r\n+OK\r\n"; + ClassicAssert.AreEqual(expected, c.ReadExpected(expected)); + } + } +} \ No newline at end of file diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs index 96b51e1674c..5e4082126cd 100644 --- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs +++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs @@ -708,6 +708,23 @@ static async Task DoClientIdAsync(GarnetClient client) } } + [Test] + public async Task ClientReplyACLsAsync() + { + await CheckCommandsAsync( + "CLIENT REPLY", + [DoClientReplyAsync] + ).ConfigureAwait(false); + + static async Task DoClientReplyAsync(GarnetClient client) + { + // CLIENT REPLY ON is the only mode that produces a reply (+OK); + // OFF/SKIP are write-only and would block ExecuteForString*. + var resp = await client.ExecuteForStringResultAsync("CLIENT", ["REPLY", "ON"]).ConfigureAwait(false); + ClassicAssert.AreEqual("OK", resp); + } + } + [Test] public async Task ClientInfoACLsAsync() { diff --git a/test/Garnet.test/RespCommandTests.cs b/test/Garnet.test/RespCommandTests.cs index cf3f0a2a946..ef6c8618360 100644 --- a/test/Garnet.test/RespCommandTests.cs +++ b/test/Garnet.test/RespCommandTests.cs @@ -514,6 +514,7 @@ public void AofIndependentCommandsTest() RespCommand.CLIENT_SETNAME, RespCommand.CLIENT_SETINFO, RespCommand.CLIENT_UNBLOCK, + RespCommand.CLIENT_REPLY, // Command RespCommand.COMMAND, RespCommand.COMMAND_COUNT,