Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ab10cc1
FS clean up
lsfera Jul 4, 2024
977f203
suppress CS1591 (Missing XML comment for publicly visible type or me…
lsfera Jul 4, 2024
ad74d3c
made UseTable() optional
lsfera Jul 4, 2024
b59d386
enforce AOT
lsfera Jul 4, 2024
71c1f14
rename IConsumes to IHandles
lsfera Jul 4, 2024
86656ec
Use double quote
lsfera Jul 4, 2024
438cf21
use ILKE. Replication slot name is forced to lcase
lsfera Jul 4, 2024
b24c01f
unused directive
lsfera Jul 4, 2024
b024c6f
remove static variables to enable multiple instances on same process
lsfera Jul 4, 2024
8cc1fed
Added DependencyIbjection project
lsfera Jul 4, 2024
380d06a
Added demo projrct for DI
lsfera Jul 4, 2024
acf5f4d
Enhanced logging
lsfera Jul 5, 2024
e918dfa
bumped version to 0.1.1
lsfera Jul 5, 2024
2055fed
allow tableDescriptor access
lsfera Jul 5, 2024
eadba73
renamed vars
lsfera Jul 5, 2024
c097047
expose NpgsqlDataSource along with connection string - close #16
lsfera Jul 15, 2024
ef9ac95
rename extension method
lsfera Jul 15, 2024
345cad6
move to files
lsfera Jul 15, 2024
645ec1a
simplified di registration
lsfera Jul 16, 2024
ca28054
collapse project
lsfera Jul 16, 2024
f4cfe0c
rename folder
lsfera Jul 16, 2024
f9bf48f
mark as implicit usage
lsfera Jul 17, 2024
7fff1b4
switch to prepared statement
lsfera Jul 17, 2024
f0c4160
dispose resources
lsfera Jul 17, 2024
8db4187
explicit defaults
lsfera Jul 17, 2024
c446721
add PublisherOptions
lsfera Jul 17, 2024
e71999b
Expose shortcut for table validation when bootstrapping publisher
lsfera Jul 17, 2024
2351af1
explain different available to publisher for validating table
lsfera Jul 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Blumchen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8A
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen.DependencyInjection", "src\Blumchen.DependencyInjection\Blumchen.DependencyInjection.csproj", "{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -69,6 +73,14 @@ Global
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.Build.0 = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -79,6 +91,7 @@ Global
{F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr
```shell
docker-compose up
```
2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.
2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.

## Testing (against default docker instance)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIX: Let's merge it into the regular project. I'm fine with having hosting.abstractions and Polly dependency in the main project. It's much more accessible when user installs just a package and have all available (of course, unless that's enforcing too much from the peer dependency side).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've no strong opinion against it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lsfera, could you change that in the PR?

<VersionPrefix>0.1.1</VersionPrefix>
<TargetFramework>net8.0</TargetFramework>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
<GenerateAssemblyProductAttribute>true</GenerateAssemblyProductAttribute>
<GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<GenerateAssemblyFileVersionAttribute>true</GenerateAssemblyFileVersionAttribute>
<GenerateAssemblyInformationalVersionAttribute>true</GenerateAssemblyInformationalVersionAttribute>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>12.0</LangVersion>
<Authors>Oskar Dudycz</Authors>
<!-- <PackageIconUrl>https://github.com/event-driven-io/Blumchen/content/images/emblem.png</PackageIconUrl>-->
<PackageProjectUrl>https://github.com/event-driven-io/Blumchen</PackageProjectUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<RepositoryUrl>https://github.com/event-driven-io/Blumchen.git</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<Product>Blumchen</Product>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<RootNamespace>Blumchen</RootNamespace>
<PublishAot>true</PublishAot>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1591</NoWarn>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<NoWarn>1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Polly" Version="8.4.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Blumchen\Blumchen.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
namespace Blumchen.Configuration;
public record DatabaseOptions(string ConnectionString);
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Microsoft.Extensions.DependencyInjection;
#pragma warning disable IL2091

namespace Blumchen.Workers;

public static class ServiceCollectionExtensions
{
public static IServiceCollection AddBlumchen<T,TU>(this IServiceCollection service, T? instance = default)
where T : Worker<TU> where TU : class =>
instance is null
? service.AddHostedService<T>()
: service.AddHostedService(_=>instance);
}
66 changes: 66 additions & 0 deletions src/Blumchen.DependencyInjection/Workers/Worker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System.Collections.Concurrent;
using System.Text.Json.Serialization;
using Blumchen.Configuration;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Subscriptions.Management;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly;


namespace Blumchen.Workers;

public abstract class Worker<T>(
DatabaseOptions databaseOptions,
IHandler<T> handler,
JsonSerializerContext jsonSerializerContext,
IErrorProcessor errorProcessor,
ResiliencePipeline pipeline,
INamingPolicy namingPolicy,
PublicationManagement.PublicationSetupOptions publicationSetupOptions,
ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions,
Func<TableDescriptorBuilder,TableDescriptorBuilder> tableDescriptorBuilder,
ILoggerFactory loggerFactory): BackgroundService where T : class
{
private readonly ILogger<Worker<T>> _logger = loggerFactory.CreateLogger<Worker<T>>();
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters)
{
static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled) =>
(ll, enabled) switch
{
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters),
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters),
(_, _) => (_, __, ___) => { }
};
LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await pipeline.ExecuteAsync(async token =>
{
await using var subscription = new Subscription();
await using var cursor = subscription.Subscribe(builder =>
builder
.ConnectionString(databaseOptions.ConnectionString)
.WithTable(tableDescriptorBuilder)
.WithErrorProcessor(errorProcessor)
.Handles<T, IHandler<T>>(handler)
.NamingPolicy(namingPolicy)
.JsonContext(jsonSerializerContext)
.WithPublicationOptions(publicationSetupOptions)
.WithReplicationOptions(replicationSlotSetupOptions)
, ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token);
Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);

}, stoppingToken).ConfigureAwait(false);
Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
return;
}

}
12 changes: 11 additions & 1 deletion src/Blumchen/Blumchen.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<VersionPrefix>0.1.0</VersionPrefix>
<VersionPrefix>0.1.1</VersionPrefix>
<TargetFramework>net8.0</TargetFramework>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
Expand All @@ -25,6 +25,16 @@
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<RootNamespace>Blumchen</RootNamespace>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1591</NoWarn>
<WarningsNotAsErrors></WarningsNotAsErrors>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<NoWarn>1591</NoWarn>
<WarningsNotAsErrors></WarningsNotAsErrors>
</PropertyGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Tests</_Parameter1>
Expand Down
2 changes: 0 additions & 2 deletions src/Blumchen/Database/Run.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
using Blumchen.Subscriptions.ReplicationMessageHandlers;
using Npgsql;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Blumchen.Database;

