Skip to content

Commit 2808511

Browse files
badrishc and Copilot committed
Fix pub/sub: restrict commands in RESP2 subscription mode and fix reentrant lock crash
Fixes #1615. Two related bugs:

1. RESP2 subscription mode allowed arbitrary commands (GET, SET, PUBLISH, etc.) instead of restricting to only (P|S)SUBSCRIBE/(P|S)UNSUBSCRIBE/PING/QUIT per the Redis protocol. Added IsAllowedInSubscriptionMode() check in ProcessMessages that rejects disallowed commands with a Redis-compatible error message.

2. PUBLISH from a subscriber session caused SynchronizationLockException because the Publish() callback re-entered the spinlock already held by TryConsumeMessages. Fixed by tracking the command-processing thread ID and detecting reentrant calls in Publish()/PatternPublish() to skip lock acquire/release when on the same thread.

RESP3 sessions are not restricted since push message types are distinguishable from regular responses.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent e8760eb commit 2808511

5 files changed

Lines changed: 184 additions & 9 deletions

File tree

libs/server/Resp/CmdStrings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ static partial class CmdStrings
328328
public const string GenericUnknownClientType = "ERR Unknown client type '{0}'";
329329
public const string GenericErrDuplicateFilter = "ERR Filter '{0}' defined multiple times";
330330
public const string GenericPubSubCommandDisabled = "ERR {0} is disabled, enable it with --pubsub option.";
331+
public const string GenericPubSubCommandNotAllowed = "ERR Can't execute '{0}': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context";
331332
public const string GenericErrLonLat = "ERR invalid longitude,latitude pair {0:F6},{1:F6}";
332333
public const string GenericErrStoreCommand = "ERR STORE option in {0} is not compatible with WITHDIST, WITHHASH and WITHCOORD options";
333334
public const string GenericErrCommandDisallowedWithOption =

libs/server/Resp/Parser/RespCommand.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,23 @@ public static bool IsClusterSubCommand(this RespCommand cmd)
627627
bool inRange = test <= (RespCommand.CLUSTER_SYNC - RespCommand.CLUSTER_ADDSLOTS);
628628
return inRange;
629629
}
630+
631+
/// <summary>
/// Tells whether <paramref name="cmd"/> is one of the commands accepted while a
/// session is in pub/sub subscription mode (RESP2).
/// </summary>
/// <param name="cmd">The command to classify.</param>
/// <returns>
/// <c>true</c> for SUBSCRIBE, PSUBSCRIBE, SSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE,
/// PING and QUIT; <c>false</c> for every other command.
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool IsAllowedInSubscriptionMode(this RespCommand cmd)
    => cmd switch
    {
        // Entering (further) subscriptions
        RespCommand.SUBSCRIBE or RespCommand.PSUBSCRIBE or RespCommand.SSUBSCRIBE => true,
        // Leaving subscriptions
        RespCommand.UNSUBSCRIBE or RespCommand.PUNSUBSCRIBE => true,
        // Connection-level commands
        RespCommand.PING or RespCommand.QUIT => true,
        _ => false,
    };
630647
}
631648

632649
/// <summary>

libs/server/Resp/PubSubCommands.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using System.Diagnostics;
76
using Garnet.common;
87
using Tsavorite.core;
98

@@ -21,9 +20,15 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
2120
/// <inheritdoc />
2221
public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
2322
{
23+
// When a session publishes to a channel it is itself subscribed to, the Broadcast
24+
// callback is invoked on the same thread that already holds the network sender lock
25+
// via TryConsumeMessages. Detect this reentrant case and write directly to the
26+
// existing output buffer instead of re-entering the lock.
27+
var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId;
2428
try
2529
{
26-
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
30+
if (!reentrant)
31+
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
2732

2833
WritePushLength(3);
2934

@@ -35,7 +40,7 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
3540
WriteDirectLargeRespString(value.ReadOnlySpan);
3641

3742
// Flush the publish message for this subscriber
38-
if (dcurr > networkSender.GetResponseObjectHead())
43+
if (!reentrant && dcurr > networkSender.GetResponseObjectHead())
3944
Send(networkSender.GetResponseObjectHead());
4045
}
4146
catch
@@ -44,16 +49,19 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
4449
}
4550
finally
4651
{
47-
networkSender.ExitAndReturnResponseObject();
52+
if (!reentrant)
53+
networkSender.ExitAndReturnResponseObject();
4854
}
4955
}
5056

