diff --git a/native-schema-registry/csharp/AWSGsrSerDe/AWSGsrSerDe/KafkaFlowSerDeMiddleware/GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware.cs b/native-schema-registry/csharp/AWSGsrSerDe/AWSGsrSerDe/KafkaFlowSerDeMiddleware/GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware.cs new file mode 100644 index 00000000..e8eba44f --- /dev/null +++ b/native-schema-registry/csharp/AWSGsrSerDe/AWSGsrSerDe/KafkaFlowSerDeMiddleware/GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using AWSGsrSerDe.deserializer; +using AWSGsrSerDe.common; +using Google.Protobuf; +using KafkaFlow; + +namespace AWSGsrSerDe.KafkaFlow +{ + /// + /// KafkaFlow middleware that DESERIALIZES byte[] -> T using AWS Glue SR. + /// Register it with MiddlewareLifetime.Worker for one instance per worker. + /// + public sealed class GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware : IMessageMiddleware + where T : class, IMessage, new() + { + private readonly GlueSchemaRegistryKafkaDeserializer _gsr; + + public GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(string configPath) + { + try + { + var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(new Dictionary + { + { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, new T().Descriptor } + }); + + _gsr = new GlueSchemaRegistryKafkaDeserializer(configPath, dataConfig); + } + catch (Exception ex) + { + throw new InvalidOperationException( + $"Failed to initialize GlueSchemaRegistry deserializer for {typeof(T).Name}: {ex.Message}", ex); + } + } + + public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + { + if (context is null) throw new ArgumentNullException(nameof(context)); + + // Expect raw bytes coming from Kafka + if (context.Message.Value is not byte[] payload) + throw new InvalidOperationException( + $"Expected byte[] payload before deserialization, got {context.Message.Value?.GetType().FullName ?? "null"}"); + + // Topic to assist SR lookup / subject resolution + var topic = context.ConsumerContext?.Topic + ?? throw new InvalidOperationException("Topic not available in ConsumerContext"); + + T value; + try + { + // Use the deserializer to get the typed object + value = (T)_gsr.Deserialize(topic, payload); + } + catch (Exception ex) + { + throw new InvalidOperationException( + $"Failed to deserialize {typeof(T).Name} from topic '{topic}': {ex.Message}", ex); + } + + // Create a new message with the deserialized value and continue to next middleware + var transformedContext = context.SetMessage(context.Message.Key, value); + + await next(transformedContext); + } + } +} diff --git a/native-schema-registry/demos/csharp/Consumer/Program.cs b/native-schema-registry/demos/csharp/Consumer/Program.cs index f1aeb3d1..405f7dc4 100644 --- a/native-schema-registry/demos/csharp/Consumer/Program.cs +++ b/native-schema-registry/demos/csharp/Consumer/Program.cs @@ -91,11 +91,14 @@ static IHost CreateHost() .Topic("users") .WithGroupId("user-consumer-group") .WithBufferSize(100) - .WithWorkersCount(1) + .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Earliest) .AddMiddlewares(middlewares => middlewares .Add() - .AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer(CONFIG_PATH)) + .Add( + resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(CONFIG_PATH), + MiddlewareLifetime.Worker + ) .AddTypedHandlers(h => h.AddHandler()) ) ) @@ -103,11 +106,14 @@ static IHost CreateHost() .Topic("products") .WithGroupId("product-consumer-group") .WithBufferSize(100) - .WithWorkersCount(1) + .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Earliest) .AddMiddlewares(middlewares => middlewares .Add() - .AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer(CONFIG_PATH)) + .Add( + resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(CONFIG_PATH), + MiddlewareLifetime.Worker + ) .AddTypedHandlers(h => h.AddHandler()) ) ) @@ -115,11 +121,14 @@ static IHost CreateHost() .Topic("orders") .WithGroupId("order-consumer-group") .WithBufferSize(100) - .WithWorkersCount(1) + .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Earliest) .AddMiddlewares(middlewares => middlewares .Add() - .AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer(CONFIG_PATH)) + .Add( + resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(CONFIG_PATH), + MiddlewareLifetime.Worker + ) .AddTypedHandlers(h => h.AddHandler()) ) ) @@ -127,11 +136,14 @@ static IHost CreateHost() .Topic("payments") .WithGroupId("payment-consumer-group") .WithBufferSize(100) - .WithWorkersCount(1) + .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Earliest) .AddMiddlewares(middlewares => middlewares .Add() - .AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer(CONFIG_PATH)) + .Add( + resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(CONFIG_PATH), + MiddlewareLifetime.Worker + ) .AddTypedHandlers(h => h.AddHandler()) ) ) @@ -139,12 +151,15 @@ static IHost CreateHost() .Topic("events") .WithGroupId("event-consumer-group") .WithBufferSize(100) - .WithWorkersCount(1) + .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Earliest) .AddMiddlewares(middlewares => middlewares - .Add() - .AddDeserializer(resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializer(CONFIG_PATH)) - .AddTypedHandlers(h => h.AddHandler()) + .Add() + .Add( + resolver => new GlueSchemaRegistryKafkaFlowProtobufDeserializeMiddleware(CONFIG_PATH), + MiddlewareLifetime.Worker + ) + .AddTypedHandlers(h => h.AddHandler()) ) ) ) diff --git a/native-schema-registry/demos/csharp/Producer/Program.cs b/native-schema-registry/demos/csharp/Producer/Program.cs index 1e412011..d854bf31 100644 --- a/native-schema-registry/demos/csharp/Producer/Program.cs +++ b/native-schema-registry/demos/csharp/Producer/Program.cs @@ -116,7 +116,7 @@ static void ProduceUsers(IHost host) { var producer = host.Services.GetRequiredService().GetProducer("user-producer"); - for (int counter = 1; counter <= 10 && !_shutdown; counter++) + for (int counter = 1; counter <= 1000 && !_shutdown; counter++) { try { @@ -130,12 +130,9 @@ static void ProduceUsers(IHost host) producer.Produce(user.Id, user); - lock (_lock) - { - Console.WriteLine($"[UserProducer] Sent: {user.Name} ({user})"); - } + Console.WriteLine($"[UserProducer] Sent: {user.Name} ({user})"); + - Thread.Sleep(1000); } catch (Exception ex) { @@ -153,7 +150,7 @@ static void ProduceProducts(IHost host) { var producer = host.Services.GetRequiredService().GetProducer("product-producer"); - for (int counter = 1; counter <= 10 && !_shutdown; counter++) + for (int counter = 1; counter <= 1000 && !_shutdown; counter++) { try { @@ -168,12 +165,9 @@ static void ProduceProducts(IHost host) producer.Produce(product.Sku, product); - lock (_lock) - { - Console.WriteLine($"[ProductProducer] Sent: {product.Name} (${product})"); - } + Console.WriteLine($"[ProductProducer] Sent: {product.Name} (${product})"); + - Thread.Sleep(1200); } catch (Exception ex) { @@ -191,7 +185,7 @@ static void ProduceOrders(IHost host) { var producer = host.Services.GetRequiredService().GetProducer("order-producer"); - for (int counter = 1; counter <= 10 && !_shutdown; counter++) + for (int counter = 1; counter <= 1000 && !_shutdown; counter++) { try { @@ -231,12 +225,8 @@ static void ProduceOrders(IHost host) producer.Produce(order.OrderId, order); - lock (_lock) - { - Console.WriteLine($"[OrderProducer] Sent: Order {order.OrderId} (${order.Header})"); - } + Console.WriteLine($"[OrderProducer] Sent: Order {order.OrderId} (${order.Header})"); - Thread.Sleep(1500); } catch (Exception ex) { @@ -254,7 +244,7 @@ static void ProducePayments(IHost host) { var producer = host.Services.GetRequiredService().GetProducer("payment-producer"); - for (int counter = 1; counter <= 10 && !_shutdown; counter++) + for (int counter = 1; counter <= 1000 && !_shutdown; counter++) { try { @@ -299,12 +289,9 @@ static void ProducePayments(IHost host) producer.Produce(payment.PaymentId, payment); - lock (_lock) - { - Console.WriteLine($"[PaymentProducer] Sent: Payment {payment.PaymentId} (${payment})"); - } + Console.WriteLine($"[PaymentProducer] Sent: Payment {payment.PaymentId} (${payment})"); + - Thread.Sleep(1800); } catch (Exception ex) { @@ -322,7 +309,7 @@ static void ProduceEvents(IHost host) { var producer = host.Services.GetRequiredService().GetProducer("event-producer"); - for (int counter = 1; counter <= 10 && !_shutdown; counter++) + for (int counter = 1; counter <= 1000 && !_shutdown; counter++) { try { @@ -369,12 +356,8 @@ static void ProduceEvents(IHost host) producer.Produce(eventMsg.EventId, eventMsg); - lock (_lock) - { - Console.WriteLine($"[EventProducer] Sent: Event {eventMsg.EventId} ({eventMsg})"); - } + Console.WriteLine($"[EventProducer] Sent: Event {eventMsg.EventId} ({eventMsg})"); - Thread.Sleep(800); } catch (Exception ex) {