Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,15 @@ public void RejectMessage(Message msg, string reason, Exception exc = null)
if (msg.Direction != Message.Directions.Request)
{
LogDroppingMessage(msg, reason);
msg.ReleaseDropped("DroppedNonRequest");
}
else
{
LogRejectingMessage(msg, reason);
MessagingInstruments.OnRejectedMessage(msg);
var error = this.messageFactory.CreateRejectionResponse(msg, Message.RejectionTypes.Unrecoverable, reason, exc);
DispatchLocalMessage(error);
msg.ReleaseDropped("RejectedRequest");
}
}

Expand Down
105 changes: 103 additions & 2 deletions src/Orleans.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand All @@ -15,6 +16,14 @@ internal sealed class Message : ISpanFormattable
[NonSerialized]
private short _retryCount;

[NonSerialized]
private int _refCount;

#if DEBUG
[NonSerialized]
private string? _lastTransferTag;
#endif

public CoarseStopwatch _timeToExpiry;

public object? BodyObject { get; set; }
Expand Down Expand Up @@ -303,6 +312,73 @@ internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAdd
}
}

/// <summary>
/// Sets the reference count to one, representing the single owner that has just obtained this message,
/// and (in DEBUG builds) clears the last recorded transfer tag.
/// </summary>
/// <remarks>
/// Every additional owner must pair its own <see cref="Acquire"/> call with a <see cref="Release"/> call.
/// </remarks>
internal void InitializeRefCount()
{
#if DEBUG
    _lastTransferTag = null;
#endif

    // The caller that checked the message out of the pool holds the first reference.
    _refCount = 1;
}

/// <summary>
/// Registers an additional owner of this message by atomically incrementing its reference count.
/// </summary>
/// <remarks>
/// In DEBUG builds, asserts that the count after the increment is greater than one, i.e. that the
/// message already had an owner when this method was called.
/// </remarks>
internal void Acquire()
{
    // The increment must stay outside Debug.Assert: assert arguments are not evaluated in release builds.
    var updatedCount = Interlocked.Increment(ref _refCount);
    Debug.Assert(updatedCount > 1);
}

internal void Release()
{
var newRefCount = Interlocked.Decrement(ref _refCount);
if (newRefCount == 0)
{
MessagePool.ReturnCore(this);
}
else if (newRefCount < 0)
{
// Ref count should never go negative - indicates a double release.
#if DEBUG
Debug.Fail($"Message ref count went negative. Last transfer tag: '{_lastTransferTag}'");
#else
Debug.Fail("Message ref count went negative.");
Comment on lines +340 to +344

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release() only calls Debug.Fail when the ref count goes negative. In non-DEBUG builds that check is effectively a no-op, so a double release or use-after-release can silently corrupt the pool (e.g., by decrementing a message that has been re-acquired by another owner). Consider throwing/failing fast (or at least gating return-to-pool with an additional state/version check) when newRefCount < 0 to prevent silent corruption in production.

Suggested change
// Ref count should never go negative - indicates a double release.
#if DEBUG
Debug.Fail($"Message ref count went negative. Last transfer tag: '{_lastTransferTag}'");
#else
Debug.Fail("Message ref count went negative.");
// Ref count should never go negative - indicates a double release or use-after-release.
#if DEBUG
throw new InvalidOperationException($"Message ref count went negative. Last transfer tag: '{_lastTransferTag}'");
#else
throw new InvalidOperationException("Message ref count went negative.");

Copilot uses AI. Check for mistakes.
#endif
}
}

/// <summary>
/// Records <paramref name="tag"/> as the most recent ownership transfer of this message, for diagnostics.
/// </summary>
/// <remarks>
/// The method is marked <c>[Conditional("DEBUG")]</c>, so call sites (including evaluation of their
/// arguments) are omitted by the compiler unless DEBUG is defined.
/// </remarks>
/// <param name="tag">A short label identifying where the message was handed off.</param>
[Conditional("DEBUG")]
internal void MarkTransferred(string tag)
{
#if DEBUG
    this._lastTransferTag = tag;
#endif
}

/// <summary>
/// Gives up the caller's reference to a message that is being dropped instead of delivered
/// (for example because it expired, was rejected, or was blocked).
/// </summary>
/// <remarks>
/// The drop reason is recorded as the last transfer tag (DEBUG builds only) before the reference is released.
/// </remarks>
/// <param name="reason">A short description of why the message was dropped.</param>
internal void ReleaseDropped(string reason)
{
    // MarkTransferred is [Conditional("DEBUG")]; keeping the tag expression inside the call means
    // the string is only built when DEBUG is defined.
    MarkTransferred("Dropped:" + reason);
    Release();
}

