Skip to content

Closes#1849: include a message id tag in rabbitmqactivitysource for published messages #1852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties)
: default;

ulong publishSequenceNumber = 0;
Expand Down Expand Up @@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties)
: default;

ulong publishSequenceNumber = 0;
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static bool UseRoutingKeyAsOperationName
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
};

internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties,
ActivityContext linkedContext = default)
{
if (!s_publisherSource.HasListeners())
Expand All @@ -83,7 +83,7 @@ public static bool UseRoutingKeyAsOperationName
ActivityKind.Producer, linkedContext);
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity);
}

return activity;
Expand Down
28 changes: 20 additions & 8 deletions projects/Test/SequentialIntegration/TestActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,15 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
}

[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
[InlineData(true, true, true)]
[InlineData(true, true, false)]
[InlineData(true, false, true)]
[InlineData(true, false, false)]
[InlineData(false, true, true)]
[InlineData(false, true, false)]
[InlineData(false, false, true)]
[InlineData(false, false, false)]
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId)
{
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
Expand All @@ -321,18 +325,20 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
string queue = $"queue-{Guid.NewGuid()}";
const string msg = "for basic.get";

var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties();

try
{
await _channel.QueueDeclareAsync(queue, false, false, false, null);
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
Assert.Equal(1u, ok.MessageCount);
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
ok = await _channel.QueueDeclarePassiveAsync(queue);
Assert.Equal(0u, ok.MessageCount);
await Task.Delay(500);
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false);
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false, basicProps.MessageId);
}
finally
{
Expand Down Expand Up @@ -427,7 +433,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
}

private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName,
List<Activity> activityList, bool isDeliver = false)
List<Activity> activityList, bool isDeliver = false, string messageId = null)
{
string childName = isDeliver ? "deliver" : "fetch";
Activity[] activities = activityList.ToArray();
Expand Down Expand Up @@ -480,6 +486,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);

if (messageId is not null)
{
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId);
}
}
}
}
27 changes: 19 additions & 8 deletions projects/Test/SequentialIntegration/TestOpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,15 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
}

[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
[InlineData(true, true, true)]
[InlineData(true, true, false)]
[InlineData(true, false, true)]
[InlineData(true, false, false)]
[InlineData(false, true, true)]
[InlineData(false, true, false)]
[InlineData(false, false, true)]
[InlineData(false, false, false)]
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId)
{
var exportedItems = new List<Activity>();
using var tracer = Sdk.CreateTracerProviderBuilder()
Expand All @@ -355,10 +359,12 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
string queue = $"queue-{Guid.NewGuid()}";
const string msg = "for basic.get";

var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties();

try
{
await _channel.QueueDeclareAsync(queue, false, false, false, null);
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
Baggage.ClearBaggage();
Assert.Null(Baggage.GetBaggage("TestItem"));
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
Expand All @@ -368,7 +374,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
ok = await _channel.QueueDeclarePassiveAsync(queue);
Assert.Equal(0u, ok.MessageCount);
await Task.Delay(500);
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, exportedItems, false);
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, exportedItems, false, basicProps.MessageId);
}
finally
{
Expand All @@ -377,7 +383,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
}

private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName,
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
List<Activity> activityList, bool isDeliver = false, string messageId = null)
{
string childName = isDeliver ? "deliver" : "fetch";
string childType = isDeliver ? "process" : "receive";
Expand Down Expand Up @@ -432,6 +438,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");

if (messageId is not null)
{
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId);
}
}
}
}