Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 245 additions & 25 deletions src/Orleans.Core/Runtime/CallbackData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,122 @@ namespace Orleans.Runtime
{
internal sealed partial class CallbackData
{
private readonly SharedCallbackData shared;
private readonly IResponseCompletionSource context;
private readonly ApplicationRequestInstruments _applicationRequestInstruments;
private SharedCallbackData shared = null!;
private IResponseCompletionSource context = null!;
private ApplicationRequestInstruments _applicationRequestInstruments = null!;
private int completed;
private StatusResponse? lastKnownStatus;
private ValueStopwatch stopwatch;
private CancellationTokenRegistration _cancellationTokenRegistration;
private CorrelationId _correlationId;
private int _refCount;

/// <summary>
/// Parameterless constructor for object pooling.
/// </summary>
internal CallbackData()
{
}

public CallbackData(
SharedCallbackData shared,
IResponseCompletionSource ctx,
Message msg,
ApplicationRequestInstruments applicationRequestInstruments)
{
Initialize(shared, ctx, msg, applicationRequestInstruments);
}

/// <summary>
/// Initializes the callback data for use. Called after retrieving from the pool.
/// Does NOT set ref count - that is done by <see cref="CallbackDataOwner"/> constructor.
/// </summary>
public void Initialize(SharedCallbackData shared, IResponseCompletionSource ctx, Message msg, ApplicationRequestInstruments applicationRequestInstruments)
{
Debug.Assert(_refCount == 0, "CallbackData ref count should be 0 before initialization");
this.shared = shared;
this.context = ctx;
this.Message = msg;
_applicationRequestInstruments = applicationRequestInstruments;
this._correlationId = msg.Id;
this.stopwatch = ValueStopwatch.StartNew();
}

public Message Message { get; } // might hold metadata used by response pipeline
/// <summary>
/// Resets the callback data for return to the pool.
/// </summary>
internal void Reset()
{
Debug.Assert(_refCount == 0, "CallbackData ref count should be 0 before reset");
shared = null!;
context = null!;
_applicationRequestInstruments = null!;
completed = 0;
lastKnownStatus = null;
stopwatch = default;
_cancellationTokenRegistration.Dispose();
_cancellationTokenRegistration = default;
_correlationId = default;
Message = null!;
}

/// <summary>
/// Acquires the initial owner reference. Must only be called once after initialization.
/// </summary>
/// <returns>The previous ref count (should be 0).</returns>
internal int AcquireOwnerReference()
{
return Interlocked.Increment(ref _refCount) - 1;
}

/// <summary>
/// Attempts to acquire a borrowed lease on this callback, incrementing the ref count.
/// Returns true only if the ref count is positive (object is still owned).
/// </summary>
/// <returns>True if the lease was acquired, false if the object is being/has been returned to pool.</returns>
internal bool TryAcquireLease()
{
// Spin until we either successfully increment or detect ref count is 0
while (true)
{
var currentRefCount = Volatile.Read(ref _refCount);

// If ref count is 0 or negative, the object is being returned to pool or already pooled
if (currentRefCount <= 0)
{
return false;
}

// Try to increment the ref count
if (Interlocked.CompareExchange(ref _refCount, currentRefCount + 1, currentRefCount) == currentRefCount)
{
return true;
}

// CAS failed, spin and retry
}
}

/// <summary>
/// Releases a lease on this callback, decrementing the ref count.
/// If the ref count reaches zero, returns the callback to the pool.
/// </summary>
internal void ReleaseLease()
{
var newRefCount = Interlocked.Decrement(ref _refCount);
if (newRefCount == 0)
{
CallbackDataPool.ReturnCore(this);
}
else if (newRefCount < 0)
{
// This should never happen - indicates a bug
Debug.Fail("CallbackData ref count went negative");
throw new InvalidOperationException("CallbackData ref count went negative - indicates a double release bug");
}
}

public Message Message { get; private set; } = null!; // might hold metadata used by response pipeline

public bool IsCompleted => this.completed == 1;

Expand Down Expand Up @@ -59,8 +153,16 @@ private void SignalCancellation()
}
}

public void OnStatusUpdate(StatusResponse status)
public void OnStatusUpdate(CorrelationId messageId, StatusResponse status)
{
// Validate that the status update is for this callback instance.
// This is necessary because the callback may have been returned to the pool
// and reused for a different message between TryGetValue and OnStatusUpdate.
if (_correlationId != messageId)
{
return;
}

this.lastKnownStatus = status;
}

Expand Down Expand Up @@ -102,12 +204,22 @@ private void OnCancellation()

