perf(messaging): add ref-counted message pooling#10072
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces ref-counted ownership tracking for Message instances and a thread-local MessagePool to enable safe reuse of messages across the runtime’s send/receive lifecycle, while updating related runtime paths (networking, callbacks, activation repartitioning) to explicitly release message references.
Changes:
- Add
MessagePoolplus ref-countedMessage.Acquire()/Release()ownership APIs (with DEBUG leak tracking and reset-on-return semantics). - Switch message creation/deserialization paths to use pooled
Messageinstances and update many lifecycle paths to callRelease()/ReleaseDropped(). - Update activation repartitioning sampling to record addresses (instead of retaining
Messageinstances) and add focused message pool tests.
Show a summary per file
| File | Description |
|---|---|
| test/Orleans.Placement.Tests/ActivationRepartitioningTests/TestMessageFilter.cs | Updates test filter API to accept grain ids instead of Message. |
| test/Orleans.Core.Tests/Messaging/MessagePoolTests.cs | Adds unit tests for pooling/ref-count behaviors and DEBUG leak tracking. |
| src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs | Updates filter API to operate on GrainId pairs rather than Message. |
| src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.cs | Changes pending buffer type to store recorded message metadata. |
| src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs | Records message addressing data instead of retaining Message objects. |
| src/Orleans.Runtime/Networking/SiloConnection.cs | Releases dropped/expired/rejected messages and marks ownership transfers. |
| src/Orleans.Runtime/Networking/GatewayInboundConnection.cs | Releases dropped/expired/rejected gateway messages. |
| src/Orleans.Runtime/Messaging/MessageCenter.cs | Releases blocked/expired outgoing messages and adjusts observer invocation ordering. |
| src/Orleans.Runtime/Messaging/Gateway.cs | Releases messages rejected due to client drop. |
| src/Orleans.Runtime/Core/InsideRuntimeClient.cs | Releases/marks responses in callback/no-callback/status-response paths. |
| src/Orleans.Runtime/Core/HostedClient.cs | Releases expired messages at dispatch. |
| src/Orleans.Runtime/Catalog/StatelessWorkerGrainContext.cs | Releases dropped messages when context creation fails. |
| src/Orleans.Runtime/Catalog/ActivationData.cs | Releases dropped messages and releases completed requests. |
| src/Orleans.Core/Runtime/OutsideRuntimeClient.cs | Releases/marks responses in callback/no-callback/status-response paths. |
| src/Orleans.Core/Runtime/InvokableObjectManager.cs | Releases messages dropped during observer dispatch/invocation/response. |
| src/Orleans.Core/Runtime/CallbackData.cs | Acquires request message while awaiting completion; releases on completion/timeout/cancel/fail. |
| src/Orleans.Core/Networking/Connection.cs | Releases send-pipeline references after flush and on send failures. |
| src/Orleans.Core/Messaging/MessageSerializer.cs | Deserializes into pooled Message instances. |
| src/Orleans.Core/Messaging/MessagePool.cs | Adds thread-local message pooling and optional DEBUG leak tracking. |
| src/Orleans.Core/Messaging/MessageFactory.cs | Creates request/response messages from the pool instead of allocating new ones. |
| src/Orleans.Core/Messaging/Message.cs | Adds ref-count ownership tracking, drop-release helper, and reset-on-return support. |
| src/Orleans.Core/Messaging/ClientMessageCenter.cs | Releases dropped/rejected client messages. |
Copilot's findings
Comments suppressed due to low confidence (1)
src/Orleans.Runtime/Networking/SiloConnection.cs:256
- In the expired-send path,
msg.ReleaseDropped(...)can return the message to the pool and reset it. The subsequentmsg.IsPing()check/logging can become a use-after-release (and may log incorrect data or race with reuse). Capture whether it’s a ping (and any info needed for logging) before releasing, and don’t touchmsgafter callingReleaseDropped.
if (msg.IsExpired)
{
this.MessagingTrace.OnDropExpiredMessage(msg, MessagingInstruments.Phase.Send);
msg.ReleaseDropped("ExpiredAtSend");
if (msg.IsPing())
{
LogWarningDroppingExpiredPingMessage(this.Log, msg);
}
return false;
- Files reviewed: 22/22 changed files
- Comments generated: 5
| msg.ReleaseDropped("RejectedObsoleteEpoch"); | ||
|
|
||
| LogDebugRejectingObsoleteRequest(this.Log, msg.TargetSilo?.ToString() ?? "null", this.LocalSiloAddress.ToString(), msg); |
There was a problem hiding this comment.
msg.ReleaseDropped("RejectedObsoleteEpoch") is called before LogDebugRejectingObsoleteRequest(..., msg). Since releasing can reset/return the message to the pool, logging afterwards is a use-after-release and may print incorrect data or race with reuse. Log before releasing (or log using captured values).
This issue also appears on line 246 of the same file.
| msg.ReleaseDropped("RejectedObsoleteEpoch"); | |
| LogDebugRejectingObsoleteRequest(this.Log, msg.TargetSilo?.ToString() ?? "null", this.LocalSiloAddress.ToString(), msg); | |
| LogDebugRejectingObsoleteRequest(this.Log, msg.TargetSilo?.ToString() ?? "null", this.LocalSiloAddress.ToString(), msg); | |
| msg.ReleaseDropped("RejectedObsoleteEpoch"); |
| @@ -382,6 +388,13 @@ private async Task ProcessOutgoing() | |||
| break; | |||
| } | |||
|
|
|||
| // Release the send pipeline's reference after bytes have been flushed. | |||
| foreach (var msg in inflight) | |||
| { | |||
| msg.MarkTransferred("Connection.ProcessOutgoing:Sent"); | |||
| msg.Release(); | |||
| } | |||
|
|
|||
| inflight.Clear(); | |||
| } | |||
There was a problem hiding this comment.
ProcessOutgoing breaks on flushResult.IsCompleted || flushResult.IsCanceled before releasing the messages still in inflight. With ref-counted pooled messages, that will leak references on connection shutdown/cancel. Ensure inflight messages are released/cleared on all exit paths (including completed/canceled flush and exceptions), e.g., release before break and/or in finally.
| // Ref count should never go negative - indicates a double release. | ||
| #if DEBUG | ||
| Debug.Fail($"Message ref count went negative. Last transfer tag: '{_lastTransferTag}'"); | ||
| #else | ||
| Debug.Fail("Message ref count went negative."); |
There was a problem hiding this comment.
Release() only calls Debug.Fail when the ref count goes negative. In non-DEBUG builds that check is effectively a no-op, so double-release/use-after-release can silently corrupt the pool (eg, decrementing a message which has been re-acquired by another owner). Consider throwing/fail-fast (or at least gating return-to-pool with an additional state/version check) when newRefCount < 0 to prevent silent corruption in production.
| // Ref count should never go negative - indicates a double release. | |
| #if DEBUG | |
| Debug.Fail($"Message ref count went negative. Last transfer tag: '{_lastTransferTag}'"); | |
| #else | |
| Debug.Fail("Message ref count went negative."); | |
| // Ref count should never go negative - indicates a double release or use-after-release. | |
| #if DEBUG | |
| throw new InvalidOperationException($"Message ref count went negative. Last transfer tag: '{_lastTransferTag}'"); | |
| #else | |
| throw new InvalidOperationException("Message ref count went negative."); |
| } | ||
|
|
||
| /// <summary> | ||
| /// Returns a message to the pool after resetting it. |
There was a problem hiding this comment.
The XML doc says Return "Returns a message to the pool after resetting it", but the implementation delegates to message.Release(), which only resets/returns when the ref count reaches 0. Consider adjusting the doc comment (or renaming) to reflect that Return is just a release of the caller’s reference and may not return to the pool if there are other owners.
| /// Returns a message to the pool after resetting it. | |
| /// Releases the caller's reference to a message. | |
| /// The message is reset and returned to the pool only when its reference count reaches zero. |
| [Fact, TestCategory("BVT"), TestCategory("Messaging")] | ||
| public void Message_RefCount_InitializedToOne() | ||
| { | ||
| var message = MessagePool.Get(); | ||
|
|
||
| Assert.NotNull(message); | ||
|
|
||
| message.Release(); | ||
| } |
There was a problem hiding this comment.
Message_RefCount_InitializedToOne doesn’t assert anything about the ref count (it only checks non-null). Either rename it to reflect what it validates, or add an assertion which actually verifies the ref-count initialization behavior (eg, via a public/internal observable effect).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Snapshot sampled message addressing before asynchronous processing so activation repartitioning does not observe reset pooled messages. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
3e263c6 to
11233d3
Compare
Summary
MessagePooland ref-counted message ownership APIs for safe message reuse.Messageinstances and resets messages when they return to the pool.Messageinstances, and adds focused message pool tests.Validation
git diff --checkdotnet build src\Orleans.Core\Orleans.Core.csproj -mdotnet build src\Orleans.Runtime\Orleans.Runtime.csproj -mdotnet test test\Orleans.Core.Tests\Orleans.Core.Tests.csproj --filter MessagePool(20 passed)Dependencies / notes
NonSilo.Tests.csprojwas not present on the current base, so message pool tests live underOrleans.Core.Tests.Microsoft Reviewers: Open in CodeFlow