Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Orleans.Core.Abstractions/Core/IGrainContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public interface IWorkItemScheduler
/// </summary>
/// <param name="action">The work item.</param>
/// <param name="state">The state passed when invoking the item.</param>
void QueueAction(Action<object> action, object state);
void QueueAction(Action<object?> action, object? state);
Comment on lines 204 to +207
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
Expand Down Expand Up @@ -61,7 +62,7 @@ private partial class ActivationDataActivator : IGrainContextActivator
private readonly IServiceProvider _serviceProvider;
private readonly GrainTypeSharedContext _sharedComponents;
private readonly Func<IGrainContext, WorkItemGroup> _createWorkItemGroup;
private readonly Action<object?> _startActivation;
private readonly SendOrPostCallback _startActivation;

public ActivationDataActivator(
IGrainActivator grainActivator,
Expand All @@ -82,7 +83,7 @@ public ActivationDataActivator(
_workItemGroupLogger,
_activationTaskSchedulerLogger,
_schedulingOptions);
_startActivation = state => ((ActivationData)state!).Start(_grainActivator);
_startActivation = state => ((ActivationData)state!).Start(_grainActivator);
}

public IGrainContext CreateContext(GrainAddress activationAddress)
Expand All @@ -94,12 +95,7 @@ public IGrainContext CreateContext(GrainAddress activationAddress)
_sharedComponents);

using var ecSuppressor = ExecutionContext.SuppressFlow();
_ = Task.Factory.StartNew(
_startActivation,
context,
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
context.ActivationTaskScheduler);
context.WorkItemGroup.Post(_startActivation, context);
return context;
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal sealed partial class ActivationData :
private readonly WorkItemGroup _workItemGroup;
private readonly List<(Message Message, CoarseStopwatch QueuedTime)> _waitingRequests = new();
private readonly Dictionary<Message, CoarseStopwatch> _runningRequests = new();
private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true };
private readonly WorkItemGroupWaiter _workSignal;
private GrainLifecycle? _lifecycle;
private Queue<object>? _pendingOperations;
private Message? _blockingRequest;
Expand Down Expand Up @@ -99,6 +99,7 @@ public ActivationData(
Debug.Assert(_serviceScope != null, "_serviceScope must not be null.");
_workItemGroup = createWorkItemGroup(this);
Debug.Assert(_workItemGroup != null, "_workItemGroup must not be null.");
_workSignal = new WorkItemGroupWaiter(_workItemGroup);
}

internal void SetActivationActivity(Activity activity)
Expand Down Expand Up @@ -140,6 +141,8 @@ public void Start(IGrainActivator grainActivator)
}
}

public WorkItemGroup WorkItemGroup => _workItemGroup;

public ActivationTaskScheduler ActivationTaskScheduler => _workItemGroup.TaskScheduler;
public IGrainRuntime GrainRuntime => _shared.Runtime;
public object? GrainInstance { get; private set; }
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Scheduler/IWorkItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ internal interface IWorkItem
IGrainContext GrainContext { get; }
void Execute();

internal static readonly Action<object> ExecuteWorkItem = state => ((IWorkItem)state).Execute();
internal static readonly Action<object?> ExecuteWorkItem = state => ((IWorkItem)state!).Execute();
}
}
6 changes: 3 additions & 3 deletions src/Orleans.Runtime/Scheduler/TaskSchedulerUtils.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#nullable enable
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Orleans.Runtime.Internal;

