diff --git a/samples/KafkaFlow.Retry.Common.Sample/Helpers/PostgresHelper.cs b/samples/KafkaFlow.Retry.Common.Sample/Helpers/PostgresHelper.cs new file mode 100644 index 00000000..63abb074 --- /dev/null +++ b/samples/KafkaFlow.Retry.Common.Sample/Helpers/PostgresHelper.cs @@ -0,0 +1,51 @@ +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; +using Npgsql; + +namespace KafkaFlow.Retry.Common.Sample.Helpers; + +public static class PostgresHelper +{ + public static async Task RecreateSqlSchema(string databaseName, string connectionString) + { + await using (var openCon = new NpgsqlConnection(connectionString)) + { + openCon.Open(); + openCon.ChangeDatabase(databaseName); + + var scripts = GetScriptsForSchemaCreation(); + + foreach (var script in scripts) + { + await using (var queryCommand = new NpgsqlCommand(script)) + { + queryCommand.Connection = openCon; + + await queryCommand.ExecuteNonQueryAsync(); + } + } + } + } + + private static IEnumerable GetScriptsForSchemaCreation() + { + var postgresAssembly = Assembly.LoadFrom("KafkaFlow.Retry.Postgres.dll"); + return postgresAssembly + .GetManifestResourceNames() + .OrderBy(x => x) + .Select(script => + { + using (var s = postgresAssembly.GetManifestResourceStream(script)) + { + using (var sr = new StreamReader(s)) + { + return sr.ReadToEnd(); + } + } + }) + .ToList(); + } +} \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.Common.Sample/KafkaFlow.Retry.Common.Sample.csproj b/samples/KafkaFlow.Retry.Common.Sample/KafkaFlow.Retry.Common.Sample.csproj index f30beb3f..7fa4ab6e 100644 --- a/samples/KafkaFlow.Retry.Common.Sample/KafkaFlow.Retry.Common.Sample.csproj +++ b/samples/KafkaFlow.Retry.Common.Sample/KafkaFlow.Retry.Common.Sample.csproj @@ -9,6 +9,7 @@ + diff --git a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs index 0bcbf739..80b4853e 100644 --- a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs +++ b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs @@ -2,6 +2,7 @@ using Confluent.Kafka; using KafkaFlow.Configuration; using KafkaFlow.Retry.MongoDb; +using KafkaFlow.Retry.Postgres; using KafkaFlow.Retry.Sample.Exceptions; using KafkaFlow.Retry.Sample.Handlers; using KafkaFlow.Retry.Sample.Messages; @@ -112,8 +113,102 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoDb( return cluster; } - internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer( + internal static IClusterConfigurationBuilder SetupRetryDurablePostgres( this IClusterConfigurationBuilder cluster, + string postgresConnectionString, + string postgresDatabaseName) + { + cluster + .AddProducer( + "kafka-flow-retry-durable-postgres-producer", + producer => producer + .DefaultTopic("sample-kafka-flow-retry-durable-postgres-topic") + .WithCompression(CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + ) + .WithAcks(Acks.All) + ) + .AddConsumer( + consumer => consumer + .Topic("sample-kafka-flow-retry-durable-postgres-topic") + .WithGroupId("sample-consumer-kafka-flow-retry-durable-postgres") + .WithName("kafka-flow-retry-durable-postgres-consumer") + .WithBufferSize(10) + .WithWorkersCount(20) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .RetryDurable( + configure => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithPostgresDataProvider( + postgresConnectionString, + postgresDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false) + ) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .WithRetryTopicName("sample-kafka-flow-retry-durable-postgres-topic-retry") + .WithRetryConsumerBufferSize(4) + .WithRetryConsumerWorkersCount(2) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler() + ) + .Enabled(true) + ) + .WithPollingJobsConfiguration( + configure => configure + .WithSchedulerId("retry-durable-postgres-polling-id") + .WithRetryDurablePollingConfiguration( + configure => configure + .WithCronExpression("0 0/1 * 1/1 * ? *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(10) + .Enabled(true) + ) + .WithCleanupPollingConfiguration( + configure => configure + .Enabled(false) + .WithCronExpression("0 0/1 * 1/1 * ? *") + ) + .WithRetryDurableActiveQueuesCountPollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0 0/1 * 1/1 * ? *") + .Do((numberOfActiveQueues) => + { + Console.Write($"Number of postgres active queues {numberOfActiveQueues}"); + }) + ) + + )) + .AddTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler()) + ) + ); + + return cluster; + } + + internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer( + this IClusterConfigurationBuilder cluster, string sqlServerConnectionString, string sqlServerDatabaseName) { diff --git a/samples/KafkaFlow.Retry.Sample/KafkaFlow.Retry.Sample.csproj b/samples/KafkaFlow.Retry.Sample/KafkaFlow.Retry.Sample.csproj index 04fe992e..cde1261e 100644 --- a/samples/KafkaFlow.Retry.Sample/KafkaFlow.Retry.Sample.csproj +++ b/samples/KafkaFlow.Retry.Sample/KafkaFlow.Retry.Sample.csproj @@ -18,6 +18,7 @@ + diff --git a/samples/KafkaFlow.Retry.Sample/Program.cs b/samples/KafkaFlow.Retry.Sample/Program.cs index c4ac2fbb..0a0b908f 100644 --- a/samples/KafkaFlow.Retry.Sample/Program.cs +++ b/samples/KafkaFlow.Retry.Sample/Program.cs @@ -23,7 +23,9 @@ private static async Task Main() var mongoDbRetryQueueItemCollectionName = "RetryQueueItems"; var sqlServerConnectionString = string.Join( string.Empty, - "Server=localhost;", + "Server=sqlserver.docker.internal;", + "User ID = sa;", + "Password=Finance123.;", "Trusted_Connection=false;", "TrustServerCertificate=true;", "Integrated Security=false;", @@ -35,6 +37,15 @@ private static async Task Main() "Encrypt=false;" ); var sqlServerDatabaseName = "kafka_flow_retry_durable_sample"; + var postgresConnectionString = string.Join( + string.Empty, + "Server=localhost;", + "User Id=postgres;", + "Password=Postgres123123;", + "Port=5432;", + "Application Name=KafkaFlow Retry Tests;" + ); + var postgresDatabaseName = "kafka_flow_retry_durable_sample"; var topics = new[] { "sample-kafka-flow-retry-simple-topic", @@ -42,10 +53,13 @@ private static async Task Main() "sample-kafka-flow-retry-durable-sqlserver-topic", "sample-kafka-flow-retry-durable-sqlserver-topic-retry", "sample-kafka-flow-retry-durable-mongodb-topic", - "sample-kafka-flow-retry-durable-mongodb-topic-retry" + "sample-kafka-flow-retry-durable-mongodb-topic-retry", + "sample-kafka-flow-retry-durable-postgres-topic", + "sample-kafka-flow-retry-durable-postgres-topic-retry", }; SqlServerHelper.RecreateSqlSchema(sqlServerDatabaseName, sqlServerConnectionString).GetAwaiter().GetResult(); + PostgresHelper.RecreateSqlSchema(postgresDatabaseName, postgresConnectionString).GetAwaiter().GetResult(); KafkaHelper.CreateKafkaTopics(brokers, topics).GetAwaiter().GetResult(); services.AddKafka( @@ -65,6 +79,9 @@ private static async Task Main() .SetupRetryDurableSqlServer( sqlServerConnectionString, sqlServerDatabaseName) + .SetupRetryDurablePostgres( + postgresConnectionString, + postgresDatabaseName) ) ); @@ -81,7 +98,7 @@ private static async Task Main() while (true) { - Console.Write("retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver or exit: "); + Console.Write("\nChoose a command:\nretry-simple\nretry-forever\nretry-durable-mongodb\nretry-durable-sqlserver\nretry-durable-postgres\nexit\n: "); var input = Console.ReadLine().ToLower(CultureInfo.InvariantCulture); switch (input) @@ -149,6 +166,38 @@ await producers["kafka-flow-retry-durable-sqlserver-producer"] } break; + case "retry-durable-postgres": + { + Console.Write("Number of the distinct messages to produce: "); + int.TryParse(Console.ReadLine(), out var numOfMessages); + Console.Write("Number of messages with same partition key: "); + int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey); + + var messages = Enumerable + .Range(0, numOfMessages) + .SelectMany( + x => + { + var partitionKey = Guid.NewGuid().ToString(); + return Enumerable + .Range(0, numOfMessagesWithSamePartitionkey) + .Select(y => new BatchProduceItem( + "sample-kafka-flow-retry-durable-postgres-topic", + partitionKey, + new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" }, + null)) + .ToList(); + } + ) + .ToList(); + + await producers["kafka-flow-retry-durable-postgres-producer"] + .BatchProduceAsync(messages) + .ConfigureAwait(false); + Console.WriteLine("Published"); + } + break; + case "retry-forever": { Console.Write("Number of messages to produce: "); @@ -195,7 +244,7 @@ await producers["kafka-flow-retry-simple-producer"] default: Console.Write( - "USE: retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver or exit: "); + "USE: retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver, retry-durable-postgres or exit: "); break; } }