diff --git a/src/Orleans.Runtime/Catalog/ActivationCollector.cs b/src/Orleans.Runtime/Catalog/ActivationCollector.cs index ef7144a4efe..9cab47c93a2 100644 --- a/src/Orleans.Runtime/Catalog/ActivationCollector.cs +++ b/src/Orleans.Runtime/Catalog/ActivationCollector.cs @@ -103,7 +103,7 @@ public int GetNumRecentlyUsed(TimeSpan recencyPeriod) /// public void ScheduleCollection(ICollectibleGrainContext item, TimeSpan timeout, DateTime now) { - lock (item) + lock (ActivationData.GetSynchronizationLock(item)) { if (item.IsExemptFromCollection) { @@ -131,7 +131,7 @@ public bool TryCancelCollection(ICollectibleGrainContext item) if (item is null) return false; if (item.IsExemptFromCollection) return false; - lock (item) + lock (ActivationData.GetSynchronizationLock(item)) { DateTime ticket = item.CollectionTicket; if (default == ticket) return false; @@ -154,7 +154,7 @@ public bool TryRescheduleCollection(ICollectibleGrainContext item) { if (item.IsExemptFromCollection) return false; - lock (item) + lock (ActivationData.GetSynchronizationLock(item)) { if (TryRescheduleCollection_Impl(item, item.CollectionAgeLimit)) return true; @@ -236,7 +236,7 @@ public List ScanStale() // If the activation is to be reactivated, it's our job to clear the activation's copy of the ticket. foreach (var activation in activations) { - lock (activation) + lock (ActivationData.GetSynchronizationLock(activation)) { activation.CollectionTicket = default; if (!activation.IsValid) @@ -283,7 +283,7 @@ public List ScanAll(TimeSpan ageLimit) foreach (var kvp in bucket.Items) { var activation = kvp.Value; - lock (activation) + lock (ActivationData.GetSynchronizationLock(activation)) { if (!activation.IsValid) { @@ -691,7 +691,7 @@ public void Add(ICollectibleGrainContext item) public bool TryRemove(ICollectibleGrainContext item) { - lock (item) + lock (ActivationData.GetSynchronizationLock(item)) { if (item.CollectionTicket == default) { @@ -711,7 +711,7 @@ public List CancelAll() { // Attempt to cancel the item. if we succeed, it wasn't already cancelled and we can return it. otherwise, we silently ignore it. var item = pair.Value; - lock (item) + lock (ActivationData.GetSynchronizationLock(item)) { if (item.CollectionTicket == default) { diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index 02e7fa5a40b..7e44b43bb26 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -22,8 +22,7 @@ namespace Orleans.Runtime; /// /// Maintains additional per-activation state that is required for Orleans internal operations. -/// MUST lock this object for any concurrent access -/// MUST lock on `this` object because there are locks taken on ActivationData instances in various places in the codebase such as ActivationCollector.ScheduleCollection. +/// MUST use the activation synchronization lock for any concurrent access. /// Consider: compartmentalize by usage, e.g., using separate interfaces for data for catalog, etc. /// [DebuggerDisplay("GrainId = {GrainId}, State = {State}, Waiting = {WaitingCount}, Executing = {IsCurrentlyExecuting}")] @@ -40,6 +39,7 @@ internal sealed partial class ActivationData : IDisposable { private const string GrainAddressMigrationContextKey = "sys.addr"; + private readonly object _lock = new(); private readonly GrainTypeSharedContext _shared; private readonly IServiceScope _serviceScope; private readonly WorkItemGroup _workItemGroup; @@ -118,8 +118,8 @@ internal void SetActivationActivity(Activity activity) public void Start(IGrainActivator grainActivator) { Debug.Assert(Equals(ActivationTaskScheduler, TaskScheduler.Current)); - // locking on `this` is intentional as there are other places in the codebase taking locks on ActivationData instances - lock (this) + // The activation synchronization lock protects mutable activation state. + lock (_lock) { try { @@ -148,6 +148,7 @@ public void Start(IGrainActivator grainActivator) public ActivationState State { get; private set; } = ActivationState.Creating; public PlacementStrategy PlacementStrategy => _shared.PlacementStrategy; public DateTime CollectionTicket { get; set; } + internal object SynchronizationLock => _lock; public IServiceProvider ActivationServices => _serviceScope.ServiceProvider; public ActivationId ActivationId => Address.ActivationId; public IGrainLifecycle ObservableLifecycle @@ -155,12 +156,15 @@ public IGrainLifecycle ObservableLifecycle get { if (_lifecycle is { } lifecycle) return lifecycle; - lock (this) { return _lifecycle ??= new GrainLifecycle(_shared.Logger); } + lock (_lock) { return _lifecycle ??= new GrainLifecycle(_shared.Logger); } } } internal GrainTypeSharedContext Shared => _shared; + internal static object GetSynchronizationLock(ICollectibleGrainContext context) => + context is ActivationData activation ? activation._lock : context; + public GrainId GrainId => Address.GrainId; public bool IsExemptFromCollection => _shared.CollectionAgeLimit == Timeout.InfiniteTimeSpan; public DateTime KeepAliveUntil { get; set; } = DateTime.MinValue; @@ -186,7 +190,7 @@ public SiloAddress? ForwardingAddress get => _extras?.ForwardingAddress; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.ForwardingAddress = value; @@ -203,7 +207,7 @@ public GrainAddress? PreviousRegistration get => _extras?.PreviousRegistration; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.PreviousRegistration = value; @@ -218,7 +222,7 @@ private DeactivationReason DeactivationReason get => _extras?.DeactivationReason ?? default; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.DeactivationReason = value; @@ -231,7 +235,7 @@ private HashSet? Timers get => _extras?.Timers; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.Timers = value; @@ -244,7 +248,7 @@ private DateTime? DeactivationStartTime get => _extras?.DeactivationStartTime; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.DeactivationStartTime = value; @@ -257,7 +261,7 @@ private bool IsStuckDeactivating get => _extras?.IsStuckDeactivating ?? false; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.IsStuckDeactivating = value; @@ -270,7 +274,7 @@ private bool IsStuckProcessingMessage get => _extras?.IsStuckProcessingMessage ?? false; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.IsStuckProcessingMessage = value; @@ -283,7 +287,7 @@ private DehydrationContextHolder? DehydrationContext get => _extras?.DehydrationContext; set { - lock (this) + lock (_lock) { _extras ??= new(); _extras.DehydrationContext = value; @@ -360,7 +364,7 @@ public void SetComponent(Type componentType, object? instance) throw new ArgumentException("Cannot override a component which is implemented by this grain context"); } - lock (this) + lock (_lock) { if (instance == null) { @@ -377,7 +381,7 @@ internal void SetGrainInstance(object grainInstance) { ArgumentNullException.ThrowIfNull(grainInstance); - lock (this) + lock (_lock) { if (GrainInstance is not null) { @@ -444,7 +448,7 @@ public void SetState(ActivationState state) internal int GetRequestCount() { - lock (this) + lock (_lock) { return _runningRequests.Count + WaitingCount; } @@ -452,7 +456,7 @@ internal int GetRequestCount() internal List DequeueAllWaitingRequests() { - lock (this) + lock (_lock) { var result = new List(_waitingRequests.Count); foreach (var (message, _) in _waitingRequests) @@ -510,7 +514,7 @@ public void DelayDeactivation(TimeSpan timespan) private void ScheduleOperation(object operation) { - lock (this) + lock (_lock) { _pendingOperations ??= new(); _pendingOperations.Enqueue(operation); @@ -521,7 +525,7 @@ private void ScheduleOperation(object operation) private void CancelPendingOperations() { - lock (this) + lock (_lock) { // If the grain is currently activating, cancel that operation. if (_pendingOperations is not { Count: > 0 } operations) @@ -569,7 +573,7 @@ private void CancelPendingOperations() public void Migrate(Dictionary? requestContext, CancellationToken cancellationToken = default) { - lock (this) + lock (_lock) { if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating)) { @@ -595,7 +599,7 @@ public void Deactivate(DeactivationReason reason, ActivityContext? activityConte ? ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.DeactivateGrain, ActivityKind.Internal, parentContext:parent) : ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.DeactivateGrain); - lock (this) + lock (_lock) { try { @@ -677,7 +681,7 @@ private void DeactivateStuckActivation() void IGrainTimerRegistry.OnTimerCreated(IGrainTimer timer) { - lock (this) + lock (_lock) { Timers ??= new HashSet(); Timers.Add(timer); @@ -686,7 +690,7 @@ void IGrainTimerRegistry.OnTimerCreated(IGrainTimer timer) void IGrainTimerRegistry.OnTimerDisposed(IGrainTimer timer) { - lock (this) // need to lock since dispose can be called on finalizer thread, outside grain context (not single threaded). + lock (_lock) // need to lock since dispose can be called on finalizer thread, outside grain context (not single threaded). { if (Timers is null) { @@ -699,7 +703,7 @@ void IGrainTimerRegistry.OnTimerDisposed(IGrainTimer timer) private void DisposeTimers() { - lock (this) + lock (_lock) { if (Timers is null) { @@ -724,7 +728,7 @@ public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageF var longQueueTimeDuration = options.RequestQueueDelayWarningTime; List? diagnostics = null; - lock (this) + lock (_lock) { if (State != ActivationState.Valid) { @@ -826,7 +830,7 @@ void GetStatusList([NotNull] ref List? diagnostics) internal string ToDetailedString(bool includeExtraDetails = false) { - lock (this) + lock (_lock) { var currentlyExecuting = includeExtraDetails ? _blockingRequest : null; return @$"[Activation: {Address.SiloAddress}/{GrainId}{ActivationId} {GetActivationInfoString()} State={State} NonReentrancyQueueSize={WaitingCount} NumRunning={_runningRequests.Count} IdlenessTimeSpan={GetIdleness()} CollectionAgeLimit={_shared.CollectionAgeLimit}{(currentlyExecuting != null ? " CurrentlyExecuting=" : null)}{currentlyExecuting}]"; @@ -859,7 +863,7 @@ public async ValueTask DisposeAsync() CancelPendingOperations(); - lock (this) + lock (_lock) { _shared.InternalRuntime.ActivationWorkingSet.OnDeactivated(this); SetState(ActivationState.Invalid); @@ -959,7 +963,7 @@ public TExtensionInterface GetExtension() bool IActivationWorkingSetMember.IsCandidateForRemoval(bool wouldRemove) { const int IdlenessLowerBound = 10_000; - lock (this) + lock (_lock) { var inactive = IsInactive && _idleDuration.ElapsedMilliseconds > IdlenessLowerBound; @@ -986,7 +990,7 @@ private async Task RunMessageLoop() if (!IsCurrentlyExecuting) { bool hasPendingOperations; - lock (this) + lock (_lock) { hasPendingOperations = _pendingOperations is { Count: > 0 }; } @@ -1014,7 +1018,7 @@ void ProcessPendingRequests() do { Message? message = null; - lock (this) + lock (_lock) { if (_waitingRequests.Count <= i) { @@ -1229,7 +1233,7 @@ async Task ProcessOperationsAsync() object? op = null; while (true) { - lock (this) + lock (_lock) { Debug.Assert(_pendingOperations is not null); @@ -1307,7 +1311,7 @@ private void RehydrateInternal(IRehydrationContext context) rehydrateSpan?.SetTag(ActivityTagKeys.ActivationId, ActivationId.ToString()); } - lock (this) + lock (_lock) { if (State != ActivationState.Creating) { @@ -1355,7 +1359,7 @@ private void OnDehydrate(IDehydrationContext context) { LogDehydratingActivation(_shared.Logger); - lock (this) + lock (_lock) { Debug.Assert(context is not null); @@ -1469,7 +1473,7 @@ static async ValueTask OnCompleteAsync(ActivationData activation, Message messag /// The message that has just completed processing. private void OnCompletedRequest(Message message) { - lock (this) + lock (_lock) { _runningRequests.Remove(message); @@ -1523,7 +1527,7 @@ public void ReceiveMessage(Message message) private void ReceiveResponse(Message message) { - lock (this) + lock (_lock) { if (State == ActivationState.Invalid) { @@ -1549,7 +1553,7 @@ private void ReceiveRequest(Message message) return; } - lock (this) + lock (_lock) { _waitingRequests.Add((message, CoarseStopwatch.StartNew())); } @@ -1562,7 +1566,7 @@ private void ReceiveRequest(Message message) /// private void RejectAllQueuedMessages() { - lock (this) + lock (_lock) { List msgs = DequeueAllWaitingRequests(); if (msgs == null || msgs.Count <= 0) return; @@ -1581,7 +1585,7 @@ private void RejectAllQueuedMessages() private void RerouteAllQueuedMessages() { - lock (this) + lock (_lock) { List msgs = DequeueAllWaitingRequests(); if (msgs is not { Count: > 0 }) @@ -1745,7 +1749,7 @@ private async Task ActivateAsync(Dictionary? requestContextData, } } - lock (this) + lock (_lock) { SetState(ActivationState.Activating); } @@ -1827,7 +1831,7 @@ private async Task ActivateAsync(Dictionary? requestContextData, } } - lock (this) + lock (_lock) { if (State is ActivationState.Activating) { @@ -2082,7 +2086,7 @@ async ValueTask StartMigrationAsync(DehydrationContextHolder context, IAct private TaskCompletionSource GetDeactivationCompletionSource() { - lock (this) + lock (_lock) { _extras ??= new(); return _extras.DeactivationTask ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -2210,7 +2214,7 @@ bool TryCancelRequest() { Message? message = null; var wasWaiting = false; - lock (this) + lock (_lock) { // Check the running requests. foreach (var candidate in _runningRequests.Keys) @@ -2369,12 +2373,13 @@ private sealed class DeactivationInfo private abstract class Command(CancellationTokenSource cts) : IDisposable { private bool _disposed; + private readonly object _lock = new(); private readonly CancellationTokenSource _cts = cts; public CancellationToken CancellationToken => _cts.Token; public virtual void Cancel() { - lock (this) + lock (_lock) { if (_disposed) return; _cts.Cancel(); @@ -2385,7 +2390,7 @@ public virtual void Dispose() { try { - lock (this) + lock (_lock) { _disposed = true; _cts.Dispose(); diff --git a/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs b/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs index bdf350554f5..5a0072e33af 100644 --- a/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs +++ b/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs @@ -126,7 +126,7 @@ public async ValueTask AcceptMigratingGrains(List migrati var allActiveOrTerminal = true; foreach (var activation in activations) { - lock (activation) + lock (activation.SynchronizationLock) { if (activation.State is not (ActivationState.Valid or ActivationState.Invalid)) { diff --git a/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs b/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs index df296736ab1..16dc5dcb075 100644 --- a/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs +++ b/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs @@ -123,7 +123,7 @@ private async Task Run() foreach (var activationEntry in _recentlyUsedActivations) { var activation = activationEntry.Key; - lock (activation) + lock (activation.SynchronizationLock) { activation.AnalyzeWorkload(now, messageCenter, _messageFactory, options); }