5157
/// <inheritdoc />
5258
public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByte key, PinnedSpanByte value)
5359
{
60+
var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId;
5461
try
5562
{
56-
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
63+
if (!reentrant)
64+
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
5765

5866
WritePushLength(4);
5967

@@ -65,7 +73,7 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt
6573
WriteDirectLargeRespString(key.ReadOnlySpan);
6674
WriteDirectLargeRespString(value.ReadOnlySpan);
6775

68-
if (dcurr > networkSender.GetResponseObjectHead())
76+
if (!reentrant && dcurr > networkSender.GetResponseObjectHead())
6977
Send(networkSender.GetResponseObjectHead());
7078
}
7179
catch
@@ -74,7 +82,8 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt
7482
}
7583
finally
7684
{
77-
networkSender.ExitAndReturnResponseObject();
85+
if (!reentrant)
86+
networkSender.ExitAndReturnResponseObject();
7887
}
7988
}
8089

@@ -100,7 +109,6 @@ private bool NetworkPUBLISH(RespCommand cmd)
100109
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_CLUSTER_DISABLED);
101110
}
102111

103-
Debug.Assert(isSubscriptionSession == false);
104112
// PUBLISH channel message => [*3\r\n$7\r\nPUBLISH\r\n$]7\r\nchannel\r\n$7\r\nmessage\r\n
105113

106114
var key = parseState.GetArgSliceByRef(0);

libs/server/Resp/RespServerSession.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
8080

8181
int opCount;
8282

83+
// Thread ID of the thread currently processing commands (used by Publish/PatternPublish
84+
// callbacks to detect reentrant calls when the same session publishes to a self-subscribed channel)
85+
int commandProcessingThreadId;
86+
8387
/// <summary>
8488
/// Current database session items
8589
/// </summary>
@@ -441,7 +445,9 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
441445
clusterSession?.AcquireCurrentEpoch();
442446
recvBufferPtr = reqBuffer;
443447
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
448+
commandProcessingThreadId = Environment.CurrentManagedThreadId;
444449
ProcessMessages();
450+
commandProcessingThreadId = 0;
445451
recvBufferPtr = null;
446452
}
447453
catch (RespParsingException ex)
@@ -575,7 +581,14 @@ private void ProcessMessages()
575581

