Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,43 @@
]
}
]
},
{
"Command": "CLIENT_REPLY",
"Name": "CLIENT|REPLY",
"Summary": "Instructs the server whether to reply to commands.",
"Group": "Connection",
"Complexity": "O(1)",
"Arguments": [
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "ON-OFF-SKIP",
"Type": "OneOf",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "ON",
"DisplayText": "on",
"Type": "PureToken",
"Token": "ON"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "OFF",
"DisplayText": "off",
"Type": "PureToken",
"Token": "OFF"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "SKIP",
"DisplayText": "skip",
"Type": "PureToken",
"Token": "SKIP"
}
]
}
]
}
]
},
Expand Down
7 changes: 7 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,13 @@
"Arity": -3,
"Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Connection, Dangerous, Slow"
},
{
"Command": "CLIENT_REPLY",
"Name": "CLIENT|REPLY",
"Arity": 3,
"Flags": "Loading, NoScript, Stale",
"AclCategories": "Connection, Slow"
}
]
},
Expand Down
57 changes: 57 additions & 0 deletions libs/server/Resp/ClientCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@

namespace Garnet.server
{
/// <summary>
/// Reply suppression mode controlled by the <c>CLIENT REPLY</c> subcommand.
/// </summary>
internal enum ClientReplyMode : byte
{
/// <summary>Normal replies are sent (default).</summary>
On = 0,
/// <summary>All replies are suppressed until a <c>CLIENT REPLY ON</c> is received.</summary>
Off,
/// <summary>The reply for the next command is suppressed; mode returns to <see cref="On"/> after.</summary>
Skip,
}

/// <summary>
/// Server session for RESP protocol - client commands are in this file
/// </summary>
Expand Down Expand Up @@ -640,5 +653,49 @@ private bool NetworkCLIENTUNBLOCK()

return true;
}

/// <summary>
/// CLIENT REPLY ON|OFF|SKIP — controls per-connection reply suppression.
/// OFF suppresses all replies until ON; SKIP suppresses only the next command's reply.
/// The OFF and SKIP commands themselves produce no reply; ON replies with +OK.
/// </summary>
private bool NetworkCLIENTREPLY()
{
if (parseState.Count != 1)
{
return AbortWithWrongNumberOfArguments("client|reply");
}

var modeSpan = parseState.GetArgSliceByRef(0).ReadOnlySpan;

if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.ON))
{
clientReplyMode = ClientReplyMode.On;
// The ON command itself must reply +OK even if we just transitioned out of OFF/SKIP.
// Clear the suppression flag set at command-start so the +OK actually flushes.
suppressCurrentReply = false;
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.OFF))
{
clientReplyMode = ClientReplyMode.Off;
// No reply.
}
else if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.SKIP))
{
// SKIP only arms when we're currently On. If already Off it stays Off; if already Skip it stays Skip
// (a second SKIP just re-arms — it does not stack).
if (clientReplyMode == ClientReplyMode.On)
clientReplyMode = ClientReplyMode.Skip;
// No reply.
}
else
{
return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR);
}

return true;
}
}
}
2 changes: 2 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> GETNAME => "GETNAME"u8;
public static ReadOnlySpan<byte> SETINFO => "SETINFO"u8;
public static ReadOnlySpan<byte> UNBLOCK => "UNBLOCK"u8;
public static ReadOnlySpan<byte> REPLY => "REPLY"u8;
public static ReadOnlySpan<byte> SKIP => "SKIP"u8;
public static ReadOnlySpan<byte> USER => "USER"u8;
public static ReadOnlySpan<byte> ADDR => "ADDR"u8;
public static ReadOnlySpan<byte> LADDR => "LADDR"u8;
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public enum RespCommand : ushort
CLIENT_SETNAME,
CLIENT_SETINFO,
CLIENT_UNBLOCK,
CLIENT_REPLY,

