Skip to content

[dotnet] Implement .NET system test for SNS #4897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion manifests/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ tests/:
Test_DsmRabbitmq: v3.21.0
Test_DsmRabbitmq_FanoutExchange: v3.21.0
Test_DsmRabbitmq_TopicExchange: v3.21.0
Test_DsmSNS: missing_feature
Test_DsmSNS: v3.21.0
Test_DsmSQS: v2.48.0
Test_Dsm_Manual_Checkpoint_Inter_Process: missing_feature
Test_Dsm_Manual_Checkpoint_Intra_Process: missing_feature
Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/test_dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ def setup_dsm_sns(self):
def test_dsm_sns(self):
assert self.r.text == "ok"

topic = self.topic if context.library == "java" else f"arn:aws:sns:us-east-1:{AWS_ACCT}:{self.topic}"
arn = f"arn:aws:sns:us-east-1:{AWS_ACCT}:{self.topic}"
topic = self.topic if context.library in ["java", "dotnet"] else arn
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SNS api for .NET & Java allow sending messages just by topic name, can't deduce full ARN for these tracers


if context.library == "nodejs":
producer_hash = 15466202493380574985 if AWS_TESTING == "remote" else 3703335291192845713
Expand Down
190 changes: 190 additions & 0 deletions utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
using Microsoft.AspNetCore.Http;
using System.Collections.Generic;
using System;
using System.IO;
using System.Net;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using RabbitMQ.Client;

namespace weblog
Expand All @@ -25,6 +28,7 @@ public void Register(Microsoft.AspNetCore.Routing.IEndpointRouteBuilder routeBui
string routing_key = context.Request.Query["routing_key"]!;
string group = context.Request.Query["group"]!;
string message = context.Request.Query["message"]!;
string topic = context.Request.Query["topic"]!;

Console.WriteLine("Hello World! Received dsm call with integration " + integration);
if ("kafka".Equals(integration)) {
Expand Down Expand Up @@ -57,6 +61,13 @@ public void Register(Microsoft.AspNetCore.Routing.IEndpointRouteBuilder routeBui
Console.WriteLine($"[SQS] Begin consuming DSM message: {message}");
await Task.Run(() => SqsConsumer.DoWork(queue, message));
await context.Response.WriteAsync("ok");
} else if ("sns".Equals(integration))
{
Console.WriteLine($"[SNS] Begin producing DSM message: {message}");
await Task.Run(() => SnsProducer.DoWork(queue, topic, message));
Console.WriteLine($"[SNS] Begin consuming DSM message: {message}");
await Task.Run(() => SnsConsumer.DoWork(queue, message));
await context.Response.WriteAsync("ok");
} else {
await context.Response.WriteAsync("unknown integration: " + integration);
}
Expand Down Expand Up @@ -308,4 +319,183 @@ public static async Task DoWork(string queue, string message)
}
}
}

class SnsProducer
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This practically follows the Java implementation as close as possible

{
public static async Task DoWork(string queue, string topic, string message)
{
string? awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL");

IAmazonSimpleNotificationService snsClient;
IAmazonSQS sqsClient;
if (!string.IsNullOrEmpty(awsUrl))
{
// If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL
snsClient = new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig { ServiceURL = awsUrl });
sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl });
}
else
{
// If SYSTEM_TESTS_AWS_URL is not set, create default clients
snsClient = new AmazonSimpleNotificationServiceClient();
sqsClient = new AmazonSQSClient();
}

// Create SNS topic
Console.WriteLine($"[SNS] Produce: Creating topic {topic}");
CreateTopicResponse createTopicResponse = await snsClient.CreateTopicAsync(topic);
string topicArn = createTopicResponse.TopicArn;

// Create SQS queue
Console.WriteLine($"[SNS] Produce: Creating queue {queue}");
CreateQueueResponse createQueueResponse = await sqsClient.CreateQueueAsync(queue);
string queueUrl = createQueueResponse.QueueUrl;

// Get queue ARN
GetQueueAttributesResponse queueAttributes = await sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
{
QueueUrl = queueUrl,
AttributeNames = new List<string> { "QueueArn" }
});
string queueArn = queueAttributes.Attributes["QueueArn"];