576582
if (CheckACLPermissions(cmd) && (noScriptPassed = CheckScriptPermissions(cmd)))
577583
{
578-
if (txnManager.state != TxnState.None)
584+
// In RESP2, only a small set of commands are allowed while in subscription mode.
585+
// RESP3 uses distinct push types for subscription messages, so all commands are valid.
586+
if (isSubscriptionSession && respProtocolVersion == 2 && !cmd.IsAllowedInSubscriptionMode())
587+
{
588+
while (!RespWriteUtils.TryWriteError(string.Format(CmdStrings.GenericPubSubCommandNotAllowed, cmd.ToString()), ref dcurr, dend))
589+
SendAndReset();
590+
}
591+
else if (txnManager.state != TxnState.None)
579592
{
580593
if (txnManager.state == TxnState.Running)
581594
{

test/Garnet.test/RespPubSubTests.cs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Security.Cryptography;
77
using System.Threading;
88
using Allure.NUnit;
9+
using Garnet.common;
910
using NUnit.Framework;
1011
using NUnit.Framework.Legacy;
1112
using StackExchange.Redis;
@@ -230,5 +231,140 @@ private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel cha
230231
ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subscription receive");
231232
}
232233
}
234+
235+
/// <summary>
/// After a RESP2 connection has issued SUBSCRIBE, sends GET, SET, PUBLISH and MULTI
/// in turn and checks that each one is answered with the "Can't execute" error
/// naming that command.
/// </summary>
[Test]
public void PubSubModeRejectsDisallowedCommandsInResp2()
{
    using var client = TestUtils.CreateRequest(countResponseType: CountResponseType.Bytes);

    // Put the connection into subscription mode first.
    const string subscribeAck = "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n";
    ClassicAssert.AreEqual(subscribeAck, client.Execute("SUBSCRIBE foo", subscribeAck.Length));

    // Each request is paired with the exact error reply expected for it.
    var rejected = new (string Request, string ExpectedError)[]
    {
        ("GET bar", "-ERR Can't execute 'GET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"),
        ("SET bar value", "-ERR Can't execute 'SET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"),
        ("PUBLISH foo bar", "-ERR Can't execute 'PUBLISH': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"),
        ("MULTI", "-ERR Can't execute 'MULTI': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"),
    };

    foreach (var (request, expectedError) in rejected)
    {
        var reply = client.Execute(request, expectedError.Length);
        ClassicAssert.AreEqual(expectedError, reply);
    }
}
269+
270+
/// <summary>
/// Walks a RESP2 connection through subscription mode: PING, SUBSCRIBE, PSUBSCRIBE,
/// UNSUBSCRIBE and PUNSUBSCRIBE must all be answered normally, GET must be rejected
/// while a subscription remains, and GET must work again once the last channel
/// has been unsubscribed.
/// </summary>
[Test]
public void PubSubModeAllowsValidCommandsInResp2()
{
    using var client = TestUtils.CreateRequest(countResponseType: CountResponseType.Bytes);

    // Sends one request, reads exactly as many bytes as the expected reply, and compares.
    void ExpectReply(string request, string expected)
    {
        var actual = client.Execute(request, expected.Length);
        ClassicAssert.AreEqual(expected, actual);
    }

    // Enter subscription mode (count = 1).
    ExpectReply("SUBSCRIBE foo", "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n");

    // PING is answered with the subscription-mode pong array.
    ExpectReply("PING", "*2\r\n$4\r\npong\r\n$0\r\n\r\n");

    // A second channel raises the count to 2.
    ExpectReply("SUBSCRIBE bar", "*3\r\n$9\r\nsubscribe\r\n$3\r\nbar\r\n:2\r\n");

    // A pattern subscription raises the count to 3.
    ExpectReply("PSUBSCRIBE baz*", "*3\r\n$10\r\npsubscribe\r\n$4\r\nbaz*\r\n:3\r\n");

    // Dropping a channel lowers the count to 2.
    ExpectReply("UNSUBSCRIBE bar", "*3\r\n$11\r\nunsubscribe\r\n$3\r\nbar\r\n:2\r\n");

    // Dropping the pattern lowers the count to 1.
    ExpectReply("PUNSUBSCRIBE baz*", "*3\r\n$12\r\npunsubscribe\r\n$4\r\nbaz*\r\n:1\r\n");

    // One subscription is still active, so GET is refused.
    ExpectReply("GET bar", "-ERR Can't execute 'GET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n");

    // Dropping the last channel (count = 0) leaves subscription mode.
    ExpectReply("UNSUBSCRIBE foo", "*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n");

    // Regular commands work again; the key was never set, so the reply is a null bulk string.
    ExpectReply("GET bar", "$-1\r\n");
}
324+
325+
/// <summary>
/// Switches a connection to RESP3, subscribes it to a channel, and then publishes to
/// that same channel from the same connection. The reply must be the push message
/// followed by the PUBLISH integer reply, and the connection must keep answering
/// afterwards (regression test for the lock error reported in issue #1615).
/// </summary>
[Test]
public void PubSubSelfPublishResp3NoLockError()
{
    // HELLO 3 returns a variable-length map, so count newlines for that one reply.
    using var client = TestUtils.CreateRequest(countResponseType: CountResponseType.Newlines);

    var helloReply = client.Execute("HELLO 3", 30);
    ClassicAssert.IsTrue(helloReply.Contains("proto"));

    // Every later reply has a known size, so switch to exact byte counting.
    client.countResponseType = CountResponseType.Bytes;

    // Sends one request, reads exactly as many bytes as the expected reply, and compares.
    void ExpectReply(string request, string expected)
    {
        var actual = client.Execute(request, expected.Length);
        ClassicAssert.AreEqual(expected, actual);
    }

    ExpectReply("SUBSCRIBE foo", "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n");

    // Self-publish: the session receives its own message, so the expected bytes are
    //   1. the push notification  >3 message foo bar
    //   2. the PUBLISH reply      :1
    const string selfNotification = ">3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
    const string publishCount = ":1\r\n";
    ExpectReply("PUBLISH foo bar", selfNotification + publishCount);

    // Leave subscription mode and confirm the connection is still being served.
    ExpectReply("UNSUBSCRIBE foo", "*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n");
    ExpectReply("PING", "+PONG\r\n");
}
233369
}
234370
}

0 commit comments

Comments
 (0)