Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageVersion Include="Autofac.Extensions.DependencyInjection" Version="10.0.0" />
<PackageVersion Include="AwesomeAssertions" Version="9.3.0" />
<PackageVersion Include="AWSSDK.DynamoDBv2" Version="4.0.14" />
<PackageVersion Include="AWSSDK.S3" Version="4.0.23.4" />
<PackageVersion Include="AWSSDK.SQS" Version="4.0.2.14" />
<PackageVersion Include="Azure.Core" Version="1.50.0" />
<PackageVersion Include="Azure.Data.Tables" Version="12.11.0" />
Expand Down
1 change: 1 addition & 0 deletions Orleans.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
</Folder>
<Folder Name="/src/Extensions/AWS/">
<Project Path="src/AWS/Orleans.Clustering.DynamoDB/Orleans.Clustering.DynamoDB.csproj" />
<Project Path="src/AWS/Orleans.Journaling.S3/Orleans.Journaling.S3.csproj" />
<Project Path="src/AWS/Orleans.Persistence.DynamoDB/Orleans.Persistence.DynamoDB.csproj" />
<Project Path="src/AWS/Orleans.Reminders.DynamoDB/Orleans.Reminders.DynamoDB.csproj" />
<Project Path="src/AWS/Orleans.Streaming.SQS/Orleans.Streaming.SQS.csproj" />
Expand Down
30 changes: 30 additions & 0 deletions src/AWS/Orleans.Journaling.S3/Orleans.Journaling.S3.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Microsoft.Orleans.Journaling.S3</PackageId>
<PackageReadmeFile>README.md</PackageReadmeFile>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<NoWarn>$(NoWarn);ORLEANSEXP005</NoWarn>
<VersionSuffix Condition="$(VersionSuffix) != ''">$(VersionSuffix).alpha.1</VersionSuffix>
<VersionSuffix Condition="$(VersionSuffix) == ''">alpha.1</VersionSuffix>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.S3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="$(SourceRoot)src\Orleans.Journaling\Orleans.Journaling.csproj" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Orleans.Journaling.Tests" />
</ItemGroup>

<ItemGroup>
<None Include="README.md" Pack="true" PackagePath="\" />
</ItemGroup>
</Project>
4 changes: 4 additions & 0 deletions src/AWS/Orleans.Journaling.S3/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
using System.Diagnostics.CodeAnalysis;

[assembly: Experimental("ORLEANSEXP005")]

8 changes: 8 additions & 0 deletions src/AWS/Orleans.Journaling.S3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Microsoft Orleans S3 Journaling

`Microsoft.Orleans.Journaling.S3` provides an `IJournalStorage` implementation backed by Amazon S3 Express One Zone directory buckets.

The provider uses S3 Express append writes (`WriteOffsetBytes`) for WAL appends. For local development and tests against S3-compatible emulators such as MinIO, set `UseS3ExpressAppend = false` to use a conditional read-modify-write append emulation.

Buckets should be created ahead of time for AWS S3 Express One Zone. `CreateBucketIfNotExists` is intended for local emulators.

97 changes: 97 additions & 0 deletions src/AWS/Orleans.Journaling.S3/ReadOnlySequenceStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System.Buffers;

namespace Orleans.Journaling;

internal sealed class ReadOnlySequenceStream(ReadOnlySequence<byte> sequence) : Stream
{
private readonly ReadOnlySequence<byte> _sequence = sequence;
private long _position;
private bool _disposed;

public override bool CanRead => !_disposed;

public override bool CanSeek => !_disposed;

public override bool CanWrite => false;

public override long Length
{
get
{
ObjectDisposedException.ThrowIf(_disposed, this);
return _sequence.Length;
}
}

public override long Position
{
get
{
ObjectDisposedException.ThrowIf(_disposed, this);
return _position;
}

set
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (value < 0 || value > _sequence.Length)
{
throw new ArgumentOutOfRangeException(nameof(value));
}

_position = value;
}
}

public override void Flush() => ObjectDisposedException.ThrowIf(_disposed, this);

public override int Read(byte[] buffer, int offset, int count) => Read(buffer.AsSpan(offset, count));

public override int Read(Span<byte> buffer)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (buffer.IsEmpty || _position >= _sequence.Length)
{
return 0;
}

var length = (int)Math.Min(buffer.Length, _sequence.Length - _position);
_sequence.Slice(_position, length).CopyTo(buffer);
_position += length;
return length;
}

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return new ValueTask<int>(Read(buffer.Span));
}

public override long Seek(long offset, SeekOrigin origin)
{
ObjectDisposedException.ThrowIf(_disposed, this);
var newPosition = origin switch
{
SeekOrigin.Begin => offset,
SeekOrigin.Current => _position + offset,
SeekOrigin.End => _sequence.Length + offset,
_ => throw new ArgumentOutOfRangeException(nameof(origin))
};

Position = newPosition;
return _position;
}

public override void SetLength(long value) => throw new NotSupportedException("This stream is read-only.");

public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream is read-only.");

public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException("This stream is read-only.");

protected override void Dispose(bool disposing)
{
_disposed = true;
base.Dispose(disposing);
}
}

Loading
Loading