From c1c54f194b6c79ee11f1a7ad5ad3b8b1395e3667 Mon Sep 17 00:00:00 2001 From: timmy-cca Date: Wed, 25 Jun 2025 13:27:01 -0400 Subject: [PATCH] Pass basic props to RabbitMQActivitySource.BasicPublish to allow messageid tag to be included in trace span updated tests for messageid tag corrected broken tests after bad conflict resolution from merging main into feature branch --- .../Impl/Channel.BasicPublish.cs | 4 +- .../Impl/RabbitMQActivitySource.cs | 4 +- .../TestActivitySource.cs | 146 +++--------------- .../SequentialIntegration/TestHeartbeats.cs | 6 +- .../TestOpenTelemetry.cs | 85 +++------- 5 files changed, 48 insertions(+), 197 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 2ae3e4cfe..bd4bdaa05 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublish(exchange, routingKey, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length) + ? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties) : default; ulong publishSequenceNumber = 0; @@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length) + ? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties) : default; ulong publishSequenceNumber = 0; diff --git a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs index 764828ce3..faa813bdd 100644 --- a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs @@ -66,7 +66,7 @@ public static bool UseRoutingKeyAsOperationName new KeyValuePair(ProtocolVersion, "0.9.1") }; - internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, + internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties, ActivityContext linkedContext = default) { if (!s_publisherSource.HasListeners()) @@ -83,7 +83,7 @@ public static bool UseRoutingKeyAsOperationName ActivityKind.Producer, linkedContext); if (activity != null && activity.IsAllDataRequested) { - PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity); + PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity); } return activity; diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 6591638ce..08ae4406c 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -75,120 +75,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name) Assert.True(activity.GetTagItem(name) is int result && result > 0); } - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; - var _activities = new List(); - using ActivityListener activityListener = StartActivityListener(_activities); - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true); - } - - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; - var _activities = new List(); - using ActivityListener activityListener = StartActivityListener(_activities); - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - CachedString exchange = new CachedString(""); - CachedString routingKey = new CachedString(q.QueueName); - await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true); - } - - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; - var _activities = new List(); - using ActivityListener activityListener = StartActivityListener(_activities); - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); - await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true); - } - [Theory] [InlineData(true, true)] [InlineData(true, false)] @@ -307,11 +193,15 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn } [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + [InlineData(true, true, true)] + [InlineData(true, true, false)] + [InlineData(true, false, true)] + [InlineData(true, false, false)] + [InlineData(false, true, true)] + [InlineData(false, true, false)] + [InlineData(false, false, true)] + [InlineData(false, false, false)] + public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId) { RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; @@ -321,10 +211,12 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera string queue = $"queue-{Guid.NewGuid()}"; const string msg = "for basic.get"; + var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties(); + try { await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); + await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg)); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); BasicGetResult res = await _channel.BasicGetAsync(queue, true); @@ -332,7 +224,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false); + AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false, basicProps.MessageId); } finally { @@ -345,7 +237,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera [InlineData(true, false)] [InlineData(false, true)] [InlineData(false, false)] - public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + public async Task TestPublisherWithCachedStringsAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) { RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; @@ -381,7 +273,7 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use [InlineData(true, false)] [InlineData(false, true)] [InlineData(false, false)] - public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) { RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent; @@ -427,7 +319,7 @@ private static ActivityListener StartActivityListener(List activities) } private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName, - List activityList, bool isDeliver = false) + List activityList, bool isDeliver = false, string messageId = null) { string childName = isDeliver ? "deliver" : "fetch"; Activity[] activities = activityList.ToArray(); @@ -480,6 +372,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize); AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize); AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize); + + if (messageId is not null) + { + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId); + AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId); + } } } } diff --git a/projects/Test/SequentialIntegration/TestHeartbeats.cs b/projects/Test/SequentialIntegration/TestHeartbeats.cs index 976ccbb1d..520cdbfbc 100644 --- a/projects/Test/SequentialIntegration/TestHeartbeats.cs +++ b/projects/Test/SequentialIntegration/TestHeartbeats.cs @@ -59,7 +59,7 @@ public override Task InitializeAsync() [SkippableFact(Timeout = 35000)] [Trait("Category", "LongRunning")] - public async Task TestThatHeartbeatWriterUsesConfigurableInterval() + public async Task TestThatHeartbeatWriterUsesConfigurableIntervalAsync() { Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test"); @@ -72,7 +72,7 @@ public async Task TestThatHeartbeatWriterUsesConfigurableInterval() [SkippableFact] [Trait("Category", "LongRunning")] - public async Task TestThatHeartbeatWriterWithTLSEnabled() + public async Task TestThatHeartbeatWriterWithTLSEnabledAsync() { Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test"); @@ -94,7 +94,7 @@ public async Task TestThatHeartbeatWriterWithTLSEnabled() [SkippableFact(Timeout = 90000)] [Trait("Category", "LongRunning")] - public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval() + public async Task TestHundredsOfConnectionsWithRandomHeartbeatIntervalAsync() { Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test"); diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index 254c310b5..0523e9836 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -94,64 +94,6 @@ public void TestDefaultTracingOptions() Assert.True(RabbitMQActivitySource.TracingOptions.UsePublisherAsParent); } - [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) - { - var exportedItems = new List(); - using var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(options => - { - options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - options.UsePublisherAsParent = usePublisherAsParent; - }) - .AddInMemoryExporter(exportedItems) - .Build(); - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - string baggageItem = Baggage.GetBaggage("TestItem"); - if (baggageItem == baggageGuid) - { - consumerReceivedTcs.SetResult(true); - } - else - { - consumerReceivedTcs.SetException( - EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); - } - - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, exportedItems, true); - } - [Theory] [InlineData(true, true)] [InlineData(true, false)] @@ -333,11 +275,15 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo } [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent) + [InlineData(true, true, true)] + [InlineData(true, true, false)] + [InlineData(true, false, true)] + [InlineData(true, false, false)] + [InlineData(false, true, true)] + [InlineData(false, true, false)] + [InlineData(false, false, true)] + [InlineData(false, false, false)] + public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId) { var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() @@ -355,10 +301,12 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera string queue = $"queue-{Guid.NewGuid()}"; const string msg = "for basic.get"; + var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties(); + try { await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); + await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg)); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); @@ -368,7 +316,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, exportedItems, false); + AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, exportedItems, false, basicProps.MessageId); } finally { @@ -377,7 +325,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera } private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName, - List activityList, bool isDeliver = false, string baggageGuid = null) + List activityList, bool isDeliver = false, string messageId = null) { string childName = isDeliver ? "deliver" : "fetch"; string childType = isDeliver ? "process" : "receive"; @@ -432,6 +380,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send"); AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish"); + if (messageId is not null) + { + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId); + AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId); + } } } }