MONITOR,
MODULE,
Expand Down Expand Up @@ -465,6 +466,7 @@ public static class RespCommandExtensions
RespCommand.CLIENT_SETNAME,
RespCommand.CLIENT_SETINFO,
RespCommand.CLIENT_UNBLOCK,
RespCommand.CLIENT_REPLY,
// Command
RespCommand.COMMAND,
RespCommand.COMMAND_COUNT,
Expand Down Expand Up @@ -2125,6 +2127,10 @@ private RespCommand SlowParseCommand(ReadOnlySpan<byte> command, ref int count,
{
return RespCommand.CLIENT_UNBLOCK;
}
else if (subCommand.SequenceEqual(CmdStrings.REPLY))
{
return RespCommand.CLIENT_REPLY;
}

string errMsg = string.Format(CmdStrings.GenericErrUnknownSubCommandNoHelp,
Encoding.UTF8.GetString(subCommand),
Expand Down
98 changes: 98 additions & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,26 @@ public IGarnetServer Server
/// </summary>
string clientLibVersion = null;

/// <summary>
/// Current CLIENT REPLY mode for this connection (controls reply suppression).
/// </summary>
internal ClientReplyMode clientReplyMode = ClientReplyMode.On;

/// <summary>
/// True while the current command's output should be discarded (i.e. CLIENT REPLY OFF/SKIP is active for this command).
/// Set at the start of each command in ProcessMessages and may be cleared by NetworkCLIENTREPLY when it processes ON
/// (so the +OK reply for `CLIENT REPLY ON` is allowed through).
/// </summary>
bool suppressCurrentReply = false;

/// <summary>
/// Snapshot of <c>dcurr</c> at the start of the current command in ProcessMessages.
/// When <see cref="suppressCurrentReply"/> is set this is the rewind floor — bytes from earlier (non-suppressed)
/// commands in the same batch live at [head, cmdReplyFloor) and must not be discarded. <see cref="SendAndReset()"/>
/// uses this floor so a mid-command flush triggered by a suppressed write does not corrupt pipelined responses.
/// </summary>
byte* cmdReplyFloor;

/// <summary>
/// Flag indicating whether any of the commands in one message
/// requires us to block on AOF before sending response over the network
Expand Down Expand Up @@ -623,6 +643,18 @@ private void ProcessMessages<TBasicApi, TTxnApi>(ref TBasicApi basicApi, ref TTx

while (bytesRead - readHead >= 4)
{
// CLIENT REPLY: snapshot reply-suppression state for this command BEFORE parsing.
// - modeAtStart drives whether the buffer position we capture now (cmdStartPtr) gets
// rewound after the command runs (discarding the reply bytes).
// - suppressCurrentReply also gates SendAndReset() so any mid-command flush is
// discarded instead of being sent to the network. We set it before ParseCommand so
// parse-time error replies (e.g. "unknown command") are also gated.
// CLIENT REPLY ON clears suppressCurrentReply inside its handler so the +OK reply
// for the ON command itself is allowed through.
var modeAtStart = clientReplyMode;
cmdReplyFloor = dcurr;
suppressCurrentReply = modeAtStart != ClientReplyMode.On;

// First, parse the command, making sure we have the entire command available
// We use endReadHead to track the end of the current command
// On success, readHead is left at the start of the command payload for legacy operators
Expand All @@ -632,6 +664,7 @@ private void ProcessMessages<TBasicApi, TTxnApi>(ref TBasicApi basicApi, ref TTx
if (!commandReceived)
{
endReadHead = readHead = _origReadHead;
suppressCurrentReply = false;
break;
}

Expand Down Expand Up @@ -695,6 +728,30 @@ private void ProcessMessages<TBasicApi, TTxnApi>(ref TBasicApi basicApi, ref TTx
containsSlowCommand = true;
}

// CLIENT REPLY: end-of-command suppression handling. Applies to INVALID/parse-error
// commands as well so things like "GET" with no args don't leak "-ERR unknown command".
// suppressCurrentReply may have been cleared by NetworkCLIENTREPLY when it
// executed `CLIENT REPLY ON` (so the +OK reply is preserved).
if (suppressCurrentReply)
{
// Discard anything this command wrote since cmdReplyFloor. SendAndReset has
// also been gated mid-command (it may have rotated the buffer if prior
// non-suppressed replies needed flushing, in which case cmdReplyFloor was
// updated to point at the new buffer head).
dcurr = cmdReplyFloor;
}
suppressCurrentReply = false;

// Burn off a one-shot SKIP. The CLIENT REPLY command itself never burns the skip —
// a SKIP issued while already in Skip mode just re-arms (does not stack). Every
// other fully-received command — including parse-error / INVALID commands and
// unknown commands — consumes the SKIP, matching Redis semantics. Partial input
// is already handled by the !commandReceived early-break above.
if (modeAtStart == ClientReplyMode.Skip && cmd != RespCommand.CLIENT_REPLY)
{
clientReplyMode = ClientReplyMode.On;
}

// Advance read head variables to process the next command
_origReadHead = readHead = endReadHead;

Expand Down Expand Up @@ -1050,6 +1107,7 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
RespCommand.CLIENT_SETNAME => NetworkCLIENTSETNAME(),
RespCommand.CLIENT_SETINFO => NetworkCLIENTSETINFO(),
RespCommand.CLIENT_UNBLOCK => NetworkCLIENTUNBLOCK(),
RespCommand.CLIENT_REPLY => NetworkCLIENTREPLY(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_DOCS => NetworkCOMMAND_DOCS(),
Expand Down Expand Up @@ -1318,6 +1376,37 @@ private unsafe bool Write(int seqNo, ref byte* dst, int length)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void SendAndReset()
{
// CLIENT REPLY OFF/SKIP: discard bytes for the currently-suppressed command without
// touching any prior (non-suppressed) replies queued earlier in the same batch.
if (suppressCurrentReply)
{
byte* head = networkSender.GetResponseObjectHead();
if (dcurr > cmdReplyFloor)
{
// Suppressed write made progress for the current command — drop just those
// bytes (the floor onward) and keep [head, cmdReplyFloor) intact.
dcurr = cmdReplyFloor;
return;
}
if (cmdReplyFloor > head)
{
// No progress on the suppressed write yet, but earlier commands' replies sit
// at [head, cmdReplyFloor). Flush those and rotate to a fresh buffer so the
// retry has full space — then continue suppressing in the new buffer.
Send(head);
networkSender.GetResponseObject();
cmdReplyFloor = networkSender.GetResponseObjectHead();
dcurr = cmdReplyFloor;
dend = networkSender.GetResponseObjectTail();
return;
}
// No prior bytes and no progress: the single write is larger than the entire
// response buffer. Surface the same fatal condition as the non-suppressed path
// rather than spinning forever.
GarnetException.Throw("Failed to write to response buffer", LogLevel.Critical);
return;
}

byte* d = networkSender.GetResponseObjectHead();
if ((int)(dcurr - d) > 0)
{
Expand All @@ -1338,6 +1427,15 @@ internal void SendAndReset()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void SendAndReset(IMemoryOwner<byte> memory, int length)
{
// CLIENT REPLY OFF/SKIP: drop the payload entirely without flushing the buffer. The
// buffer may legitimately contain earlier non-suppressed replies sitting at
// [head, cmdReplyFloor); those get flushed at end-of-batch by the normal path.
if (suppressCurrentReply)
{
memory.Dispose();
return;
}

// Copy allocated memory to main buffer and send
fixed (byte* _src = memory.Memory.Span)
{
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class SupportedCommand
new("CLIENT|SETNAME", RespCommand.CLIENT_SETNAME),
new("CLIENT|SETINFO", RespCommand.CLIENT_SETINFO),
new("CLIENT|UNBLOCK", RespCommand.CLIENT_UNBLOCK),
new("CLIENT|REPLY", RespCommand.CLIENT_REPLY),
]),
new("CLUSTER", RespCommand.CLUSTER, StoreType.None,
[
Expand Down
Loading
Loading