public static class Run
Expand Down
3 changes: 2 additions & 1 deletion src/Blumchen/MessageTableOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace Blumchen;

#pragma warning disable CS1591
public record TableDescriptorBuilder
{
private MessageTable TableDescriptor { get; set; } = new();
Expand Down Expand Up @@ -34,6 +33,8 @@ public TableDescriptorBuilder MessageType(string name, int dimension = 250)
return this;
}

public TableDescriptorBuilder UseDefaults() => this;

public record MessageTable(string Name = MessageTable.DefaultName)
{
internal const string DefaultName = "outbox";
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Publications/MessageAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Npgsql;

namespace Blumchen.Publications;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public static class MessageAppender
{
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

namespace Blumchen.Publications;

#pragma warning disable CS1591
public class PublisherSetupOptionsBuilder
{
private INamingPolicy? _namingPolicy;
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Serialization/IDictionaryExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
namespace Blumchen.Serialization;


Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Serialization/INamingPolicy.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
namespace Blumchen.Serialization;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public interface INamingPolicy
{
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Serialization/ITypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Text.Json.Serialization.Metadata;

namespace Blumchen.Serialization;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public interface ITypeResolver<T>
{
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Serialization/JsonSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Blumchen.Streams;

namespace Blumchen.Serialization;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public static class JsonSerialization
{
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Serialization/MessageUrnAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Collections.Concurrent;

namespace Blumchen.Serialization;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Interface)]
public class MessageUrnAttribute:
Expand Down
9 changes: 0 additions & 9 deletions src/Blumchen/Subscriptions/IConsume.cs

This file was deleted.

8 changes: 8 additions & 0 deletions src/Blumchen/Subscriptions/IHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Blumchen.Subscriptions;

public interface IHandler;

public interface IHandler<in T>: IHandler where T : class
{
Task Handle(T value);
}
4 changes: 2 additions & 2 deletions src/Blumchen/Subscriptions/ISubscriptionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void Deconstruct(
out ReplicationSlotSetupOptions replicationSlotSetupOptions,
out IErrorProcessor errorProcessor,
out IReplicationDataMapper dataMapper,
out Dictionary<Type, IConsume> registry);
out Dictionary<Type, IHandler> registry);
}

internal record SubscriptionOptions(
Expand All @@ -28,4 +28,4 @@ internal record SubscriptionOptions(
ReplicationSlotSetupOptions ReplicationOptions,
IErrorProcessor ErrorProcessor,
IReplicationDataMapper DataMapper,
Dictionary<Type, IConsume> Registry): ISubscriptionOptions;
Dictionary<Type, IHandler> Registry): ISubscriptionOptions;
13 changes: 4 additions & 9 deletions src/Blumchen/Subscriptions/Management/PublicationManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Npgsql;

#pragma warning disable CA2208
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Blumchen.Subscriptions.Management;

Expand Down Expand Up @@ -70,14 +69,11 @@ internal static Task CreatePublication(
ISet<string> eventTypes,
CancellationToken ct
) {
var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} {{0}} WITH (publish = 'insert');";
return eventTypes.Count switch
{
0 => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');",
ct
),
_ => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WHERE ({PublicationFilter(eventTypes)}) WITH (publish = 'insert');",
ct
)
0 => Execute(dataSource, string.Format(sql,string.Empty), ct),
_ => Execute(dataSource, string.Format(sql, $"WHERE ({PublicationFilter(eventTypes)})"), ct)
};
static string PublicationFilter(ICollection<string> input) => string.Join(" OR ", input.Select(s => $"message_type = '{s}'"));
}
Expand Down Expand Up @@ -129,8 +125,7 @@ private static Task<bool> PublicationExists(
this NpgsqlDataSource dataSource,
string publicationName,
CancellationToken ct
) =>
dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct);
) => dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct);

