diff --git a/Directory.Packages.props b/Directory.Packages.props index 19d25230020..e452fc9c579 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -14,6 +14,7 @@ + diff --git a/Orleans.slnx b/Orleans.slnx index 76880762779..f625f3df1d0 100644 --- a/Orleans.slnx +++ b/Orleans.slnx @@ -80,6 +80,7 @@ + diff --git a/src/AWS/Orleans.Journaling.S3/Orleans.Journaling.S3.csproj b/src/AWS/Orleans.Journaling.S3/Orleans.Journaling.S3.csproj new file mode 100644 index 00000000000..2667a82f7c4 --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/Orleans.Journaling.S3.csproj @@ -0,0 +1,30 @@ + + + + Microsoft.Orleans.Journaling.S3 + README.md + $(DefaultTargetFrameworks) + enable + enable + $(NoWarn);ORLEANSEXP005 + $(VersionSuffix).alpha.1 + alpha.1 + true + + + + + + + + + + + + + + + + + + diff --git a/src/AWS/Orleans.Journaling.S3/Properties/AssemblyInfo.cs b/src/AWS/Orleans.Journaling.S3/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..7220a0d344d --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/Properties/AssemblyInfo.cs @@ -0,0 +1,4 @@ +using System.Diagnostics.CodeAnalysis; + +[assembly: Experimental("ORLEANSEXP005")] + diff --git a/src/AWS/Orleans.Journaling.S3/README.md b/src/AWS/Orleans.Journaling.S3/README.md new file mode 100644 index 00000000000..ed8f51a534a --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/README.md @@ -0,0 +1,8 @@ +# Microsoft Orleans S3 Journaling + +`Microsoft.Orleans.Journaling.S3` provides an `IJournalStorage` implementation backed by Amazon S3 Express One Zone directory buckets. + +The provider uses S3 Express append writes (`WriteOffsetBytes`) for WAL appends. For local development and tests against S3-compatible emulators such as MinIO, set `UseS3ExpressAppend = false` to use a conditional read-modify-write append emulation. + +Buckets should be created ahead of time for AWS S3 Express One Zone. `CreateBucketIfNotExists` is intended for local emulators. + diff --git a/src/AWS/Orleans.Journaling.S3/ReadOnlySequenceStream.cs b/src/AWS/Orleans.Journaling.S3/ReadOnlySequenceStream.cs new file mode 100644 index 00000000000..b113feb7b79 --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/ReadOnlySequenceStream.cs @@ -0,0 +1,97 @@ +using System.Buffers; + +namespace Orleans.Journaling; + +internal sealed class ReadOnlySequenceStream(ReadOnlySequence sequence) : Stream +{ + private readonly ReadOnlySequence _sequence = sequence; + private long _position; + private bool _disposed; + + public override bool CanRead => !_disposed; + + public override bool CanSeek => !_disposed; + + public override bool CanWrite => false; + + public override long Length + { + get + { + ObjectDisposedException.ThrowIf(_disposed, this); + return _sequence.Length; + } + } + + public override long Position + { + get + { + ObjectDisposedException.ThrowIf(_disposed, this); + return _position; + } + + set + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (value < 0 || value > _sequence.Length) + { + throw new ArgumentOutOfRangeException(nameof(value)); + } + + _position = value; + } + } + + public override void Flush() => ObjectDisposedException.ThrowIf(_disposed, this); + + public override int Read(byte[] buffer, int offset, int count) => Read(buffer.AsSpan(offset, count)); + + public override int Read(Span buffer) + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (buffer.IsEmpty || _position >= _sequence.Length) + { + return 0; + } + + var length = (int)Math.Min(buffer.Length, _sequence.Length - _position); + _sequence.Slice(_position, length).CopyTo(buffer); + _position += length; + return length; + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + return new ValueTask(Read(buffer.Span)); + } + + public override long Seek(long offset, SeekOrigin origin) + { + ObjectDisposedException.ThrowIf(_disposed, this); + var newPosition = origin switch + { + SeekOrigin.Begin => offset, + SeekOrigin.Current => _position + offset, + SeekOrigin.End => _sequence.Length + offset, + _ => throw new ArgumentOutOfRangeException(nameof(origin)) + }; + + Position = newPosition; + return _position; + } + + public override void SetLength(long value) => throw new NotSupportedException("This stream is read-only."); + + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream is read-only."); + + public override void Write(ReadOnlySpan buffer) => throw new NotSupportedException("This stream is read-only."); + + protected override void Dispose(bool disposing) + { + _disposed = true; + base.Dispose(disposing); + } +} + diff --git a/src/AWS/Orleans.Journaling.S3/S3JournalStorage.cs b/src/AWS/Orleans.Journaling.S3/S3JournalStorage.cs new file mode 100644 index 00000000000..36bc19a8989 --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/S3JournalStorage.cs @@ -0,0 +1,1267 @@ +using System.Buffers; +using System.Diagnostics; +using System.Globalization; +using System.Net; +using System.Security.Cryptography; +using Amazon.S3; +using Amazon.S3.Model; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Storage; + +namespace Orleans.Journaling; + +internal sealed partial class S3JournalStorage : IJournalStorage +{ + internal const string FormatMetadataKey = "format"; + internal const string CheckpointMetadataKey = "checkpoint"; + internal const string CheckpointOffsetMetadataKey = "checkpoint-offset"; + internal const string WalGenerationMetadataKey = "wal-generation"; + internal const string MetadataVersionMetadataKey = "metadata-version"; + + private const string MetadataHeaderPrefix = "x-amz-meta-"; + private const int MaxAppendPartsPerObject = 10_000; + private const int HeadroomPartCount = 100; + private const int RequestCompactionPartCount = 9_800; + private const int CompactedWalMarkerBytes = 16; + + private readonly S3JournalStorageShared _shared; + private readonly IAmazonS3 _client; + private readonly JournalId _journalId; + private readonly string _walObjectKey; + private int _numParts; + private string? _walETag; + private WalProviderState _walProviderState; + + private bool WalExists => _walETag is not null; + + public bool IsCompactionRequested => _numParts > RequestCompactionPartCount; + + internal S3JournalStorage(S3JournalStorageShared shared, IAmazonS3 client, JournalId journalId) + { + ArgumentNullException.ThrowIfNull(shared); + ArgumentNullException.ThrowIfNull(client); + if (journalId.IsDefault) + { + throw new ArgumentException("The journal id must not be the default value.", nameof(journalId)); + } + + _shared = shared; + _client = client; + _journalId = journalId; + _walObjectKey = GetWalObjectKey(); + } + + public async ValueTask CreateIfNotExistsAsync( + IReadOnlyDictionary? metadata = null, + CancellationToken cancellationToken = default) + { + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + var callerMetadata = CopyAndValidateCallerMetadata(metadata); + try + { + var created = await CreateWalAsync( + checkpointName: null, + ifMatch: null, + ifNoneMatch: true, + cancellationToken, + callerMetadata).ConfigureAwait(false); + SetWal(created.Response.ETag, created.ProviderState, lastModified: null); + succeeded = true; + return true; + } + catch (AmazonS3Exception exception) when (IsObjectAlreadyExists(exception)) + { + succeeded = true; + return false; + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationCreate, + Stopwatch.GetElapsedTime(startTimestamp), + bytes: 0, + succeeded); + } + } + + public async ValueTask GetMetadataAsync(CancellationToken cancellationToken = default) + { + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + try + { + var properties = await GetPropertiesCoreAsync(expectedETag: null, cancellationToken).ConfigureAwait(false); + succeeded = true; + return properties is null ? null : CreateJournalMetadata(properties.ETag, CopyMetadata(properties.Metadata)); + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationGetMetadata, + Stopwatch.GetElapsedTime(startTimestamp), + bytes: 0, + succeeded); + } + } + + public async ValueTask UpdateMetadataAsync( + IReadOnlyDictionary? set = null, + IEnumerable? remove = null, + string? expectedETag = null, + CancellationToken cancellationToken = default) + { + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + var setValues = CopyAndValidateCallerMetadata(set); + var removeValues = CopyRemove(remove, setValues); + try + { + for (var attempt = 0; attempt < 3; attempt++) + { + var properties = await GetPropertiesCoreAsync(expectedETag: null, cancellationToken).ConfigureAwait(false); + if (properties is null) + { + succeeded = true; + return null; + } + + var walState = CreateWalState(properties); + var metadata = CopyMetadata(properties.Metadata); + if (expectedETag is not null + && !string.Equals(expectedETag, CreateMetadataETag(properties.ETag, metadata), StringComparison.Ordinal)) + { + succeeded = true; + return null; + } + + if (!ApplyCallerMetadataUpdate(metadata, setValues, removeValues)) + { + SetWal(walState.ETag, walState.ProviderState, walState.LastModified); + succeeded = true; + return CreateJournalMetadata(properties.ETag, metadata); + } + + metadata[MetadataVersionMetadataKey] = CreateMetadataVersion(); + var copyRequest = new CopyObjectRequest + { + SourceBucket = _shared.BucketName, + SourceKey = _walObjectKey, + DestinationBucket = _shared.BucketName, + DestinationKey = _walObjectKey, + MetadataDirective = S3MetadataDirective.REPLACE, + ETagToMatch = properties.ETag, + UnmodifiedSinceDate = properties.LastModified, + }; + ApplyObjectHeaders(copyRequest, metadata); + + try + { + var response = await _client.CopyObjectAsync(copyRequest, cancellationToken).ConfigureAwait(false); + SetWal(response.ETag, walState.ProviderState, lastModified: null); + succeeded = true; + return CreateJournalMetadata(response.ETag, metadata); + } + catch (AmazonS3Exception exception) when (exception.StatusCode is HttpStatusCode.PreconditionFailed) + { + if (expectedETag is not null) + { + succeeded = true; + return null; + } + } + } + + succeeded = true; + return null; + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationUpdateMetadata, + Stopwatch.GetElapsedTime(startTimestamp), + bytes: 0, + succeeded); + } + } + + public async ValueTask AppendAsync(ReadOnlySequence value, CancellationToken cancellationToken) + { + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + + try + { + for (var attempt = 0; ; attempt++) + { + if (!WalExists) + { + await EnsureWalAsync(cancellationToken).ConfigureAwait(false); + } + + ThrowIfCompactionRequired(); + + var expectedETag = _walETag!; + var expectedProviderState = _walProviderState; + try + { + if (_shared.Options.UseS3ExpressAppend) + { + await AppendWithS3ExpressAsync(value, expectedETag, expectedProviderState, cancellationToken).ConfigureAwait(false); + } + else + { + await AppendWithConditionalRewriteAsync(value, expectedETag, cancellationToken).ConfigureAwait(false); + } + + LogAppend(_shared.Logger, value.Length, _shared.BucketName, _walObjectKey); + succeeded = true; + return; + } + catch (AmazonS3Exception exception) when (IsWalMutationConflict(exception)) + { + var refreshed = attempt < _shared.Options.MaxMetadataOnlyConflictRetries + ? await RetryAfterMetadataOnlyConflictAsync(attempt, expectedProviderState, cancellationToken).ConfigureAwait(false) + : null; + if (refreshed is not null) + { + continue; + } + + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while appending; recovery is required.", + expectedETag, + exception); + } + } + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationAppend, + Stopwatch.GetElapsedTime(startTimestamp), + value.Length, + succeeded); + } + } + + public async ValueTask DeleteAsync(CancellationToken cancellationToken) + { + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + try + { + WalState? walState; + var expectedETag = _walETag; + var expectedProviderState = _walProviderState; + try + { + walState = await TryLoadWalStateAsync(expectedETag, cancellationToken).ConfigureAwait(false); + } + catch (AmazonS3Exception exception) when (IsWalMutationConflict(exception)) + { + walState = expectedETag is not null + ? await TryRefreshWalStateAfterMetadataOnlyConflictAsync(expectedProviderState, cancellationToken).ConfigureAwait(false) + : null; + if (walState is null) + { + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while deleting the journal; recovery is required.", + expectedETag, + exception); + } + } + + if (walState is null) + { + if (expectedETag is not null) + { + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while deleting the journal; recovery is required.", + expectedETag); + } + + succeeded = true; + return; + } + + var checkpointName = walState.Value.Manifest.Checkpoint?.Name; + for (var attempt = 0; ; attempt++) + { + var deleteWalState = walState.Value; + try + { + var deleteRequest = new DeleteObjectRequest + { + BucketName = _shared.BucketName, + Key = _walObjectKey, + }; + if (_shared.Options.UseConditionalDelete) + { + deleteRequest.IfMatchSize = deleteWalState.ProviderState.ContentLength; + deleteRequest.IfMatchLastModifiedTime = deleteWalState.LastModified; + } + + await _client.DeleteObjectAsync(deleteRequest, cancellationToken).ConfigureAwait(false); + SetWal(eTag: null, providerState: default, lastModified: null); + break; + } + catch (AmazonS3Exception exception) when (IsWalMutationConflict(exception)) + { + var refreshed = attempt < _shared.Options.MaxMetadataOnlyConflictRetries + ? await RetryAfterMetadataOnlyConflictAsync(attempt, deleteWalState.ProviderState, cancellationToken).ConfigureAwait(false) + : null; + if (refreshed is { } refreshedState) + { + walState = refreshedState; + checkpointName = refreshedState.Manifest.Checkpoint?.Name; + continue; + } + + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while deleting the journal; recovery is required.", + deleteWalState.ETag, + exception); + } + } + + if (checkpointName is not null) + { + await DeleteCheckpointIfExistsAsync(checkpointName, cancellationToken).ConfigureAwait(false); + } + + succeeded = true; + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationDelete, + Stopwatch.GetElapsedTime(startTimestamp), + bytes: 0, + succeeded); + } + } + + public async ValueTask ReadAsync(IJournalStorageConsumer consumer, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(consumer); + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + var bytes = 0L; + + try + { + GetObjectResponse walResult; + try + { + walResult = await _client.GetObjectAsync( + new GetObjectRequest + { + BucketName = _shared.BucketName, + Key = _walObjectKey, + }, + cancellationToken).ConfigureAwait(false); + } + catch (AmazonS3Exception exception) when (exception.StatusCode is HttpStatusCode.NotFound) + { + SetWal(eTag: null, providerState: default, lastModified: null); + consumer.Complete(metadata: null); + succeeded = true; + return; + } + + using (walResult) + { + var walMetadata = CopyMetadata(walResult.Metadata); + var manifest = CreateWalManifest(walMetadata); + SetWal(walResult.ETag, CreateWalProviderState(manifest, walResult.ContentLength, walResult.PartsCount), walResult.LastModified); + + var expectedFormat = manifest.Metadata.Format; + if (manifest.Checkpoint is { } checkpoint) + { + using var checkpointResult = await _client.GetObjectAsync( + new GetObjectRequest + { + BucketName = _shared.BucketName, + Key = checkpoint.Name, + }, + cancellationToken).ConfigureAwait(false); + + var checkpointMetadata = ValidateCheckpointMetadata(checkpoint, CopyMetadata(checkpointResult.Metadata), expectedFormat); + var totalCheckpointBytes = await consumer.ReadAsync( + checkpointResult.ResponseStream, + checkpointMetadata, + complete: false, + cancellationToken).ConfigureAwait(false); + LogRead(_shared.Logger, totalCheckpointBytes, _shared.BucketName, checkpoint.Name); + bytes += totalCheckpointBytes; + expectedFormat = checkpointMetadata.Format; + } + + if (manifest.Checkpoint is { WalOffset: > 0 } checkpointOffset) + { + if (checkpointOffset.WalOffset > walResult.ContentLength) + { + throw new InvalidOperationException( + $"S3 journal checkpoint offset {checkpointOffset.WalOffset:N0} exceeds WAL length {walResult.ContentLength:N0}."); + } + + await SkipStreamAsync(walResult.ResponseStream, checkpointOffset.WalOffset, cancellationToken).ConfigureAwait(false); + } + + var metadata = manifest.Metadata.Format is { Length: > 0 } + ? manifest.Metadata + : expectedFormat is { Length: > 0 } + ? new JournalMetadata(expectedFormat) + : JournalMetadata.Empty; + var totalWalBytes = await consumer.ReadAsync( + walResult.ResponseStream, + metadata, + complete: false, + cancellationToken).ConfigureAwait(false); + LogRead(_shared.Logger, totalWalBytes, _shared.BucketName, _walObjectKey); + bytes += totalWalBytes; + + consumer.Complete(metadata); + } + + succeeded = true; + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationRead, + Stopwatch.GetElapsedTime(startTimestamp), + bytes, + succeeded); + } + } + + public async ValueTask ReplaceAsync(ReadOnlySequence value, CancellationToken cancellationToken) + { + var startTimestamp = Stopwatch.GetTimestamp(); + var succeeded = false; + try + { + await EnsureWalAsync(cancellationToken).ConfigureAwait(false); + + var expectedWalETag = _walETag!; + var expectedProviderState = _walProviderState; + WalState? walState; + try + { + walState = await TryLoadWalStateAsync(expectedWalETag, cancellationToken).ConfigureAwait(false); + if (walState is null) + { + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while publishing a checkpoint; recovery is required.", + expectedWalETag); + } + } + catch (AmazonS3Exception exception) when (IsWalMutationConflict(exception)) + { + walState = await TryRefreshWalStateAfterMetadataOnlyConflictAsync(expectedProviderState, cancellationToken).ConfigureAwait(false); + if (walState is null) + { + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while publishing a checkpoint; recovery is required.", + expectedWalETag, + exception); + } + } + + var previousCheckpointName = _shared.Options.DeleteOldCheckpoints ? walState.Value.Manifest.Checkpoint?.Name : null; + + using var checkpointStream = new ReadOnlySequenceStream(value); + while (true) + { + var checkpointName = GetCheckpointName(Guid.NewGuid().ToString("N")); + try + { + checkpointStream.Position = 0; + var checkpointRequest = CreatePutObjectRequest( + checkpointName, + checkpointStream, + CreateCheckpointObjectMetadata()); + checkpointRequest.IfNoneMatch = "*"; + await _client.PutObjectAsync(checkpointRequest, cancellationToken).ConfigureAwait(false); + } + catch (AmazonS3Exception exception) when (IsObjectAlreadyExists(exception)) + { + continue; + } + + for (var attempt = 0; ; attempt++) + { + var publishWalState = walState.Value; + try + { + var created = await CreateWalAsync( + checkpointName, + ifMatch: publishWalState.ETag, + ifNoneMatch: false, + cancellationToken, + publishWalState.Manifest.Metadata.Properties).ConfigureAwait(false); + SetWal(created.Response.ETag, created.ProviderState, lastModified: null); + break; + } + catch (AmazonS3Exception exception) when (IsWalMutationConflict(exception)) + { + var refreshed = attempt < _shared.Options.MaxMetadataOnlyConflictRetries + ? await RetryAfterMetadataOnlyConflictAsync(attempt, publishWalState.ProviderState, cancellationToken).ConfigureAwait(false) + : null; + if (refreshed is { } refreshedState) + { + walState = refreshedState; + continue; + } + + throw CreateInconsistentWalStateException( + "S3 journal WAL changed while publishing a checkpoint; recovery is required.", + publishWalState.ETag, + exception); + } + } + + if (previousCheckpointName is not null && !string.Equals(previousCheckpointName, checkpointName, StringComparison.Ordinal)) + { + await DeleteCheckpointIfExistsAsync(previousCheckpointName, cancellationToken).ConfigureAwait(false); + } + + LogReplace(_shared.Logger, _shared.BucketName, checkpointName, checkpointStream.Length); + succeeded = true; + return; + } + } + finally + { + S3JournalStorageInstruments.OnOperationCompleted( + S3JournalStorageInstruments.OperationReplace, + Stopwatch.GetElapsedTime(startTimestamp), + value.Length, + succeeded); + } + } + + private async ValueTask AppendWithS3ExpressAsync( + ReadOnlySequence value, + string expectedETag, + WalProviderState expectedProviderState, + CancellationToken cancellationToken) + { + using var stream = new ReadOnlySequenceStream(value); + var response = await _client.PutObjectAsync( + new PutObjectRequest + { + BucketName = _shared.BucketName, + Key = _walObjectKey, + InputStream = stream, + AutoCloseStream = false, + IfMatch = expectedETag, + WriteOffsetBytes = expectedProviderState.ContentLength, + }, + cancellationToken).ConfigureAwait(false); + + SetWal( + response.ETag, + expectedProviderState with + { + ContentLength = expectedProviderState.ContentLength + value.Length, + PartsCount = expectedProviderState.PartsCount + 1, + }, + lastModified: null); + } + + private async ValueTask AppendWithConditionalRewriteAsync( + ReadOnlySequence value, + string expectedETag, + CancellationToken cancellationToken) + { + using var walResult = await _client.GetObjectAsync( + new GetObjectRequest + { + BucketName = _shared.BucketName, + Key = _walObjectKey, + EtagToMatch = expectedETag, + }, + cancellationToken).ConfigureAwait(false); + + var walMetadata = CopyMetadata(walResult.Metadata); + var manifest = CreateWalManifest(walMetadata); + var metadata = CreateWalMetadata(manifest); + await using var payload = new MemoryStream(); + await walResult.ResponseStream.CopyToAsync(payload, cancellationToken).ConfigureAwait(false); + foreach (var segment in value) + { + await payload.WriteAsync(segment, cancellationToken).ConfigureAwait(false); + } + + payload.Position = 0; + var request = CreatePutObjectRequest(_walObjectKey, payload, metadata); + request.IfMatch = expectedETag; + var response = await _client.PutObjectAsync(request, cancellationToken).ConfigureAwait(false); + SetWal( + response.ETag, + CreateWalProviderState(manifest, payload.Length, partsCount: 1), + lastModified: null); + } + + private void ThrowIfCompactionRequired() + { + if (_numParts < MaxAppendPartsPerObject - HeadroomPartCount) + { + return; + } + + throw new InvalidOperationException( + $"S3 journal WAL has {_numParts:N0} append parts and must be compacted before more appends. " + + $"S3 Express One Zone supports at most {MaxAppendPartsPerObject:N0} parts before the object must be copied."); + } + + private async ValueTask EnsureWalAsync(CancellationToken cancellationToken) + { + while (!WalExists) + { + try + { + var created = await CreateWalAsync( + checkpointName: null, + ifMatch: null, + ifNoneMatch: true, + cancellationToken).ConfigureAwait(false); + SetWal(created.Response.ETag, created.ProviderState, lastModified: null); + return; + } + catch (AmazonS3Exception exception) when (IsObjectAlreadyExists(exception)) + { + await TryLoadWalStateAsync(expectedETag: null, cancellationToken).ConfigureAwait(false); + } + } + } + + private async ValueTask TryLoadWalStateAsync( + string? expectedETag, + CancellationToken cancellationToken, + bool updateCache = true) + { + var walProperties = await GetPropertiesCoreAsync(expectedETag, cancellationToken).ConfigureAwait(false); + if (walProperties is null) + { + if (updateCache) + { + SetWal(eTag: null, providerState: default, lastModified: null); + } + + return null; + } + + var walState = CreateWalState(walProperties); + if (updateCache) + { + SetWal(walState.ETag, walState.ProviderState, walState.LastModified); + } + + return walState; + } + + private async ValueTask RetryAfterMetadataOnlyConflictAsync( + int attempt, + WalProviderState expectedProviderState, + CancellationToken cancellationToken) + { + var initial = _shared.Options.MetadataOnlyConflictInitialBackoff; + if (initial > TimeSpan.Zero) + { + var max = _shared.Options.MetadataOnlyConflictMaxBackoff; + if (max < initial) + { + max = initial; + } + + var multiplier = 1L << Math.Min(attempt, 16); + var scaledTicks = initial.Ticks * multiplier; + var cappedTicks = Math.Min(scaledTicks, max.Ticks); + await Task.Delay(TimeSpan.FromTicks(cappedTicks), cancellationToken).ConfigureAwait(false); + } + + return await TryRefreshWalStateAfterMetadataOnlyConflictAsync(expectedProviderState, cancellationToken).ConfigureAwait(false); + } + + private async ValueTask TryRefreshWalStateAfterMetadataOnlyConflictAsync( + WalProviderState expectedProviderState, + CancellationToken cancellationToken) + { + if (expectedProviderState.Generation is null) + { + return null; + } + + var walState = await TryLoadWalStateAsync(expectedETag: null, cancellationToken, updateCache: false).ConfigureAwait(false); + if (walState is null || !IsSameLogicalWal(walState.Value.ProviderState, expectedProviderState)) + { + return null; + } + + SetWal(walState.Value.ETag, walState.Value.ProviderState, walState.Value.LastModified); + return walState; + } + + private static bool IsSameLogicalWal(WalProviderState left, WalProviderState right) + => string.Equals(left.Format, right.Format, StringComparison.Ordinal) + && string.Equals(left.CheckpointName, right.CheckpointName, StringComparison.Ordinal) + && left.CheckpointOffset == right.CheckpointOffset + && string.Equals(left.Generation, right.Generation, StringComparison.Ordinal) + && left.ContentLength == right.ContentLength; + + private void SetWal(string? eTag, WalProviderState providerState, DateTime? lastModified) + { + _walETag = eTag; + _walProviderState = providerState; + _numParts = providerState.PartsCount; + } + + private string GetWalObjectKey() + { + var objectKey = _shared.Options.GetObjectKeyForJournal(_journalId); + return S3JournalStorageOptions.GetWalObjectKeyForJournal(_journalId, objectKey); + } + + private async ValueTask CreateWalAsync( + string? checkpointName, + string? ifMatch, + bool ifNoneMatch, + CancellationToken cancellationToken, + IReadOnlyDictionary? callerMetadata = null) + { + var marker = checkpointName is null ? [] : RandomNumberGenerator.GetBytes(CompactedWalMarkerBytes); + var metadata = CreateWalMetadata(checkpointName, marker.Length, callerMetadata); + using var stream = new MemoryStream(marker, writable: false); + var request = CreatePutObjectRequest(_walObjectKey, stream, metadata); + request.IfMatch = ifMatch; + request.IfNoneMatch = ifNoneMatch ? "*" : null; + var response = await _client.PutObjectAsync(request, cancellationToken).ConfigureAwait(false); + var manifest = CreateWalManifest(metadata); + return new CreatedWal(response, manifest, CreateWalProviderState(manifest, marker.Length, partsCount: marker.Length == 0 ? 0 : 1)); + } + + private PutObjectRequest CreatePutObjectRequest(string key, Stream input, IDictionary metadata) + { + var request = new PutObjectRequest + { + BucketName = _shared.BucketName, + Key = key, + InputStream = input, + AutoCloseStream = false, + UseChunkEncoding = false, + }; + ApplyObjectHeaders(request, metadata); + return request; + } + + private void ApplyObjectHeaders(PutObjectRequest request, IDictionary metadata) + { + if (_shared.MimeType is { Length: > 0 }) + { + request.ContentType = _shared.MimeType; + } + + if (_shared.Options.StorageClass is { } storageClass) + { + request.StorageClass = storageClass; + } + + AddMetadata(request.Metadata, metadata); + } + + private void ApplyObjectHeaders(CopyObjectRequest request, IDictionary metadata) + { + if (_shared.MimeType is { Length: > 0 }) + { + request.ContentType = _shared.MimeType; + } + + if (_shared.Options.StorageClass is { } storageClass) + { + request.StorageClass = storageClass; + } + + AddMetadata(request.Metadata, metadata); + } + + private string GetCheckpointName(string snapshotId) + { + var journalObjectKey = _shared.Options.GetObjectKeyForJournal(_journalId); + return S3JournalStorageOptions.GetCheckpointObjectKeyForJournal(_journalId, journalObjectKey, snapshotId); + } + + private async ValueTask DeleteCheckpointIfExistsAsync(string checkpointName, CancellationToken cancellationToken) + { + try + { + await _client.DeleteObjectAsync( + new DeleteObjectRequest + { + BucketName = _shared.BucketName, + Key = checkpointName, + }, + cancellationToken).ConfigureAwait(false); + } + catch (AmazonS3Exception exception) + { + LogCheckpointCleanupFailure(_shared.Logger, _shared.BucketName, checkpointName, exception); + } + } + + private Dictionary CreateObjectMetadata(string? format) + { + var metadata = new Dictionary(StringComparer.OrdinalIgnoreCase); + var formatKey = format ?? _shared.JournalFormatKey; + if (formatKey is { Length: > 0 }) + { + metadata[FormatMetadataKey] = formatKey; + } + + return metadata; + } + + private Dictionary CreateCheckpointObjectMetadata() => CreateObjectMetadata(_shared.JournalFormatKey); + + private Dictionary CreateWalMetadata( + string? checkpointName, + long checkpointOffset, + IReadOnlyDictionary? callerMetadata = null) + { + var metadata = CreateObjectMetadata(_shared.JournalFormatKey); + metadata[MetadataVersionMetadataKey] = CreateMetadataVersion(); + metadata[WalGenerationMetadataKey] = Guid.NewGuid().ToString("N"); + if (callerMetadata is not null) + { + foreach (var (key, value) in callerMetadata) + { + ValidateCallerMetadataProperty(key, value); + metadata[NormalizeMetadataKey(key)] = value; + } + } + + if (checkpointName is not null) + { + metadata[CheckpointMetadataKey] = checkpointName; + metadata[CheckpointOffsetMetadataKey] = checkpointOffset.ToString(CultureInfo.InvariantCulture); + } + + return metadata; + } + + private Dictionary CreateWalMetadata(WalManifest manifest) + { + var metadata = CreateObjectMetadata(manifest.Metadata.Format); + metadata[MetadataVersionMetadataKey] = manifest.MetadataVersion ?? CreateMetadataVersion(); + if (manifest.Generation is { Length: > 0 }) + { + metadata[WalGenerationMetadataKey] = manifest.Generation; + } + + foreach (var (key, value) in manifest.Metadata.Properties) + { + metadata[NormalizeMetadataKey(key)] = value; + } + + if (manifest.Checkpoint is { } checkpoint) + { + metadata[CheckpointMetadataKey] = checkpoint.Name; + metadata[CheckpointOffsetMetadataKey] = checkpoint.WalOffset.ToString(CultureInfo.InvariantCulture); + } + + return metadata; + } + + private static string? GetFormatKeyMetadata(IDictionary? metadata) + => metadata is not null + && metadata.TryGetValue(FormatMetadataKey, out var storedKey) + && storedKey is { Length: > 0 } + ? storedKey + : null; + + private static WalManifest CreateWalManifest(IDictionary? metadata) + { + var fileMetadata = CreateJournalMetadata(objectETag: null, metadata); + var generation = metadata is not null + && metadata.TryGetValue(WalGenerationMetadataKey, out var storedGeneration) + && storedGeneration is { Length: > 0 } + ? storedGeneration + : null; + var metadataVersion = GetMetadataVersion(metadata); + if (metadata is null || !metadata.TryGetValue(CheckpointMetadataKey, out var checkpointName) || checkpointName is not { Length: > 0 }) + { + return new WalManifest(fileMetadata, Checkpoint: null, generation, metadataVersion); + } + + var checkpointOffset = 0L; + if (metadata.TryGetValue(CheckpointOffsetMetadataKey, out var checkpointOffsetValue) + && checkpointOffsetValue is { Length: > 0 } + && (!long.TryParse(checkpointOffsetValue, NumberStyles.None, CultureInfo.InvariantCulture, out checkpointOffset) || checkpointOffset < 0)) + { + throw new InvalidOperationException( + $"S3 journal checkpoint offset metadata is invalid: '{checkpointOffsetValue}'."); + } + + return new WalManifest(fileMetadata, new CheckpointReference(checkpointName, checkpointOffset), generation, metadataVersion); + } + + private static WalState CreateWalState(GetObjectMetadataResponse properties) + { + var metadata = CopyMetadata(properties.Metadata); + var manifest = CreateWalManifest(metadata); + return new WalState( + properties.ETag, + properties.LastModified, + manifest, + CreateWalProviderState(manifest, properties.ContentLength, properties.PartsCount)); + } + + private static WalProviderState CreateWalProviderState(WalManifest manifest, long contentLength, int? partsCount) + => new( + manifest.Metadata.Format, + manifest.Checkpoint?.Name, + manifest.Checkpoint?.WalOffset ?? 0, + manifest.Generation, + contentLength, + Math.Max(0, partsCount ?? (contentLength == 0 ? 0 : 1))); + + private static IJournalMetadata ValidateCheckpointMetadata( + CheckpointReference checkpoint, + IDictionary checkpointMetadata, + string? expectedFormat) + { + var checkpointObjectFormat = GetFormatKeyMetadata(checkpointMetadata); + if (expectedFormat is { Length: > 0 }) + { + if (checkpointObjectFormat is null) + { + throw new InvalidOperationException( + $"S3 journal checkpoint '{checkpoint.Name}' does not include format metadata."); + } + + if (!string.Equals(expectedFormat, checkpointObjectFormat, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"S3 journal checkpoint '{checkpoint.Name}' format metadata is '{checkpointObjectFormat}', but recovery expected '{expectedFormat}'."); + } + } + + return CreateJournalMetadata(objectETag: null, checkpointMetadata); + } + + private async ValueTask GetPropertiesCoreAsync( + string? expectedETag, + CancellationToken cancellationToken) + { + try + { + return await _client.GetObjectMetadataAsync( + new GetObjectMetadataRequest + { + BucketName = _shared.BucketName, + Key = _walObjectKey, + EtagToMatch = expectedETag, + }, + cancellationToken).ConfigureAwait(false); + } + catch (AmazonS3Exception exception) when (exception.StatusCode is HttpStatusCode.NotFound) + { + return null; + } + } + + private static IJournalMetadata CreateJournalMetadata(string? objectETag, IDictionary? metadata) + => new JournalMetadata(GetFormatKeyMetadata(metadata), CreateMetadataETag(objectETag, metadata), CopyCallerMetadata(metadata)); + + private static string? CreateMetadataETag(string? objectETag, IDictionary? metadata) + { + if (objectETag is null) + { + return null; + } + + return GetMetadataVersion(metadata) is { Length: > 0 } metadataVersion + ? $"{objectETag}:{metadataVersion}" + : objectETag; + } + + private static string? GetMetadataVersion(IDictionary? metadata) + => metadata is not null + && metadata.TryGetValue(MetadataVersionMetadataKey, out var metadataVersion) + && metadataVersion is { Length: > 0 } + ? metadataVersion + : null; + + private static string CreateMetadataVersion() => Guid.NewGuid().ToString("N"); + + private static Dictionary CopyCallerMetadata(IDictionary? metadata) + { + var result = new Dictionary(StringComparer.Ordinal); + if (metadata is null) + { + return result; + } + + foreach (var (rawKey, value) in metadata) + { + var key = NormalizeMetadataKey(rawKey); + if (IsProviderMetadataKey(key)) + { + continue; + } + + result[key] = value; + } + + return result; + } + + private static Dictionary CopyAndValidateCallerMetadata(IReadOnlyDictionary? metadata) + { + var result = new Dictionary(StringComparer.Ordinal); + if (metadata is null) + { + return result; + } + + foreach (var (rawKey, value) in metadata) + { + var key = NormalizeMetadataKey(rawKey); + ValidateCallerMetadataProperty(key, value); + result.Add(key, value); + } + + return result; + } + + private static Dictionary CopyMetadata(MetadataCollection? metadata) + { + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + if (metadata is null) + { + return result; + } + + foreach (var rawKey in metadata.Keys) + { + var key = NormalizeMetadataKey(rawKey); + result[key] = metadata[rawKey]; + } + + return result; + } + + private static Dictionary CopyMetadata(IDictionary? metadata) + => metadata is null + ? new Dictionary(StringComparer.OrdinalIgnoreCase) + : new Dictionary(metadata, StringComparer.OrdinalIgnoreCase); + + private static IReadOnlySet CopyRemove(IEnumerable? remove, IReadOnlyDictionary set) + { + if (remove is null) + { + return new HashSet(StringComparer.Ordinal); + } + + var result = new HashSet(StringComparer.Ordinal); + foreach (var rawPropertyName in remove) + { + var propertyName = NormalizeMetadataKey(rawPropertyName); + ValidateCallerMetadataPropertyName(propertyName); + if (set.ContainsKey(propertyName)) + { + throw new ArgumentException($"Journal metadata property '{propertyName}' cannot be both set and removed.", nameof(remove)); + } + + result.Add(propertyName); + } + + return result; + } + + private static bool ApplyCallerMetadataUpdate( + Dictionary metadata, + IReadOnlyDictionary set, + IReadOnlySet remove) + { + var changed = false; + foreach (var propertyName in remove) + { + ValidateCallerMetadataPropertyName(propertyName); + changed |= metadata.Remove(propertyName); + } + + foreach (var (propertyName, value) in set) + { + ValidateCallerMetadataProperty(propertyName, value); + if (!metadata.TryGetValue(propertyName, out var currentValue) + || !string.Equals(currentValue, value, StringComparison.Ordinal)) + { + metadata[propertyName] = value; + changed = true; + } + } + + return changed; + } + + private static void AddMetadata(MetadataCollection target, IDictionary metadata) + { + foreach (var (rawKey, value) in metadata) + { + target.Add(NormalizeMetadataKey(rawKey), value); + } + } + + private static void ValidateCallerMetadataProperty(string key, string value) + { + ValidateCallerMetadataPropertyName(key); + ArgumentNullException.ThrowIfNull(value); + } + + private static void ValidateCallerMetadataPropertyName(string key) + { + ArgumentException.ThrowIfNullOrWhiteSpace(key); + if (key.IndexOf('\0') >= 0) + { + throw new ArgumentException("Journal metadata property names must not contain null characters.", nameof(key)); + } + + if (IsProviderMetadataKey(key)) + { + throw new ArgumentException($"Journal metadata property '{key}' is provider-owned.", nameof(key)); + } + } + + private static bool IsProviderMetadataKey(string key) + { + key = NormalizeMetadataKey(key); + return string.Equals(key, FormatMetadataKey, StringComparison.OrdinalIgnoreCase) + || string.Equals(key, CheckpointMetadataKey, StringComparison.OrdinalIgnoreCase) + || string.Equals(key, CheckpointOffsetMetadataKey, StringComparison.OrdinalIgnoreCase) + || string.Equals(key, WalGenerationMetadataKey, StringComparison.OrdinalIgnoreCase) + || string.Equals(key, MetadataVersionMetadataKey, StringComparison.OrdinalIgnoreCase) + || key.StartsWith("$", StringComparison.Ordinal); + } + + private static string NormalizeMetadataKey(string key) + => key.StartsWith(MetadataHeaderPrefix, StringComparison.OrdinalIgnoreCase) + ? key[MetadataHeaderPrefix.Length..] + : key; + + private static bool IsObjectAlreadyExists(AmazonS3Exception exception) + => exception.StatusCode is HttpStatusCode.PreconditionFailed or HttpStatusCode.Conflict; + + private static bool IsWalMutationConflict(AmazonS3Exception exception) + => exception.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.PreconditionFailed or HttpStatusCode.Conflict; + + private static InconsistentStateException CreateInconsistentWalStateException(string message, string? expectedETag, Exception? exception = null) + { + var currentETag = expectedETag ?? "Unknown"; + return exception is null + ? new InconsistentStateException(message, storedEtag: "Unknown", currentEtag: currentETag) + : new InconsistentStateException(message, storedEtag: "Unknown", currentEtag: currentETag, exception); + } + + private static async ValueTask SkipStreamAsync(Stream stream, long count, CancellationToken cancellationToken) + { + if (count <= 0) + { + return; + } + + if (stream.CanSeek) + { + stream.Seek(count, SeekOrigin.Current); + return; + } + + var buffer = new byte[Math.Min(81920, count)]; + var remaining = count; + while (remaining > 0) + { + var bytesRead = await stream.ReadAsync(buffer.AsMemory(0, (int)Math.Min(buffer.Length, remaining)), cancellationToken).ConfigureAwait(false); + if (bytesRead == 0) + { + throw new EndOfStreamException($"Unable to skip {count:N0} WAL bytes because the stream ended early."); + } + + remaining -= bytesRead; + } + } + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Appended {Length} bytes to S3 object \"{BucketName}/{ObjectKey}\"")] + private static partial void LogAppend(ILogger logger, long length, string bucketName, string objectKey); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Read {Length} bytes from S3 object \"{BucketName}/{ObjectKey}\"")] + private static partial void LogRead(ILogger logger, long length, string bucketName, string objectKey); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Wrote checkpoint S3 object \"{BucketName}/{ObjectKey}\" containing {Length} bytes")] + private static partial void LogReplace(ILogger logger, string bucketName, string objectKey, long length); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to delete obsolete S3 journal checkpoint \"{BucketName}/{ObjectKey}\"")] + private static partial void LogCheckpointCleanupFailure(ILogger logger, string bucketName, string objectKey, Exception exception); + + private sealed record WalManifest(IJournalMetadata Metadata, CheckpointReference? Checkpoint, string? Generation, string? MetadataVersion); + + private readonly record struct WalProviderState( + string? Format, + string? CheckpointName, + long CheckpointOffset, + string? Generation, + long ContentLength, + int PartsCount); + + private readonly record struct WalState(string ETag, DateTime? LastModified, WalManifest Manifest, WalProviderState ProviderState); + + private readonly record struct CreatedWal(PutObjectResponse Response, WalManifest Manifest, WalProviderState ProviderState); + + private readonly record struct CheckpointReference(string Name, long WalOffset); + + internal sealed class S3JournalStorageShared + { + public S3JournalStorageShared( + ILogger logger, + IOptions options, + string? mimeType = null, + string? journalFormatKey = null) + { + ArgumentNullException.ThrowIfNull(logger); + ArgumentNullException.ThrowIfNull(options); + + Logger = logger; + Options = options.Value; + ArgumentNullException.ThrowIfNull(Options); + if (string.IsNullOrWhiteSpace(Options.BucketName)) + { + throw new InvalidOperationException($"{nameof(S3JournalStorageOptions.BucketName)} must be configured."); + } + + BucketName = Options.BucketName; + MimeType = mimeType; + JournalFormatKey = journalFormatKey; + } + + public ILogger Logger { get; } + + public S3JournalStorageOptions Options { get; } + + public string BucketName { get; } + + public string? MimeType { get; } + + public string? JournalFormatKey { get; } + } +} diff --git a/src/AWS/Orleans.Journaling.S3/S3JournalStorageHostingExtensions.cs b/src/AWS/Orleans.Journaling.S3/S3JournalStorageHostingExtensions.cs new file mode 100644 index 00000000000..cd322ca0be8 --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/S3JournalStorageHostingExtensions.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration.Internal; +using Orleans.Runtime; + +namespace Orleans.Journaling; + +public static class S3JournalStorageHostingExtensions +{ + public static ISiloBuilder AddS3JournalStorage(this ISiloBuilder builder) => builder.AddS3JournalStorage(configure: null); + + public static ISiloBuilder AddS3JournalStorage(this ISiloBuilder builder, Action? configure) + { + builder.AddJournalStorage(); + + var services = builder.Services; + var options = services.AddOptions(); + if (configure is not null) + { + options.Configure(configure); + } + + if (!services.Any(service => service.ServiceType.Equals(typeof(S3JournalStorageProvider)))) + { + services.AddSingleton(); + services.AddFromExisting(); + services.AddFromExisting(); + services.AddFromExisting, S3JournalStorageProvider>(); + } + + return builder; + } +} diff --git a/src/AWS/Orleans.Journaling.S3/S3JournalStorageInstruments.cs b/src/AWS/Orleans.Journaling.S3/S3JournalStorageInstruments.cs new file mode 100644 index 00000000000..60ed5d7169f --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/S3JournalStorageInstruments.cs @@ -0,0 +1,52 @@ +using System.Diagnostics.Metrics; +using Orleans.Runtime; + +namespace Orleans.Journaling; + +internal static class S3JournalStorageInstruments +{ + private const string MillisecondsUnit = "ms"; + private const string BytesUnit = "bytes"; + private const string OperationTagName = "operation"; + private const string StatusTagName = "status"; + private const string StatusOk = "ok"; + private const string StatusError = "error"; + + internal const string OperationCreate = "create"; + internal const string OperationGetMetadata = "get_metadata"; + internal const string OperationUpdateMetadata = "update_metadata"; + internal const string OperationAppend = "append"; + internal const string OperationDelete = "delete"; + internal const string OperationRead = "read"; + internal const string OperationReplace = "replace"; + + private static readonly Counter Operations = Instruments.Meter.CreateCounter("orleans-journaling-s3-operations"); + private static readonly Counter OperationBytes = Instruments.Meter.CreateCounter("orleans-journaling-s3-operation-bytes", BytesUnit); + private static readonly Histogram OperationDuration = Instruments.Meter.CreateHistogram("orleans-journaling-s3-operation-duration", MillisecondsUnit); + + internal static void OnOperationCompleted(string operation, TimeSpan latency, long bytes, bool succeeded) + { + var tags = CreateTags(operation, succeeded); + if (Operations.Enabled) + { + Operations.Add(1, tags); + } + + if (OperationDuration.Enabled) + { + OperationDuration.Record(Math.Max(0, latency.TotalMilliseconds), tags); + } + + if (succeeded && bytes > 0 && OperationBytes.Enabled) + { + OperationBytes.Add(bytes, [new KeyValuePair(OperationTagName, operation)]); + } + } + + private static KeyValuePair[] CreateTags(string operation, bool succeeded) => + [ + new(OperationTagName, operation), + new(StatusTagName, succeeded ? StatusOk : StatusError) + ]; +} + diff --git a/src/AWS/Orleans.Journaling.S3/S3JournalStorageOptions.cs b/src/AWS/Orleans.Journaling.S3/S3JournalStorageOptions.cs new file mode 100644 index 00000000000..9b8917dd17a --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/S3JournalStorageOptions.cs @@ -0,0 +1,186 @@ +using Amazon; +using Amazon.Runtime; +using Amazon.S3; + +namespace Orleans.Journaling; + +/// +/// Options for configuring the Amazon S3 journal storage provider. +/// +public sealed class S3JournalStorageOptions +{ + private IAmazonS3? _s3Client; + + /// + /// Bucket name where journals are stored. + /// + public string? BucketName { get; set; } + + /// + /// Gets or sets the delegate used to generate the base object key for a journal. + /// + public Func GetObjectKey { get; set; } = DefaultGetObjectKey; + + /// + /// Gets or sets the delegate used to parse journal ids from catalog object keys. + /// + public Func TryParseJournalId { get; set; } = DefaultTryParseJournalId; + + /// + /// Options to use when creating the S3 client, or to use the AWS SDK defaults. + /// + public AmazonS3Config? ClientConfig { get; set; } + + /// + /// Gets or sets the client used to access S3. + /// + public IAmazonS3? S3Client + { + get => _s3Client; + set + { + ArgumentNullException.ThrowIfNull(value); + _s3Client = value; + CreateClient = _ => Task.FromResult(value); + } + } + + /// + /// Gets or sets a value indicating whether the configured bucket should be created if it does not exist. + /// + /// + /// This is intended for local emulators. AWS S3 Express One Zone directory buckets should generally be provisioned ahead of time. + /// + public bool CreateBucketIfNotExists { get; set; } + + /// + /// Gets or sets the S3 storage class applied to newly-created WAL and checkpoint objects. + /// + public S3StorageClass? StorageClass { get; set; } = S3StorageClass.ExpressOnezone; + + /// + /// Gets or sets a value indicating whether appends use S3 Express . + /// + /// + /// Set this to for S3-compatible emulators such as MinIO which do not support S3 Express append writes. + /// + public bool UseS3ExpressAppend { get; set; } = true; + + /// + /// Gets or sets a value indicating whether delete requests use S3 Express conditional delete headers. + /// + public bool UseConditionalDelete { get; set; } = true; + + /// + /// Gets or sets a value indicating whether obsolete checkpoint objects are deleted after a new checkpoint is published. Defaults to true. + /// + public bool DeleteOldCheckpoints { get; set; } = true; + + /// + /// Gets or sets the maximum number of times Append, Replace, or Delete will refresh its cached WAL ETag + /// and retry in place after observing a metadata-only conflict. Defaults to 5. + /// + public int MaxMetadataOnlyConflictRetries { get; set; } = DEFAULT_MAX_METADATA_ONLY_CONFLICT_RETRIES; + public const int DEFAULT_MAX_METADATA_ONLY_CONFLICT_RETRIES = 5; + + /// + /// Gets or sets the initial delay applied before retrying after a metadata-only conflict. Defaults to 10 ms. + /// + public TimeSpan MetadataOnlyConflictInitialBackoff { get; set; } = DEFAULT_METADATA_ONLY_CONFLICT_INITIAL_BACKOFF; + public static readonly TimeSpan DEFAULT_METADATA_ONLY_CONFLICT_INITIAL_BACKOFF = TimeSpan.FromMilliseconds(10); + + /// + /// Gets or sets the upper bound on metadata-only conflict retry backoff. Defaults to 200 ms. + /// + public TimeSpan MetadataOnlyConflictMaxBackoff { get; set; } = DEFAULT_METADATA_ONLY_CONFLICT_MAX_BACKOFF; + public static readonly TimeSpan DEFAULT_METADATA_ONLY_CONFLICT_MAX_BACKOFF = TimeSpan.FromMilliseconds(200); + + /// + /// The optional delegate used to create an instance. + /// + internal Func>? CreateClient { get; private set; } + + internal string GetObjectKeyForJournal(JournalId journalId) + { + if (journalId.IsDefault) + { + throw new ArgumentException("The journal id must not be the default value.", nameof(journalId)); + } + + var objectKey = GetObjectKey(journalId); + ArgumentException.ThrowIfNullOrWhiteSpace(objectKey); + return objectKey; + } + + internal static string GetWalObjectKeyForJournal(JournalId journalId, string journalObjectKey) + { + ArgumentException.ThrowIfNullOrWhiteSpace(journalObjectKey); + return GetDefaultWalObjectKey(journalObjectKey); + } + + internal static string GetCheckpointObjectKeyForJournal(JournalId journalId, string journalObjectKey, string snapshotId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(journalObjectKey); + ArgumentException.ThrowIfNullOrWhiteSpace(snapshotId); + return GetDefaultCheckpointObjectKey(journalObjectKey, snapshotId); + } + + internal static string GetDefaultWalObjectKey(string journalObjectKey) => $"{journalObjectKey}/wal"; + + internal static string GetDefaultCheckpointObjectKey(string journalObjectKey, string snapshotId) => $"{journalObjectKey}/chk.{snapshotId}"; + + internal Func> GetCreateClient() + => CreateClient ?? (_ => Task.FromResult(new AmazonS3Client(ClientConfig ?? new AmazonS3Config()))); + + /// + /// Configures the S3 client using the provided callback. + /// + public void ConfigureS3Client(Func> createClientCallback) + => CreateClient = createClientCallback ?? throw new ArgumentNullException(nameof(createClientCallback)); + + /// + /// Configures the S3 client using the AWS SDK default credential chain and the provided client configuration. + /// + public void ConfigureS3Client(AmazonS3Config config) + { + ArgumentNullException.ThrowIfNull(config); + ClientConfig = config; + CreateClient = _ => Task.FromResult(new AmazonS3Client(config)); + } + + /// + /// Configures the S3 client using the AWS SDK default credential chain and the provided region. + /// + public void ConfigureS3Client(RegionEndpoint regionEndpoint) + { + ArgumentNullException.ThrowIfNull(regionEndpoint); + ConfigureS3Client(new AmazonS3Config { RegionEndpoint = regionEndpoint }); + } + + /// + /// Configures the S3 client using explicit credentials and client configuration. + /// + public void ConfigureS3Client(string accessKey, string secretKey, AmazonS3Config config) + { + ArgumentException.ThrowIfNullOrWhiteSpace(accessKey); + ArgumentException.ThrowIfNullOrWhiteSpace(secretKey); + ArgumentNullException.ThrowIfNull(config); + ClientConfig = config; + var credentials = new BasicAWSCredentials(accessKey, secretKey); + CreateClient = _ => Task.FromResult(new AmazonS3Client(credentials, config)); + } + + private static string DefaultGetObjectKey(JournalId journalId) => journalId.Value; + + private static JournalId? DefaultTryParseJournalId(string value) + { + try + { + return new JournalId(value); + } + catch (ArgumentException) + { + return null; + } + } +} diff --git a/src/AWS/Orleans.Journaling.S3/S3JournalStorageProvider.cs b/src/AWS/Orleans.Journaling.S3/S3JournalStorageProvider.cs new file mode 100644 index 00000000000..569106fd874 --- /dev/null +++ b/src/AWS/Orleans.Journaling.S3/S3JournalStorageProvider.cs @@ -0,0 +1,180 @@ +using System.Net; +using System.Runtime.CompilerServices; +using Amazon.S3; +using Amazon.S3.Model; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Runtime; + +namespace Orleans.Journaling; + +internal sealed class S3JournalStorageProvider : ILifecycleParticipant, IJournalStorageProvider, IJournalStorageCatalog +{ + private readonly S3JournalStorageOptions _options; + private readonly S3JournalStorage.S3JournalStorageShared _shared; + private IAmazonS3? _client; + + public S3JournalStorageProvider( + IOptions options, + IOptions managerOptions, + IServiceProvider serviceProvider, + ILogger logger) + { + _options = options.Value; + ValidateOptions(_options); + var journalFormatKey = ValidateJournalFormatKey(managerOptions.Value.JournalFormatKey); + var journalFormat = GetJournalFormat(serviceProvider, journalFormatKey); + _shared = new S3JournalStorage.S3JournalStorageShared(logger, options, mimeType: journalFormat.MimeType, journalFormatKey); + } + + public IJournalStorage CreateStorage(JournalId journalId) + { + if (journalId.IsDefault) + { + throw new ArgumentException("The journal id must not be the default value.", nameof(journalId)); + } + + return new S3JournalStorage(_shared, GetClient(), journalId); + } + + public async IAsyncEnumerable ListAsync( + JournalId prefix = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var client = GetClient(); + var bucketName = GetBucketName(); + var objectPrefix = prefix.IsDefault ? null : prefix.Value; + var journalIds = new List(); + string? continuationToken = null; + + do + { + var response = await client.ListObjectsV2Async( + new ListObjectsV2Request + { + BucketName = bucketName, + Prefix = objectPrefix, + ContinuationToken = continuationToken, + }, + cancellationToken).ConfigureAwait(false); + + foreach (var item in response.S3Objects) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!item.Key.EndsWith("/wal", StringComparison.Ordinal)) + { + continue; + } + + var storageIdValue = item.Key[..^"/wal".Length]; + var journalId = _options.TryParseJournalId(storageIdValue); + if (journalId is { } id && prefix.IsPrefixOf(id)) + { + journalIds.Add(id); + } + } + + continuationToken = response.IsTruncated.GetValueOrDefault() ? response.NextContinuationToken : null; + } + while (continuationToken is not null); + + foreach (var journalId in journalIds.OrderBy(static journalId => journalId.Value, StringComparer.Ordinal)) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return journalId; + } + } + + public void Participate(ISiloLifecycle observer) + { + observer.Subscribe( + nameof(S3JournalStorageProvider), + ServiceLifecycleStage.RuntimeInitialize, + onStart: InitializeAsync); + } + + internal async Task InitializeAsync(CancellationToken cancellationToken) + { + _client = await _options.GetCreateClient()(cancellationToken).ConfigureAwait(false); + await EnsureBucketAsync(_client, GetBucketName(), _options.CreateBucketIfNotExists, cancellationToken).ConfigureAwait(false); + } + + private IAmazonS3 GetClient() + => _client ?? throw new InvalidOperationException( + $"{nameof(S3JournalStorageProvider)} has not been initialized. Ensure the silo lifecycle has started before using journal storage."); + + private string GetBucketName() + { + var bucketName = _options.BucketName; + if (string.IsNullOrWhiteSpace(bucketName)) + { + throw new InvalidOperationException($"{nameof(S3JournalStorageOptions.BucketName)} must be configured."); + } + + return bucketName; + } + + private static async Task EnsureBucketAsync(IAmazonS3 client, string bucketName, bool createIfMissing, CancellationToken cancellationToken) + { + try + { + await client.HeadBucketAsync(new HeadBucketRequest { BucketName = bucketName }, cancellationToken).ConfigureAwait(false); + } + catch (AmazonS3Exception exception) when (exception.StatusCode is HttpStatusCode.NotFound && createIfMissing) + { + await client.PutBucketAsync(new PutBucketRequest { BucketName = bucketName }, cancellationToken).ConfigureAwait(false); + } + } + + private static void ValidateOptions(S3JournalStorageOptions options) + { + ArgumentNullException.ThrowIfNull(options); + ArgumentNullException.ThrowIfNull(options.GetObjectKey); + ArgumentNullException.ThrowIfNull(options.TryParseJournalId); + if (options.MaxMetadataOnlyConflictRetries < 0) + { + throw new ArgumentOutOfRangeException(nameof(options), $"{nameof(S3JournalStorageOptions.MaxMetadataOnlyConflictRetries)} must be non-negative."); + } + + if (options.MetadataOnlyConflictInitialBackoff < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(options), $"{nameof(S3JournalStorageOptions.MetadataOnlyConflictInitialBackoff)} must be non-negative."); + } + + if (options.MetadataOnlyConflictMaxBackoff < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(options), $"{nameof(S3JournalStorageOptions.MetadataOnlyConflictMaxBackoff)} must be non-negative."); + } + } + + private static IJournalFormat GetJournalFormat(IServiceProvider serviceProvider, string journalFormatKey) + { + var journalFormat = serviceProvider.GetKeyedService(journalFormatKey); + if (journalFormat is null) + { + throw new InvalidOperationException( + $"Journal format key '{journalFormatKey}' requires keyed service '{typeof(IJournalFormat).FullName}', but none was registered."); + } + + if (!string.Equals(journalFormat.FormatKey, journalFormatKey, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Journal format key '{journalFormatKey}' resolved format '{journalFormat.GetType().FullName}', but its {nameof(IJournalFormat.FormatKey)} is '{journalFormat.FormatKey}'. " + + "Register the journal format using the same key it reports."); + } + + return journalFormat; + } + + private static string ValidateJournalFormatKey(string? journalFormatKey) + { + if (string.IsNullOrWhiteSpace(journalFormatKey)) + { + throw new InvalidOperationException("The configured journal format key must be non-empty."); + } + + return journalFormatKey; + } +} + diff --git a/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj b/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj index 5532f47bfbe..217e17f79a4 100644 --- a/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj +++ b/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj @@ -16,7 +16,9 @@ + + @@ -25,6 +27,7 @@ + diff --git a/test/Orleans.Journaling.Tests/S3JournalStorageTests.cs b/test/Orleans.Journaling.Tests/S3JournalStorageTests.cs new file mode 100644 index 00000000000..8c1d0169798 --- /dev/null +++ b/test/Orleans.Journaling.Tests/S3JournalStorageTests.cs @@ -0,0 +1,297 @@ +using System.Buffers; +using System.Net; +using Amazon; +using Amazon.Runtime; +using Amazon.S3; +using Amazon.S3.Model; +using Docker.DotNet; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Configurations; +using DotNet.Testcontainers.Containers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Orleans.Serialization; +using Orleans.Storage; +using Xunit; + +namespace Orleans.Journaling.Tests; + +[TestCategory("BVT")] +public sealed class S3JournalStorageTests : IAsyncLifetime +{ + private const int MinioPort = 9000; + private const string BucketName = "journaling-tests"; + private const string AccessKey = "minioadmin"; + private const string SecretKey = "minioadmin"; + private static readonly Lazy DockerSkipReason = new(GetDockerSkipReason); + private readonly IContainer? _container; + private AmazonS3Client? _client; + private string? _bucketName; + + public S3JournalStorageTests() + { + if (DockerSkipReason.Value is null) + { + _container = new ContainerBuilder("minio/minio:RELEASE.2025-09-07T16-13-09Z") + .WithEnvironment("MINIO_ROOT_USER", AccessKey) + .WithEnvironment("MINIO_ROOT_PASSWORD", SecretKey) + .WithCommand("server", "/data") + .WithPortBinding(MinioPort, true) + .WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request => request + .ForPort(MinioPort) + .ForPath("/minio/health/ready"))) + .Build(); + } + } + + public async Task InitializeAsync() + { + if (DockerSkipReason.Value is not null) + { + return; + } + + await _container!.StartAsync(); + _client = new AmazonS3Client( + new BasicAWSCredentials(AccessKey, SecretKey), + new AmazonS3Config + { + ServiceURL = $"http://127.0.0.1:{_container.GetMappedPublicPort(MinioPort)}", + ForcePathStyle = true, + AuthenticationRegion = RegionEndpoint.USEast1.SystemName, + }); + _bucketName = $"{BucketName}-{Guid.NewGuid():N}"; + await _client.PutBucketAsync(new PutBucketRequest { BucketName = _bucketName }); + } + + public async Task DisposeAsync() + { + _client?.Dispose(); + if (_container is not null) + { + await _container.DisposeAsync(); + } + } + + [SkippableFact] + public async Task AppendReadReplaceAndDelete_RoundTripsThroughMinio() + { + EnsureDockerAvailable(); + var storage = CreateStorage("journals/test", journalFormatKey: "json-lines"); + + Assert.True(await storage.CreateIfNotExistsAsync( + new Dictionary { ["catalog"] = "open" }, + CancellationToken.None)); + Assert.False(await storage.CreateIfNotExistsAsync(cancellationToken: CancellationToken.None)); + + var metadata = await storage.GetMetadataAsync(CancellationToken.None); + Assert.NotNull(metadata); + Assert.Equal("json-lines", metadata.Format); + Assert.Equal("open", metadata.Properties["catalog"]); + + await storage.AppendAsync(new ReadOnlySequence([1, 2]), CancellationToken.None); + await storage.AppendAsync(new ReadOnlySequence([3]), CancellationToken.None); + + var consumer = new CapturingJournalStorageConsumer(); + await storage.ReadAsync(consumer, CancellationToken.None); + Assert.Equal("json-lines", consumer.JournalFormatKey); + Assert.Equal([1, 2, 3], consumer.Bytes.ToArray()); + + await storage.ReplaceAsync(new ReadOnlySequence([4, 5]), CancellationToken.None); + await storage.AppendAsync(new ReadOnlySequence([6]), CancellationToken.None); + + var reloaded = CreateStorage("journals/test", journalFormatKey: "json-lines"); + var reloadedConsumer = new CapturingJournalStorageConsumer(); + await reloaded.ReadAsync(reloadedConsumer, CancellationToken.None); + Assert.Equal("json-lines", reloadedConsumer.JournalFormatKey); + Assert.Equal([4, 5, 6], reloadedConsumer.Bytes.ToArray()); + + await reloaded.DeleteAsync(CancellationToken.None); + var emptyConsumer = new CapturingJournalStorageConsumer(); + await CreateStorage("journals/test", journalFormatKey: "json-lines").ReadAsync(emptyConsumer, CancellationToken.None); + Assert.Empty(emptyConsumer.Bytes.ToArray()); + } + + [SkippableFact] + public async Task UpdateMetadataAsync_IsConditionalAndPreservesProviderMetadata() + { + EnsureDockerAvailable(); + var storage = CreateStorage("journals/metadata", journalFormatKey: "json-lines"); + await storage.AppendAsync(new ReadOnlySequence([1]), CancellationToken.None); + + var before = await storage.GetMetadataAsync(CancellationToken.None); + Assert.NotNull(before); + + var updated = await storage.UpdateMetadataAsync( + set: new Dictionary { ["catalog"] = "closed" }, + expectedETag: before.ETag, + cancellationToken: CancellationToken.None); + + Assert.NotNull(updated); + Assert.Equal("json-lines", updated.Format); + Assert.Equal("closed", updated.Properties["catalog"]); + Assert.NotEqual(before.ETag, updated.ETag); + + var stale = await storage.UpdateMetadataAsync( + set: new Dictionary { ["catalog"] = "stale" }, + expectedETag: before.ETag, + cancellationToken: CancellationToken.None); + Assert.Null(stale); + + var consumer = new CapturingJournalStorageConsumer(); + await CreateStorage("journals/metadata", journalFormatKey: "json-lines").ReadAsync(consumer, CancellationToken.None); + Assert.Equal([1], consumer.Bytes.ToArray()); + Assert.Equal("json-lines", consumer.JournalFormatKey); + } + + [SkippableFact] + public async Task ListAsync_ReturnsSortedJournalIdsMatchingPrefix() + { + EnsureDockerAvailable(); + var provider = CreateProvider(); + await CreateStorage("journals/zeta").AppendAsync(new ReadOnlySequence([1]), CancellationToken.None); + await CreateStorage("journals/alpha").AppendAsync(new ReadOnlySequence([1]), CancellationToken.None); + await CreateStorage("other/beta").AppendAsync(new ReadOnlySequence([1]), CancellationToken.None); + + var listed = new List(); + await foreach (var journalId in provider.ListAsync(new JournalId("journals"), CancellationToken.None)) + { + listed.Add(journalId); + } + + Assert.Equal(["journals/alpha", "journals/zeta"], listed.Select(static id => id.Value)); + } + + [SkippableFact] + public async Task AppendAsync_WhenWalChangedExternally_RequiresRecovery() + { + EnsureDockerAvailable(); + var storage = CreateStorage("journals/conflict"); + await storage.AppendAsync(new ReadOnlySequence([1]), CancellationToken.None); + + var other = CreateStorage("journals/conflict"); + await other.AppendAsync(new ReadOnlySequence([2]), CancellationToken.None); + + var exception = await Assert.ThrowsAsync( + () => storage.AppendAsync(new ReadOnlySequence([3]), CancellationToken.None).AsTask()); + Assert.Contains("recovery", exception.Message, StringComparison.OrdinalIgnoreCase); + } + + private S3JournalStorage CreateStorage(string journalId, string? journalFormatKey = null) + { + var options = CreateOptions(journalFormatKey); + return new S3JournalStorage( + new S3JournalStorage.S3JournalStorageShared( + NullLogger.Instance, + Options.Create(options), + mimeType: "application/octet-stream", + journalFormatKey), + _client!, + new JournalId(journalId)); + } + + private S3JournalStorageProvider CreateProvider() + { + var services = new ServiceCollection(); + services.AddSerializer(); + services.AddLogging(); + services.AddSingleton(); + services.AddKeyedSingleton( + OrleansBinaryJournalFormat.JournalFormatKey, + static (sp, _) => sp.GetRequiredService()); + using var serviceProvider = services.BuildServiceProvider(); + var provider = new S3JournalStorageProvider( + Options.Create(CreateOptions(journalFormatKey: null)), + Options.Create(new JournaledStateManagerOptions { JournalFormatKey = OrleansBinaryJournalFormat.JournalFormatKey }), + serviceProvider, + NullLogger.Instance); + provider.InitializeAsync(CancellationToken.None).GetAwaiter().GetResult(); + return provider; + } + + private S3JournalStorageOptions CreateOptions(string? journalFormatKey) + { + return new S3JournalStorageOptions + { + BucketName = _bucketName!, + S3Client = _client!, + UseS3ExpressAppend = false, + UseConditionalDelete = false, + StorageClass = null, + MetadataOnlyConflictInitialBackoff = TimeSpan.Zero, + GetObjectKey = id => id.Value, + }; + } + + private static void EnsureDockerAvailable() + { + Skip.If(DockerSkipReason.Value is not null, DockerSkipReason.Value); + } + + private static string? GetDockerSkipReason() + { + try + { + var endpointAuthConfig = TestcontainersSettings.OS?.DockerEndpointAuthConfig; + if (endpointAuthConfig is null) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + + using var dockerClient = endpointAuthConfig + .GetDockerClientConfiguration(Guid.NewGuid()) + .CreateClient(); + var dockerInfo = dockerClient.System.GetSystemInfoAsync().GetAwaiter().GetResult(); + if (string.IsNullOrWhiteSpace(dockerInfo.OSType)) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + + if (string.Equals(dockerInfo.OSType, "windows", StringComparison.OrdinalIgnoreCase)) + { + return "Docker is running in Windows container mode, so MinIO S3 journal storage tests are skipped."; + } + + return null; + } + catch (HttpRequestException) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + catch (OperationCanceledException) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + catch (DockerApiException) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + catch (InvalidOperationException) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + catch (NullReferenceException) + { + return "Docker is unavailable, so MinIO S3 journal storage tests are skipped."; + } + } + + private sealed class CapturingJournalStorageConsumer : IJournalStorageConsumer + { + public string? JournalFormatKey { get; private set; } + + public MemoryStream Bytes { get; } = new(); + + public void Read(JournalBufferReader buffer, IJournalMetadata? metadata) + { + JournalFormatKey = metadata?.Format; + while (buffer.Length > 0) + { + var chunk = new byte[buffer.Length]; + buffer.Read(chunk); + Bytes.Write(chunk); + } + } + } +}