stopwatch.Stop();
SignalCancellation();
shared.Unregister(Message);
_applicationRequestInstruments.OnAppRequestsEnd((long)stopwatch.Elapsed.TotalMilliseconds);
_applicationRequestInstruments.OnAppRequestsTimedOut();
OrleansCallBackDataEvent.Instance.OnCanceled(Message);
context.Complete(Response.FromException(new OperationCanceledException(_cancellationTokenRegistration.Token)));

// Capture locals before Unregister, which may return this to the pool
var elapsedMs = (long)stopwatch.Elapsed.TotalMilliseconds;
var msg = Message;
var ctx = context;
var instruments = _applicationRequestInstruments;
var cancellationToken = _cancellationTokenRegistration.Token;
_cancellationTokenRegistration.Dispose();

// Unregister last - this may return the callback to the pool
shared.Unregister(msg);

instruments.OnAppRequestsEnd(elapsedMs);
instruments.OnAppRequestsCanceled();
OrleansCallBackDataEvent.Instance.OnCanceled(msg);
ctx.Complete(Response.FromException(new OperationCanceledException(cancellationToken)));
}

public void OnTimeout()
Expand All @@ -123,21 +235,28 @@ public void OnTimeout()
SignalCancellation();
}

this.shared.Unregister(this.Message);
// Capture locals before Unregister, which may return this to the pool
_cancellationTokenRegistration.Dispose();
_applicationRequestInstruments.OnAppRequestsEnd((long)this.stopwatch.Elapsed.TotalMilliseconds);
_applicationRequestInstruments.OnAppRequestsTimedOut();
var elapsedMs = (long)this.stopwatch.Elapsed.TotalMilliseconds;
var msg = this.Message;
var statusMessage = lastKnownStatus is StatusResponse status ? $"Last known status is {status}. " : string.Empty;
var timeout = GetResponseTimeout();
var logger = this.shared.Logger;
var ctx = this.context;
var instruments = _applicationRequestInstruments;

OrleansCallBackDataEvent.Instance.OnTimeout(this.Message);
// Unregister last - this may return the callback to the pool
this.shared.Unregister(msg);

var msg = this.Message; // Local working copy
instruments.OnAppRequestsEnd(elapsedMs);
instruments.OnAppRequestsTimedOut();

var statusMessage = lastKnownStatus is StatusResponse status ? $"Last known status is {status}. " : string.Empty;
var timeout = GetResponseTimeout();
LogTimeout(this.shared.Logger, timeout, msg, statusMessage);
OrleansCallBackDataEvent.Instance.OnTimeout(msg);

LogTimeout(logger, timeout, msg, statusMessage);

var exception = new TimeoutException($"Response did not arrive on time in {timeout} for message: {msg}. {statusMessage}");
context.Complete(Response.FromException(exception));
ctx.Complete(Response.FromException(exception));
}

public void OnTargetSiloFail()
Expand All @@ -148,16 +267,24 @@ public void OnTargetSiloFail()
}

this.stopwatch.Stop();
this.shared.Unregister(this.Message);
_cancellationTokenRegistration.Dispose();
_applicationRequestInstruments.OnAppRequestsEnd((long)this.stopwatch.Elapsed.TotalMilliseconds);

OrleansCallBackDataEvent.Instance.OnTargetSiloFail(this.Message);
// Capture locals before Unregister, which may return this to the pool
_cancellationTokenRegistration.Dispose();
var elapsedMs = (long)this.stopwatch.Elapsed.TotalMilliseconds;
var msg = this.Message;
var statusMessage = lastKnownStatus is StatusResponse status ? $"Last known status is {status}. " : string.Empty;
LogTargetSiloFail(this.shared.Logger, msg, statusMessage, Constants.TroubleshootingHelpLink);
var logger = this.shared.Logger;
var ctx = this.context;
var instruments = _applicationRequestInstruments;

// Unregister last - this may return the callback to the pool
this.shared.Unregister(msg);
instruments.OnAppRequestsEnd(elapsedMs);

OrleansCallBackDataEvent.Instance.OnTargetSiloFail(msg);
LogTargetSiloFail(logger, msg, statusMessage, Constants.TroubleshootingHelpLink);
var exception = new SiloUnavailableException($"The target silo became unavailable for message: {msg}. {statusMessage}See {Constants.TroubleshootingHelpLink} for troubleshooting help.");
this.context.Complete(Response.FromException(exception));
ctx.Complete(Response.FromException(exception));
}

public void DoCallback(Message response)
Expand Down Expand Up @@ -227,4 +354,97 @@ static void HandleRejectionResponse(IResponseCompletionSource context, Rejection
)]
private static partial void LogTargetSiloFail(ILogger logger, Message message, string statusMessage, string troubleshootingHelpLink);
}