public abstract record SetupPublicationResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@

namespace Blumchen.Subscriptions.Management;
using static ReplicationSlotManagement.CreateReplicationSlotResult;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public static class ReplicationSlotManagement
{
#pragma warning disable CA2208
private static Task<bool> ReplicationSlotExists(
this NpgsqlDataSource dataSource,
string slotName,
CancellationToken ct
) => dataSource.Exists("pg_replication_slots", "slot_name ILIKE $1", [slotName], ct);

public static async Task<CreateReplicationSlotResult> SetupReplicationSlot(
this NpgsqlDataSource dataSource,
LogicalReplicationConnection connection,
Expand Down Expand Up @@ -54,18 +59,12 @@ static async Task<CreateReplicationSlotResult> Create(
}
}

private static Task<bool> ReplicationSlotExists(
this NpgsqlDataSource dataSource,
string slotName,
CancellationToken ct
) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct);

public record ReplicationSlotSetupOptions(
string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot",
Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists,
bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY
bool Binary =
false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY
);

public abstract record CreateReplicationSlotResult
{
public record None: CreateReplicationSlotResult;
Expand Down
1 change: 0 additions & 1 deletion src/Blumchen/Subscriptions/MimeType.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Blumchen.Subscriptions;

#pragma warning disable CS1591
public abstract record MimeType(string mimeType)
{
public record Json(): MimeType("application/json");
Expand Down
Loading