// Set queue policy to allow SNS to send messages
string policy = $@"{{
""Version"": ""2012-10-17"",
""Id"": ""{queueArn}/SQSDefaultPolicy"",
""Statement"": [
{{
""Sid"": ""Allow-SNS-SendMessage"",
""Effect"": ""Allow"",
""Principal"": {{
""Service"": ""sns.amazonaws.com""
}},
""Action"": ""sqs:SendMessage"",
""Resource"": ""{queueArn}"",
""Condition"": {{
""ArnEquals"": {{
""aws:SourceArn"": ""{topicArn}""
}}
}}
}}
]
}}";

await sqsClient.SetQueueAttributesAsync(new SetQueueAttributesRequest
{
QueueUrl = queueUrl,
Attributes = new Dictionary<string, string>
{
{ "Policy", policy }
}
});

// Subscribe queue to topic
await snsClient.SubscribeAsync(new SubscribeRequest
{
TopicArn = topicArn,
Protocol = "sqs",
Endpoint = queueArn,
Attributes = new Dictionary<string, string>
{
{ "RawMessageDelivery", "true" }
}
});

using (Datadog.Trace.Tracer.Instance.StartActive("SnsProduce"))
{
// Publish message to SNS topic
await snsClient.PublishAsync(new PublishRequest
{
TopicArn = topicArn,
Message = message
});
Console.WriteLine($"[SNS] Done with producing message: {message}");
}
}
}

class SnsConsumer
{
public static async Task DoWork(string queue, string message)
{
string? awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL");

IAmazonSQS sqsClient;
if (!string.IsNullOrEmpty(awsUrl))
{
// If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL
sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl });
}
else
{
// If SYSTEM_TESTS_AWS_URL is not set, create a default client
sqsClient = new AmazonSQSClient();
}

// Create queue
Console.WriteLine($"[SNS] Consume: Creating queue {queue}");
CreateQueueResponse responseCreate = await sqsClient.CreateQueueAsync(queue);
var qUrl = responseCreate.QueueUrl;

Console.WriteLine($"[SNS] looking for messages in queue {qUrl}");

bool continueProcessing = true;

while (continueProcessing)
{
using (Datadog.Trace.Tracer.Instance.StartActive("SnsConsume"))
{
var result = await sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = qUrl,
MaxNumberOfMessages = 1,
WaitTimeSeconds = 1
});

if (result == null || result.Messages.Count == 0)
{
Console.WriteLine("[SNS] No messages to consume at this time");
await Task.Delay(1000);
continue;
}

var receivedMessage = result.Messages[0];
Console.WriteLine("[SNS] Message dump:");
Console.WriteLine($" MessageId: {receivedMessage.MessageId}");
Console.WriteLine($" ReceiptHandle: {receivedMessage.ReceiptHandle}");
Console.WriteLine($" MD5OfBody: {receivedMessage.MD5OfBody}");
Console.WriteLine($" Body: {receivedMessage.Body}");

if (receivedMessage.Attributes != null && receivedMessage.Attributes.Count > 0)
{
Console.WriteLine(" Attributes:");
foreach (var attr in receivedMessage.Attributes)
{
Console.WriteLine($" {attr.Key}: {attr.Value}");
}
}

if (receivedMessage.MessageAttributes != null && receivedMessage.MessageAttributes.Count > 0)
{
Console.WriteLine(" MessageAttributes:");
foreach (var attr in receivedMessage.MessageAttributes)
{
Console.WriteLine($" {attr.Key}: {attr.Value.StringValue}");
}
}

// Check if the message body matches directly
if (receivedMessage.Body == message)
{
Console.WriteLine($"[SNS] Consumed message from {qUrl}: {receivedMessage.Body}");
continueProcessing = false;
break;
}

await Task.Delay(1000);
}
}
}
}
}
1 change: 1 addition & 0 deletions utils/build/docker/dotnet/weblog/app.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

<ItemGroup>
<PackageReference Include="AWSSDK.SQS" Version="3.7.300.52"/>
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.7.300.52"/>
<PackageReference Include="Google.Protobuf" Version="3.25.3" />
<PackageReference Include="MySql.Data" Version="8.0.30"/>
<PackageReference Include="Npgsql" Version="4.0.10"/>
Expand Down