/// <summary>
/// Owns a pooled <see cref="CallbackData"/> instance and manages its lifecycle.
/// This represents the "owner" - the code that creates this and stores it in the callbacks dictionary.
/// When the owner is done with the callback, it should call <see cref="Release"/> to return it to the pool.
/// The owner increments the ref count to 1 on construction.
/// </summary>
internal readonly struct CallbackDataOwner
{
/// <summary>
/// The callback data instance.
/// </summary>
public CallbackData Callback { get; }

public CallbackDataOwner(CallbackData callback)
{
Debug.Assert(callback is not null, "CallbackDataOwner requires a non-null callback");
Callback = callback;
var previousRefCount = callback.AcquireOwnerReference();
Debug.Assert(previousRefCount == 0, $"CallbackData ref count should have been 0 when creating owner, but was {previousRefCount}");
}

/// <summary>
/// Attempts to acquire a borrowed lease on the callback.
/// The returned lease MUST be disposed when done to release the reference count.
/// Use <see cref="CallbackDataLease.TryGetValue"/> to check if the lease is valid and get the callback.
/// </summary>
/// <returns>A lease that must be disposed. Check <see cref="CallbackDataLease.TryGetValue"/> to see if it's valid.</returns>
public CallbackDataLease Acquire()
{
Debug.Assert(Callback is not null, "CallbackDataOwner.Acquire called on default struct");
if (Callback.TryAcquireLease())
{
return new CallbackDataLease(Callback);
}

return default;
}

/// <summary>
/// Releases the owner's reference to the callback, potentially returning it to the pool.
/// This should be called when the callback is removed from the dictionary and is no longer needed.
/// </summary>
public void Release()
{
Debug.Assert(Callback is not null, "CallbackDataOwner.Release called on default struct");
Callback.ReleaseLease();
}
}

/// <summary>
/// A borrowed lease on a <see cref="CallbackData"/> instance.
/// This is a ref struct to ensure it cannot escape the current scope without being disposed.
/// Disposing the lease releases the reference count, potentially allowing the callback to be returned to the pool.
/// </summary>
internal ref struct CallbackDataLease
{
private CallbackData? _callback;

internal CallbackDataLease(CallbackData callback)
{
_callback = callback;
}

/// <summary>
/// Gets whether this lease is valid (successfully acquired a reference).
/// </summary>
public readonly bool IsValid => _callback is not null;

/// <summary>
/// Attempts to get the callback data if the lease is valid.
/// </summary>
/// <param name="callback">The callback data if valid, otherwise null.</param>
/// <returns>True if the lease is valid and the callback was returned.</returns>
public readonly bool TryGetValue([System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out CallbackData? callback)
{
callback = _callback;
return callback is not null;
}

/// <summary>
/// Releases the lease, decrementing the reference count.
/// If this was the last reference, the callback will be returned to the pool.
/// </summary>
public void Dispose()
{
if (_callback is { } callback)
{
_callback = null;
callback.ReleaseLease();
}
}
}
}
59 changes: 59 additions & 0 deletions src/Orleans.Core/Runtime/CallbackDataPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#nullable enable
using System.Collections.Generic;
using System.Threading;

namespace Orleans.Runtime
{
/// <summary>
/// A thread-local object pool for <see cref="CallbackData"/> instances.
/// </summary>
internal static class CallbackDataPool
{
private static readonly ThreadLocal<Stack<CallbackData>> _callbacks = new(() => new());

/// <summary>
/// The maximum number of callbacks to keep per thread.
/// </summary>
public static int MaxPoolSizePerThread { get; set; } = 128;

/// <summary>
/// Gets a callback from the pool, or creates a new one if the pool is empty.
/// </summary>
public static CallbackData Get()
{
var stack = _callbacks.Value!;
if (stack.TryPop(out var callback))
{
return callback;
}

return new CallbackData();
}

/// <summary>
/// Returns a callback to the pool via the reference counting system.
/// This decrements the owner's reference count. The callback will be returned
/// to the pool when all leases have been released.
/// </summary>
/// <param name="owner">The owner of the callback.</param>
public static void Return(CallbackDataOwner owner)
{
owner.Release();
}

/// <summary>
/// Internal method called by <see cref="CallbackData.ReleaseLease"/> when ref count reaches zero.
/// Actually returns the callback to the pool after resetting it.
/// </summary>
internal static void ReturnCore(CallbackData callback)
{
callback.Reset();

var stack = _callbacks.Value!;
if (stack.Count < MaxPoolSizePerThread)
{
stack.Push(callback);
}
}
Comment on lines +10 to +57
}
}
Loading
Loading