Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal sealed partial class ActivationData :
private readonly WorkItemGroup _workItemGroup;
private readonly List<(Message Message, CoarseStopwatch QueuedTime)> _waitingRequests = new();
private readonly Dictionary<Message, CoarseStopwatch> _runningRequests = new();
private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true };
private readonly ActivationAutoResetEvent _workSignal;
private GrainLifecycle? _lifecycle;
private Queue<object>? _pendingOperations;
private Message? _blockingRequest;
Expand Down Expand Up @@ -101,6 +101,7 @@ public ActivationData(
Debug.Assert(_serviceScope != null, "_serviceScope must not be null.");
_workItemGroup = createWorkItemGroup(this);
Debug.Assert(_workItemGroup != null, "_workItemGroup must not be null.");
_workSignal = new(_workItemGroup);
}

internal void SetActivationActivity(Activity activity)
Expand Down
204 changes: 204 additions & 0 deletions src/Orleans.Runtime/Versions/ActivationAutoResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#nullable enable

using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime;

/// <summary>
/// Represents a synchronization event that, when signaled, resets automatically after releasing a single waiter.
/// This type supports concurrent signalers but only a single waiter.
/// </summary>
internal sealed class ActivationAutoResetEvent(WorkItemGroup scheduler) : IValueTaskSource
{
// Signaled indicates that the event has been signaled and not yet reset.
private const uint SignaledFlag = 1;

// Waiting indicates that a waiter is present and waiting for the event to be signaled.
private const uint WaitingFlag = 1 << 1;

// ResetMask is used to clear both status flags.
private const uint ResetMask = ~SignaledFlag & ~WaitingFlag;

private ActivationValueTaskSource _waitSource = new(scheduler);
private volatile uint _status;

ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _waitSource.GetStatus(token);

void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _waitSource.OnCompleted(continuation, state, token, flags);

void IValueTaskSource.GetResult(short token)
{
_waitSource.GetResult(token);
_waitSource.Reset();
ResetStatus();
}

/// <summary>
/// Signal the waiter.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Signal()
{
if ((_status & SignaledFlag) == SignaledFlag)
{
return;
}

var status = Interlocked.Or(ref _status, SignaledFlag);
if ((status & SignaledFlag) != SignaledFlag && (status & WaitingFlag) == WaitingFlag)
{
Debug.Assert((_status & (SignaledFlag | WaitingFlag)) == (SignaledFlag | WaitingFlag));
_waitSource.SetResult();
}
}

/// <summary>
/// Wait for the event to be signaled.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask WaitAsync()
{
var status = Interlocked.Or(ref _status, WaitingFlag);
if ((status & WaitingFlag) == WaitingFlag)
{
ThrowConcurrentWaitersNotSupported();
}

if ((status & SignaledFlag) == SignaledFlag)
{
ResetStatus();
return default;
}

return new(this, _waitSource.Version);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ResetStatus()
{
var status = Interlocked.And(ref _status, ResetMask);
Debug.Assert((status & (WaitingFlag | SignaledFlag)) == (WaitingFlag | SignaledFlag));
}

private static void ThrowConcurrentWaitersNotSupported() => throw new InvalidOperationException("Concurrent waiters are not supported");

[StructLayout(LayoutKind.Auto)]
private struct ActivationValueTaskSource
{
private static readonly Action<object?> Sentinel = CompletionSentinel;

private Action<object?>? _continuation;
private object? _continuationState;
private readonly WorkItemGroup _scheduler;
private short _version;
private bool _completed;

public ActivationValueTaskSource(WorkItemGroup scheduler) : this()
{
_scheduler = scheduler;
}

public readonly short Version => _version;

public ValueTaskSourceStatus GetStatus(short token)
{
ValidateToken(token);

// If completion wins the race but has not yet stored the sentinel, force OnCompleted to schedule the continuation.
return Volatile.Read(ref _continuation) is null || !_completed
? ValueTaskSourceStatus.Pending
: ValueTaskSourceStatus.Succeeded;
}

[StackTraceHidden]
public readonly void GetResult(short token)
{
if (token != _version || !_completed)
{
ThrowInvalidOperationException();
}
}

public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
ArgumentNullException.ThrowIfNull(continuation);
ValidateToken(token);

object? storedContinuation = _continuation;
if (storedContinuation is null)
{
_continuationState = state;
storedContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (storedContinuation is null)
{
return;
}
}

if (!ReferenceEquals(storedContinuation, Sentinel))
{
ThrowInvalidOperationException();
}

QueueContinuation(continuation, state);
}
Comment on lines +131 to +153

public void SetResult()
{
if (_completed)
{
ThrowInvalidOperationException();
}

_completed = true;

var continuation =
Volatile.Read(ref _continuation) ??
Interlocked.CompareExchange(ref _continuation, Sentinel, null);

if (continuation is not null)
{
QueueContinuation(continuation, _continuationState);
}
}

public void Reset()
{
_version++;
_continuation = null;
_continuationState = null;
_completed = false;
}

private readonly void QueueContinuation(Action<object?> continuation, object? state)
{
_scheduler.QueueAction(continuation, state!);
}

private readonly void ValidateToken(short token)
{
if (token != _version)
{
ThrowInvalidOperationException();
}
}

private static void CompletionSentinel(object? _)
{
Debug.Fail("The sentinel delegate should never be invoked.");
throw new InvalidOperationException("The sentinel delegate should never be invoked.");
}

[DoesNotReturn, StackTraceHidden]
private static void ThrowInvalidOperationException() => throw new InvalidOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#nullable enable

using Microsoft.Extensions.Logging.Abstractions;
using Orleans.Runtime;
using UnitTests.TesterInternal;
using Xunit;

namespace UnitTests.SchedulerTests;

[TestCategory("BVT"), TestCategory("Scheduler")]
public class ActivationAutoResetEventTests
{
[Fact]
public async Task SignalBeforeWaitCompletesSynchronously()
{
using var context = UnitTestSchedulingContext.Create(NullLoggerFactory.Instance);
var signal = new ActivationAutoResetEvent(context.WorkItemGroup);

signal.Signal();
var wait = signal.WaitAsync();

Assert.True(wait.IsCompletedSuccessfully);
await wait;
}

[Fact]
public async Task WaitBeforeSignalCompletesAfterSignal()
{
using var context = UnitTestSchedulingContext.Create(NullLoggerFactory.Instance);
var signal = new ActivationAutoResetEvent(context.WorkItemGroup);

var wait = signal.WaitAsync();
Assert.False(wait.IsCompleted);

signal.Signal();

await wait.AsTask().WaitAsync(TimeSpan.FromSeconds(5));
}

[Fact]
public async Task SignalCanBeReusedAcrossRepeatedCycles()
{
using var context = UnitTestSchedulingContext.Create(NullLoggerFactory.Instance);
var signal = new ActivationAutoResetEvent(context.WorkItemGroup);

signal.Signal();
await signal.WaitAsync();

var wait = signal.WaitAsync();
Assert.False(wait.IsCompleted);

signal.Signal();
await wait.AsTask().WaitAsync(TimeSpan.FromSeconds(5));

signal.Signal();
await signal.WaitAsync();
}

[Fact]
public async Task ConcurrentWaitersAreRejected()
{
using var context = UnitTestSchedulingContext.Create(NullLoggerFactory.Instance);
var signal = new ActivationAutoResetEvent(context.WorkItemGroup);

var wait = signal.WaitAsync();
Assert.False(wait.IsCompleted);
Assert.Throws<InvalidOperationException>(() => signal.WaitAsync());

signal.Signal();
await wait.AsTask().WaitAsync(TimeSpan.FromSeconds(5));
}

[Fact]
public async Task ContinuationRunsOnScheduler()
{
using var context = UnitTestSchedulingContext.Create(NullLoggerFactory.Instance);
var signal = new ActivationAutoResetEvent(context.WorkItemGroup);
var observedContext = new TaskCompletionSource<IGrainContext?>(TaskCreationOptions.RunContinuationsAsynchronously);

var wait = WaitAndCaptureContext(signal, observedContext);
signal.Signal();

Assert.Same(context, await observedContext.Task.WaitAsync(TimeSpan.FromSeconds(5)));
await wait.WaitAsync(TimeSpan.FromSeconds(5));
}

[Fact]
public async Task WaitAndSignalRaceCompletesRepeatedly()
{
using var context = UnitTestSchedulingContext.Create(NullLoggerFactory.Instance);
var signal = new ActivationAutoResetEvent(context.WorkItemGroup);

for (var i = 0; i < 1_000; i++)
{
var wait = signal.WaitAsync();
_ = Task.Run(signal.Signal);
await wait.AsTask().WaitAsync(TimeSpan.FromSeconds(5));
}
}

private static async Task WaitAndCaptureContext(ActivationAutoResetEvent signal, TaskCompletionSource<IGrainContext?> observedContext)
{
await signal.WaitAsync();
observedContext.SetResult(RuntimeContext.Current);
}
}
Loading