/// <summary>
/// Debug-only guard that asserts this message still has at least one owner (reference count above zero).
/// </summary>
/// <remarks>
/// Calls to this method are removed by the compiler unless DEBUG is defined.
/// </remarks>
/// <param name="caller">Name of the calling member; filled in by the compiler when omitted.</param>
[Conditional("DEBUG")]
internal void AssertNotReleased([CallerMemberName] string? caller = null)
{
#if DEBUG
    var observedCount = Volatile.Read(ref _refCount);
    Debug.Assert(
        observedCount > 0,
        $"Message used after release. Caller: {caller}, RefCount: {observedCount}, LastTransfer: {_lastTransferTag}");
#endif
}

// NOTE(review): this looks self-recursive but presumably is not. Message implements ISpanFormattable,
// and the interpolated-string handler is expected to format such values through TryFormat rather than
// by calling ToString(). Confirm before rewriting this expression.
public override string ToString() => $"{this}";

/// <summary>
/// Explicit <see cref="IFormattable"/> implementation. Both arguments are ignored and the result of
/// the parameterless <c>ToString()</c> is returned.
/// </summary>
/// <param name="format">Ignored.</param>
/// <param name="formatProvider">Ignored.</param>
string IFormattable.ToString(string? format, IFormatProvider? formatProvider) => ToString();
Expand Down Expand Up @@ -371,6 +447,31 @@ static bool Append(ref Span<char> dst, ReadOnlySpan<char> value)

/// <summary>
/// Determines whether the request context data contains an entry keyed by
/// <see cref="RequestContext.PING_APPLICATION_HEADER"/> whose value is the boolean <c>true</c>.
/// </summary>
/// <returns>
/// <c>true</c> only when the entry exists and its value is a <see cref="bool"/> equal to <c>true</c>;
/// otherwise <c>false</c>, including when there is no request context data at all.
/// </returns>
internal bool IsPing() => _requestContextData?.TryGetValue(RequestContext.PING_APPLICATION_HEADER, out var value) == true && value is bool isPing && isPing;

/// <summary>
/// Clears the state assigned below back to default values so that this instance can be reused.
/// </summary>
internal void Reset()
{
    // Ownership bookkeeping.
    _refCount = 0;
#if DEBUG
    _lastTransferTag = null;
#endif

    // Id, headers, body, and per-request data.
    _id = default;
    _headers = default;
    BodyObject = null;
    _requestContextData = null;
    _cacheInvalidationHeader = null;

    // Retry and expiry state.
    _retryCount = 0;
    _timeToExpiry = default;

    // Sender and target.
    _sendingSilo = null;
    _sendingGrain = default;
    _targetSilo = null;
    _targetGrain = default;

    // Interface type and version.
    _interfaceType = default;
    _interfaceVersion = 0;
}