#nullable disable
namespace Orleans.Runtime.Scheduler
{
internal static class TaskSchedulerUtils
Expand All @@ -18,7 +18,7 @@ public static void QueueAction(this ActivationTaskScheduler taskScheduler, Actio
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void QueueAction(this ActivationTaskScheduler taskScheduler, Action<object> action, object state)
public static void QueueAction(this ActivationTaskScheduler taskScheduler, Action<object?> action, object? state)
{
using var suppressExecutionContext = new ExecutionContextSuppressor();

Expand All @@ -29,7 +29,7 @@ public static void QueueAction(this ActivationTaskScheduler taskScheduler, Actio
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void QueueWorkItem(this WorkItemGroup scheduler, IWorkItem workItem)
{
QueueAction(scheduler.TaskScheduler, IWorkItem.ExecuteWorkItem, workItem);
scheduler.QueueAction(IWorkItem.ExecuteWorkItem, workItem);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
155 changes: 139 additions & 16 deletions src/Orleans.Runtime/Scheduler/WorkItemGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
Expand All @@ -12,8 +13,46 @@

namespace Orleans.Runtime.Scheduler;

internal readonly struct WorkItem
{
public enum WorkItemType : byte
{
Task = 0,
SendOrPostCallback = 1,
ActionOfObject = 2
}

public readonly object Callback;
public readonly object? State;
public readonly WorkItemType Type;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public WorkItem(Task task)
{
Callback = task;
State = null;
Type = WorkItemType.Task;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public WorkItem(SendOrPostCallback callback, object? state)
{
Callback = callback;
State = state;
Type = WorkItemType.SendOrPostCallback;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public WorkItem(Action<object?> callback, object? state)
{
Callback = callback;
State = state;
Type = WorkItemType.ActionOfObject;
}
}

[DebuggerDisplay("WorkItemGroup Context={GrainContext} State={_state}")]
internal sealed partial class WorkItemGroup : IThreadPoolWorkItem, IWorkItemScheduler
internal sealed partial class WorkItemGroup : SynchronizationContext, IThreadPoolWorkItem, IWorkItemScheduler
{
private enum WorkGroupStatus : byte
{
Expand All @@ -28,7 +67,7 @@ private enum WorkGroupStatus : byte
#else
private readonly object _lockObj = new();
#endif
private readonly Queue<Task> _workItems = new();
private readonly Queue<WorkItem> _workItems = new();
private readonly SchedulingOptions _schedulingOptions;

private long _totalItemsEnqueued;
Expand All @@ -39,6 +78,9 @@ private enum WorkGroupStatus : byte
private Task? _currentTask;
private long _currentTaskStarted;

// Dummy task used to make TaskScheduler.Current return our scheduler
private readonly Task _schedulerTask;

internal ActivationTaskScheduler TaskScheduler { get; }

public IGrainContext GrainContext { get; set; }
Expand All @@ -60,6 +102,11 @@ public WorkItemGroup(
_state = WorkGroupStatus.Waiting;
_log = logger;
TaskScheduler = new ActivationTaskScheduler(this, activationTaskSchedulerLogger);

// Create a dummy task associated with our scheduler (never actually runs)
// We set m_taskScheduler directly so TaskScheduler.Current returns our scheduler
_schedulerTask = new Task(() => { }, TaskCreationOptions.None);
GetTaskSchedulerRef(_schedulerTask) = TaskScheduler;
}

/// <summary>
Expand All @@ -76,12 +123,17 @@ public void EnqueueTask(Task task)
}
#endif

EnqueueWorkItem(new WorkItem(task));
}

private void EnqueueWorkItem(WorkItem workItem)
{
lock (_lockObj)
{
long thisSequenceNumber = _totalItemsEnqueued++;
int count = _workItems.Count;

_workItems.Enqueue(task);
_workItems.Enqueue(workItem);
int maxPendingItemsLimit = _schedulingOptions.MaxPendingWorkItemsSoftLimit;
if (maxPendingItemsLimit > 0 && count > maxPendingItemsLimit)
{
Expand All @@ -103,7 +155,10 @@ public void EnqueueTask(Task task)
#if DEBUG
if (_log.IsEnabled(LogLevel.Trace))
{
LogTraceAddToRunQueue(_log, task, thisSequenceNumber, GrainContext);
_log.LogTrace(
"Add to RunQueue #{SequenceNumber}, onto {GrainContext}",
thisSequenceNumber,
GrainContext);
}
#endif
ScheduleExecution(this);
Expand All @@ -121,9 +176,12 @@ private void LogTooManyTasksInQueue(int count, int maxPendingItemsLimit)
/// </summary>
internal IEnumerable<Task> GetScheduledTasks()
{
foreach (var task in _workItems)
lock (_lockObj)
{
yield return task;
var tasks = _workItems
.Where(item => item.Type == WorkItem.WorkItemType.Task)
.Select(item => Unsafe.As<Task>(item.Callback));
return [.. tasks];
}
}

Expand All @@ -133,6 +191,11 @@ internal IEnumerable<Task> GetScheduledTasks()
public void Execute()
{
RuntimeContext.SetExecutionContext(GrainContext, out var originalContext);

// Set t_currentTask so TaskScheduler.Current returns our ActivationTaskScheduler
var previousTask = GetCurrentTask();
SetCurrentTask(_schedulerTask);

var turnWarningDurationMs = (long)Math.Ceiling(_schedulingOptions.TurnWarningLengthThreshold.TotalMilliseconds);
var activationSchedulingQuantumMs = (long)_schedulingOptions.ActivationSchedulingQuantum.TotalMilliseconds;
try
Expand All @@ -143,15 +206,15 @@ public void Execute()
loopStart = taskStart = taskEnd = Environment.TickCount64;
do
{
Task task;
WorkItem workItem;
lock (_lockObj)
{
_state = WorkGroupStatus.Running;

// Get the first Work Item on the list
if (_workItems.Count > 0)
{
_currentTask = task = _workItems.Dequeue();
workItem = _workItems.Dequeue();
_currentTaskStarted = taskStart;
}
else
Expand All @@ -161,12 +224,27 @@ public void Execute()
}
}

#if DEBUG
LogTaskStart(task);
#endif
try
{
TaskScheduler.RunTaskFromWorkItemGroup(task);
switch (workItem.Type)
{
case WorkItem.WorkItemType.Task:
{
var task = Unsafe.As<Task>(workItem.Callback);
_currentTask = task;
#if DEBUG
LogTaskStart(task);
#endif
TaskScheduler.RunTaskFromWorkItemGroup(task);
}
break;
case WorkItem.WorkItemType.SendOrPostCallback:
Unsafe.As<SendOrPostCallback>(workItem.Callback)(workItem.State);
break;
case WorkItem.WorkItemType.ActionOfObject:
Unsafe.As<Action<object?>>(workItem.Callback)(workItem.State);
break;
}
}
finally
{
Expand All @@ -177,7 +255,10 @@ public void Execute()
if (taskDurationMs > turnWarningDurationMs)
{
SchedulerInstruments.LongRunningTurnsCounter.Add(1);
LogLongRunningTurn(task, taskDurationMs);
if (workItem.Type == WorkItem.WorkItemType.Task)
{
LogLongRunningTurn(Unsafe.As<Task>(workItem.Callback), taskDurationMs);
}
}

_currentTask = null;
Expand Down Expand Up @@ -206,7 +287,7 @@ public void Execute()
_state = WorkGroupStatus.Waiting;
}
}

SetCurrentTask(previousTask);
RuntimeContext.ResetExecutionContext(originalContext);
}
}
Expand Down Expand Up @@ -281,8 +362,8 @@ public string DumpStatus()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void ScheduleExecution(WorkItemGroup workItem) => ThreadPool.UnsafeQueueUserWorkItem(workItem, preferLocal: true);

public void QueueAction(Action action) => TaskScheduler.QueueAction(action);
public void QueueAction(Action<object> action, object state) => TaskScheduler.QueueAction(action, state);
public void QueueAction(Action action) => EnqueueWorkItem(new WorkItem((Action<object?>)(static state => ((Action)state!)()), action));
public void QueueAction(Action<object?> action, object? state) => EnqueueWorkItem(new WorkItem(action, state));
public void QueueTask(Task task) => task.Start(TaskScheduler);

[LoggerMessage(
Expand Down Expand Up @@ -323,4 +404,46 @@ public string DumpStatus()
Message = "Task {Task} in WorkGroup {GrainContext} took elapsed time {Duration} for execution, which is longer than {TurnWarningLengthThreshold}. Running on thread {Thread}"
)]
private static partial void LogWarningLongRunningTurn(ILogger logger, object task, string grainContext, string duration, TimeSpan turnWarningLengthThreshold, string thread);

#region SynchronizationContext overrides

/// <summary>
/// Asynchronously posts a callback to be executed on this WorkItemGroup.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void Post(SendOrPostCallback d, object? state) => EnqueueWorkItem(new WorkItem(d, state));

/// <summary>
/// Synchronously sends a callback. Not supported - throws.
/// </summary>
public override void Send(SendOrPostCallback d, object? state) => throw new NotSupportedException();

/// <summary>
/// Creates a copy (returns same instance for single-threaded behavior).
/// </summary>
public override SynchronizationContext CreateCopy() => this;

#endregion

#region UnsafeAccessor methods for Task internals

/// <summary>
/// Gets a reference to the thread-static Task.t_currentTask field.
/// </summary>
[UnsafeAccessor(UnsafeAccessorKind.StaticField, Name = "t_currentTask")]
private static extern ref Task? GetCurrentTaskRef(Task? _);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Task? GetCurrentTask() => GetCurrentTaskRef(null);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void SetCurrentTask(Task? task) => GetCurrentTaskRef(null) = task;

/// <summary>
/// Sets the internal m_taskScheduler field on a Task.
/// </summary>
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "m_taskScheduler")]
private static extern ref TaskScheduler? GetTaskSchedulerRef(Task task);

#endregion
}
Loading
Loading