Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AWSGsrSerDe.deserializer;
using AWSGsrSerDe.common;
using Google.Protobuf;
using KafkaFlow;

namespace AWSGsrSerDe.KafkaFlow
{
/// <summary>
/// KafkaFlow middleware that DESERIALIZES byte[] -> T using AWS Glue SR.
/// Register it with MiddlewareLifetime.Worker for one instance per worker.
/// </summary>
public sealed class GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware<T> : IMessageMiddleware
where T : class, IMessage<T>, new()
{
private readonly GlueSchemaRegistryKafkaDeserializer _gsr;

public GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(string configPath)
{
try
{
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(new Dictionary<string, dynamic>
{
{ GlueSchemaRegistryConstants.ProtobufMessageDescriptor, new T().Descriptor }
});

_gsr = new GlueSchemaRegistryKafkaDeserializer(configPath, dataConfig);
}
catch (Exception ex)
{
throw new InvalidOperationException(
$"Failed to initialize GlueSchemaRegistry deserializer for {typeof(T).Name}: {ex.Message}", ex);
}
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
if (context is null) throw new ArgumentNullException(nameof(context));

// Expect raw bytes coming from Kafka
if (context.Message.Value is not byte[] payload)
throw new InvalidOperationException(
$"Expected byte[] payload before deserialization, got {context.Message.Value?.GetType().FullName ?? "null"}");

// Topic to assist SR lookup / subject resolution
var topic = context.ConsumerContext?.Topic
?? throw new InvalidOperationException("Topic not available in ConsumerContext");

T value;
try
{
// Use the deserializer to get the typed object
value = (T)_gsr.Deserialize(topic, payload);
}
catch (Exception ex)
{
throw new InvalidOperationException(
$"Failed to deserialize {typeof(T).Name} from topic '{topic}': {ex.Message}", ex);
}

// Create a new message with the deserialized value and continue to next middleware
var transformedContext = context.SetMessage(context.Message.Key, value);

await next(transformedContext);
}
}
}
39 changes: 27 additions & 12 deletions native-schema-registry/demos/csharp/Consumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,60 +91,75 @@ static IHost CreateHost()
.Topic("users")
.WithGroupId("user-consumer-group")
.WithBufferSize(100)
.WithWorkersCount(1)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(middlewares => middlewares
.Add<ExceptionMiddleware>()
.AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<User>(CONFIG_PATH))
.Add(
resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware<User>(CONFIG_PATH),
MiddlewareLifetime.Worker
)
.AddTypedHandlers(h => h.AddHandler<UserHandler>())
)
)
.AddConsumer(consumer => consumer
.Topic("products")
.WithGroupId("product-consumer-group")
.WithBufferSize(100)
.WithWorkersCount(1)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(middlewares => middlewares
.Add<ExceptionMiddleware>()
.AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Product>(CONFIG_PATH))
.Add(
resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware<Product>(CONFIG_PATH),
MiddlewareLifetime.Worker
)
.AddTypedHandlers(h => h.AddHandler<ProductHandler>())
)
)
.AddConsumer(consumer => consumer
.Topic("orders")
.WithGroupId("order-consumer-group")
.WithBufferSize(100)
.WithWorkersCount(1)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(middlewares => middlewares
.Add<ExceptionMiddleware>()
.AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Order>(CONFIG_PATH))
.Add(
resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware<Order>(CONFIG_PATH),
MiddlewareLifetime.Worker
)
.AddTypedHandlers(h => h.AddHandler<OrderHandler>())
)
)
.AddConsumer(consumer => consumer
.Topic("payments")
.WithGroupId("payment-consumer-group")
.WithBufferSize(100)
.WithWorkersCount(1)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(middlewares => middlewares
.Add<ExceptionMiddleware>()
.AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Payment>(CONFIG_PATH))
.Add(
resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware<Payment>(CONFIG_PATH),
MiddlewareLifetime.Worker
)
.AddTypedHandlers(h => h.AddHandler<PaymentHandler>())
)
)
.AddConsumer(consumer => consumer
.Topic("events")
.WithGroupId("event-consumer-group")
.WithBufferSize(100)
.WithWorkersCount(1)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(middlewares => middlewares
.Add<ExceptionMiddleware>()
.AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Event>(CONFIG_PATH))
.AddTypedHandlers(h => h.AddHandler<EventHandler>())
.Add<ExceptionMiddleware>()
.Add(
resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware<Event>(CONFIG_PATH),
MiddlewareLifetime.Worker
)
.AddTypedHandlers(h => h.AddHandler<EventHandler>())
)
)
)
Expand Down
43 changes: 13 additions & 30 deletions native-schema-registry/demos/csharp/Producer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ static void ProduceUsers(IHost host)
{
var producer = host.Services.GetRequiredService<IProducerAccessor>().GetProducer("user-producer");

for (int counter = 1; counter <= 10 && !_shutdown; counter++)
for (int counter = 1; counter <= 1000 && !_shutdown; counter++)
{
try
{
Expand All @@ -130,12 +130,9 @@ static void ProduceUsers(IHost host)

producer.Produce(user.Id, user);

lock (_lock)
{
Console.WriteLine($"[UserProducer] Sent: {user.Name} ({user})");
}
Console.WriteLine($"[UserProducer] Sent: {user.Name} ({user})");


Thread.Sleep(1000);
}
catch (Exception ex)
{
Expand All @@ -153,7 +150,7 @@ static void ProduceProducts(IHost host)
{
var producer = host.Services.GetRequiredService<IProducerAccessor>().GetProducer("product-producer");

for (int counter = 1; counter <= 10 && !_shutdown; counter++)
for (int counter = 1; counter <= 1000 && !_shutdown; counter++)
{
try
{
Expand All @@ -168,12 +165,9 @@ static void ProduceProducts(IHost host)

producer.Produce(product.Sku, product);

lock (_lock)
{
Console.WriteLine($"[ProductProducer] Sent: {product.Name} (${product})");
}
Console.WriteLine($"[ProductProducer] Sent: {product.Name} (${product})");


Thread.Sleep(1200);
}
catch (Exception ex)
{
Expand All @@ -191,7 +185,7 @@ static void ProduceOrders(IHost host)
{
var producer = host.Services.GetRequiredService<IProducerAccessor>().GetProducer("order-producer");

for (int counter = 1; counter <= 10 && !_shutdown; counter++)
for (int counter = 1; counter <= 1000 && !_shutdown; counter++)
{
try
{
Expand Down Expand Up @@ -231,12 +225,8 @@ static void ProduceOrders(IHost host)

producer.Produce(order.OrderId, order);

lock (_lock)
{
Console.WriteLine($"[OrderProducer] Sent: Order {order.OrderId} (${order.Header})");
}
Console.WriteLine($"[OrderProducer] Sent: Order {order.OrderId} (${order.Header})");

Thread.Sleep(1500);
}
catch (Exception ex)
{
Expand All @@ -254,7 +244,7 @@ static void ProducePayments(IHost host)
{
var producer = host.Services.GetRequiredService<IProducerAccessor>().GetProducer("payment-producer");

for (int counter = 1; counter <= 10 && !_shutdown; counter++)
for (int counter = 1; counter <= 1000 && !_shutdown; counter++)
{
try
{
Expand Down Expand Up @@ -299,12 +289,9 @@ static void ProducePayments(IHost host)

producer.Produce(payment.PaymentId, payment);

lock (_lock)
{
Console.WriteLine($"[PaymentProducer] Sent: Payment {payment.PaymentId} (${payment})");
}
Console.WriteLine($"[PaymentProducer] Sent: Payment {payment.PaymentId} (${payment})");


Thread.Sleep(1800);
}
catch (Exception ex)
{
Expand All @@ -322,7 +309,7 @@ static void ProduceEvents(IHost host)
{
var producer = host.Services.GetRequiredService<IProducerAccessor>().GetProducer("event-producer");

for (int counter = 1; counter <= 10 && !_shutdown; counter++)
for (int counter = 1; counter <= 1000 && !_shutdown; counter++)
{
try
{
Expand Down Expand Up @@ -369,12 +356,8 @@ static void ProduceEvents(IHost host)

producer.Produce(eventMsg.EventId, eventMsg);

lock (_lock)
{
Console.WriteLine($"[EventProducer] Sent: Event {eventMsg.EventId} ({eventMsg})");
}
Console.WriteLine($"[EventProducer] Sent: Event {eventMsg.EventId} ({eventMsg})");

Thread.Sleep(800);
}
catch (Exception ex)
{
Expand Down
Loading