[Flags]
internal enum MessageFlags : ushort
{
Expand All @@ -386,10 +487,10 @@ internal enum MessageFlags : ushort
HasTimeToLive = 1 << 8,

// Message cannot be forwarded to another activation.
IsLocalOnly = 1 << 9,
IsLocalOnly = 1 << 9,

// Message must not trigger grain activation or extend an activation's lifetime.
SuppressKeepAlive = 1 << 10,
SuppressKeepAlive = 1 << 10,

// The most significant bit is reserved, possibly for use to indicate more data follows.
Reserved = 1 << 15,
Expand Down
46 changes: 21 additions & 25 deletions src/Orleans.Core/Messaging/MessageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,14 @@ public MessageFactory(DeepCopier deepCopier, ILogger<MessageFactory> logger, Mes

public Message CreateMessage(object body, InvokeMethodOptions options)
{
var message = new Message
{
Direction = (options & InvokeMethodOptions.OneWay) != 0 ? Message.Directions.OneWay : Message.Directions.Request,
Id = GetNextCorrelationId(),
IsReadOnly = (options & InvokeMethodOptions.ReadOnly) != 0,
IsUnordered = (options & InvokeMethodOptions.Unordered) != 0,
IsAlwaysInterleave = (options & InvokeMethodOptions.AlwaysInterleave) != 0,
BodyObject = body,
RequestContextData = RequestContextExtensions.Export(_deepCopier),
};
var message = MessagePool.Get();
message.Direction = (options & InvokeMethodOptions.OneWay) != 0 ? Message.Directions.OneWay : Message.Directions.Request;
message.Id = GetNextCorrelationId();
message.IsReadOnly = (options & InvokeMethodOptions.ReadOnly) != 0;
message.IsUnordered = (options & InvokeMethodOptions.Unordered) != 0;
message.IsAlwaysInterleave = (options & InvokeMethodOptions.AlwaysInterleave) != 0;
message.BodyObject = body;
message.RequestContextData = RequestContextExtensions.Export(_deepCopier);

_messagingTrace.OnCreateMessage(message);
return message;
Expand All @@ -55,21 +53,19 @@ private CorrelationId GetNextCorrelationId()

public Message CreateResponseMessage(Message request)
{
var response = new Message
{
IsSystemMessage = request.IsSystemMessage,
Direction = Message.Directions.Response,
Id = request.Id,
IsReadOnly = request.IsReadOnly,
IsAlwaysInterleave = request.IsAlwaysInterleave,
TargetSilo = request.SendingSilo,
TargetGrain = request.SendingGrain,
SendingSilo = request.TargetSilo,
SendingGrain = request.TargetGrain,
CacheInvalidationHeader = request.CacheInvalidationHeader,
TimeToLive = request.TimeToLive,
RequestContextData = RequestContextExtensions.Export(_deepCopier),
};
var response = MessagePool.Get();
response.IsSystemMessage = request.IsSystemMessage;
response.Direction = Message.Directions.Response;
response.Id = request.Id;
response.IsReadOnly = request.IsReadOnly;
response.IsAlwaysInterleave = request.IsAlwaysInterleave;
response.TargetSilo = request.SendingSilo;
response.TargetGrain = request.SendingGrain;
response.SendingSilo = request.TargetSilo;
response.SendingGrain = request.TargetGrain;
response.CacheInvalidationHeader = request.CacheInvalidationHeader;
response.TimeToLive = request.TimeToLive;
response.RequestContextData = RequestContextExtensions.Export(_deepCopier);

_messagingTrace.OnCreateMessage(response);
return response;
Expand Down
120 changes: 120 additions & 0 deletions src/Orleans.Core/Messaging/MessagePool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Orleans.Runtime
{
/// <summary>
/// A thread-local object pool for <see cref="Message"/> instances.
/// </summary>
internal static class MessagePool
{
private static readonly ThreadLocal<Stack<Message>> _messages = new(() => new());

#if DEBUG
/// <summary>
/// Tracks all messages that have been allocated but not returned to the pool.
/// Only available in DEBUG builds. Must be enabled via <see cref="EnableLeakTracking"/>.
/// </summary>
private static readonly ConcurrentDictionary<Message, MessageAllocationInfo> _outstandingMessages = new();

/// <summary>
/// When true, tracks all message allocations for leak detection.
/// Only available in DEBUG builds.
/// </summary>
public static bool EnableLeakTracking { get; set; }

/// <summary>
/// Gets a snapshot of the allocation records for messages that were handed out while
/// <see cref="EnableLeakTracking"/> was true and whose records have not since been removed.
/// Only available in DEBUG builds.
/// </summary>
/// <returns>
/// A newly allocated array containing the currently tracked allocation records.
/// </returns>
public static IReadOnlyCollection<MessageAllocationInfo> GetOutstandingMessages()
{
    // Enumerable.ToArray is invoked by its fully qualified name because this file has no
    // 'using System.Linq;' directive; the extension-method form only resolves if a global
    // using happens to supply that namespace.
    return System.Linq.Enumerable.ToArray(_outstandingMessages.Values);
}

/// <summary>
/// Discards every recorded allocation from the leak-tracking table.
/// Intended to be called at the start of a test.
/// </summary>
public static void ClearLeakTracking() => _outstandingMessages.Clear();

/// <summary>
/// Describes one tracked message allocation: the message itself, the stack trace text supplied
/// by the creator of this record, and the UTC time at which the record was constructed.
/// </summary>
public sealed class MessageAllocationInfo
{
    /// <summary>
    /// Creates a record for <paramref name="message"/> and stamps it with the current UTC time.
    /// </summary>
    /// <param name="message">The message being tracked.</param>
    /// <param name="allocationStack">Stack trace text to associate with the allocation.</param>
    public MessageAllocationInfo(Message message, string allocationStack)
    {
        AllocationTime = DateTime.UtcNow;
        AllocationStack = allocationStack;
        Message = message;
    }

    /// <summary>The tracked message.</summary>
    public Message Message { get; }

    /// <summary>The stack trace text passed to the constructor.</summary>
    public string AllocationStack { get; }

    /// <summary>The UTC time at which this record was constructed.</summary>
    public DateTime AllocationTime { get; }

    /// <summary>
    /// Formats the allocation time, the message's direction and id, and the allocation stack.
    /// </summary>
    public override string ToString()
    {
        return $"Message allocated at {AllocationTime:HH:mm:ss.fff}, Direction={Message.Direction}, Id={Message.Id}\nStack:\n{AllocationStack}";
    }
}
#endif

/// <summary>
/// The maximum number of messages to keep per thread.
/// </summary>
public static int MaxPoolSizePerThread { get; set; } = 128;

/// <summary>
/// Obtains a message from the calling thread's pool, allocating a new instance when that pool is empty.
/// </summary>
/// <remarks>
/// The returned message has its reference count initialized for a single owner. In DEBUG builds,
/// when <c>EnableLeakTracking</c> is set, the allocation is also recorded together with the current stack trace.
/// </remarks>
/// <returns>A message owned by the caller.</returns>
public static Message Get()
{
    var pool = _messages.Value!;
    var message = pool.TryPop(out var recycled) ? recycled : new Message();
    message.InitializeRefCount();

#if DEBUG
    if (EnableLeakTracking)
    {
        _outstandingMessages[message] = new MessageAllocationInfo(message, Environment.StackTrace);
    }
#endif

    return message;
}

/// <summary>
/// Releases the caller's reference to a message. The message is reset and offered back to the pool only once its reference count reaches zero.

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML doc says Return "Returns a message to the pool after resetting it", but the implementation delegates to message.Release(), which only resets/returns when the ref count reaches 0. Consider adjusting the doc comment (or renaming) to reflect that Return is just a release of the caller’s reference and may not return to the pool if there are other owners.

Suggested change
/// Returns a message to the pool after resetting it.
/// Releases the caller's reference to a message.
/// The message is reset and returned to the pool only when its reference count reaches zero.

Copilot uses AI. Check for mistakes.
/// </summary>
public static void Return(Message message) => message.Release();

/// <summary>
/// Resets <paramref name="message"/> and pushes it onto the calling thread's pool, unless that pool
/// already holds <see cref="MaxPoolSizePerThread"/> messages, in which case the message is not pooled.
/// </summary>
/// <remarks>
/// In DEBUG builds, when <c>EnableLeakTracking</c> is set, the message's allocation record is removed first.
/// </remarks>
/// <param name="message">The message to reset and pool.</param>
internal static void ReturnCore(Message message)
{
#if DEBUG
    if (EnableLeakTracking)
    {
        _outstandingMessages.TryRemove(message, out _);
    }
#endif

    message.Reset();

    var pool = _messages.Value!;
    if (pool.Count >= MaxPoolSizePerThread)
    {
        // This thread's pool is at capacity, so the reset message is simply not retained.
        return;
    }

    pool.Push(message);
}
}
}
3 changes: 2 additions & 1 deletion src/Orleans.Core/Messaging/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public MessageSerializer(
var body = input.Slice(bodyOffset, bodyLength);

// Build message
message = new();
message = MessagePool.Get();

if (header.IsSingleSegment)
{
var headersReader = Reader.Create(header.First.Span, _deserializationSession);
Expand Down
17 changes: 17 additions & 0 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ private async Task ProcessOutgoing()
{
throw;
}

if (message is not null)
{
inflight.Remove(message);
message = null;
}
}

var flushResult = await output.FlushAsync();
Expand All @@ -382,6 +388,13 @@ private async Task ProcessOutgoing()
break;
}

// Release the send pipeline's reference after bytes have been flushed.
foreach (var msg in inflight)
{
msg.MarkTransferred("Connection.ProcessOutgoing:Sent");
msg.Release();
}

inflight.Clear();
}
Comment on lines 385 to 399

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessOutgoing breaks on flushResult.IsCompleted || flushResult.IsCanceled before releasing the messages still in inflight. With ref-counted pooled messages, that will leak references on connection shutdown/cancel. Ensure inflight messages are released/cleared on all exit paths (including completed/canceled flush and exceptions), e.g., release before break and/or in finally.

Copilot uses AI. Check for mistakes.
}
Expand Down Expand Up @@ -490,6 +503,8 @@ private bool HandleSendMessageFailure(Message message, Exception exception)
response.BodyObject = Response.FromException(exception);

this.MessageCenter.DispatchLocalMessage(response);
message.MarkTransferred("Connection.HandleSendMessageFailure:RequestFailed");
message.Release();
}
else if (message.Direction == Message.Directions.Response && message.RetryCount < MessagingOptions.DEFAULT_MAX_MESSAGE_SEND_RETRIES)
{
Expand All @@ -509,6 +524,8 @@ private bool HandleSendMessageFailure(Message message, Exception exception)
message);

MessagingInstruments.OnDroppedSentMessage(message);
message.MarkTransferred("Connection.HandleSendMessageFailure:Dropped");
message.Release();
}

return true;
Expand Down
Loading
Loading