diff --git a/manifests/dotnet.yml b/manifests/dotnet.yml index ef095e3f649..e1deff834ef 100644 --- a/manifests/dotnet.yml +++ b/manifests/dotnet.yml @@ -491,7 +491,7 @@ tests/: Test_DsmContext_Injection_Base64: missing_feature Test_DsmHttp: missing_feature Test_DsmKafka: v2.29.0 - Test_DsmKinesis: missing_feature + Test_DsmKinesis: v3.19.0 Test_DsmRabbitmq: v2.29.0 Test_DsmRabbitmq_FanoutExchange: missing_feature Test_DsmRabbitmq_TopicExchange: missing_feature diff --git a/tests/integrations/test_dsm.py b/tests/integrations/test_dsm.py index e37c6bc0cb9..999255f090a 100644 --- a/tests/integrations/test_dsm.py +++ b/tests/integrations/test_dsm.py @@ -383,6 +383,11 @@ def test_dsm_kinesis(self): tags_in = ("direction:in", f"topic:{self.stream}", "type:kinesis") producer_hash = 2387568642918822206 consumer_hash = 10101425062685840509 + elif context.library == "dotnet": + tags_out = ("direction:out", f"topic:{self.stream}", "type:kinesis") + tags_in = ("direction:in", f"topic:{self.stream}", "type:kinesis") + producer_hash = compute_dsm_hash(0, tags_out) + consumer_hash = compute_dsm_hash(producer_hash, tags_in) else: tags_out = ("direction:out", f"topic:{stream_arn}", "type:kinesis") tags_in = ("direction:in", f"topic:{stream_arn}", "type:kinesis") diff --git a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs index e1eb9120c18..b5f0b5b44fb 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs @@ -3,13 +3,17 @@ using Microsoft.AspNetCore.Http; using System.Collections.Generic; using System; +using System.IO; using System.Net; using System.Globalization; using System.Threading; using System.Threading.Tasks; using Amazon.SQS; using Amazon.SQS.Model; +using Amazon.Kinesis; +using Amazon.Kinesis.Model; using RabbitMQ.Client; +using Newtonsoft.Json; namespace weblog { @@ -25,6 +29,7 @@ public void Register(Microsoft.AspNetCore.Routing.IEndpointRouteBuilder routeBui string routing_key = context.Request.Query["routing_key"]!; string group = context.Request.Query["group"]!; string message = context.Request.Query["message"]!; + string stream = context.Request.Query["stream"]!; Console.WriteLine("Hello World! Received dsm call with integration " + integration); if ("kafka".Equals(integration)) { @@ -53,6 +58,12 @@ public void Register(Microsoft.AspNetCore.Routing.IEndpointRouteBuilder routeBui Console.WriteLine($"[SQS] Begin consuming DSM message: {message}"); await Task.Run(() => SqsConsumer.DoWork(queue, message)); await context.Response.WriteAsync("ok"); + } else if ("kinesis".Equals(integration)) { + Console.WriteLine($"[Kinesis] Begin producing DSM message: {message}"); + await Task.Run(() => KinesisProducer.DoWork(stream, message)); + Console.WriteLine($"[Kinesis] Begin consuming DSM message: {message}"); + await Task.Run(() => KinesisConsumer.DoWork(stream, message)); + await context.Response.WriteAsync("ok"); } else { await context.Response.WriteAsync("unknown integration: " + integration); } @@ -253,4 +264,220 @@ public static async Task DoWork(string queue, string message) } } } + + class KinesisProducer + { + public static async Task DoWork(string stream, string message) + { + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonKinesis kinesisClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + var config = new AmazonKinesisConfig { ServiceURL = awsUrl }; + kinesisClient = new AmazonKinesisClient(config); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + kinesisClient = new AmazonKinesisClient(); + } + + // Create stream + Console.WriteLine($"[Kinesis] Produce: Creating stream {stream}"); + try + { + await kinesisClient.CreateStreamAsync(new CreateStreamRequest + { + StreamName = stream, + ShardCount = 1 + }); + Console.WriteLine($"[Kinesis] Created stream {stream}"); + } + catch (Exception e) + { + Console.WriteLine($"[Kinesis] Error creating stream (may already exist): {e.Message}"); + } + + // Wait for stream to be active + bool streamActive = false; + DateTime startTime = DateTime.UtcNow; + while (!streamActive && DateTime.UtcNow - startTime < TimeSpan.FromMinutes(2)) + { + try + { + var describeResponse = await kinesisClient.DescribeStreamAsync(new DescribeStreamRequest + { + StreamName = stream + }); + + if (describeResponse.StreamDescription.StreamStatus == StreamStatus.ACTIVE) + { + streamActive = true; + Console.WriteLine($"[Kinesis] Stream {stream} is now active"); + } + else + { + await Task.Delay(1000); + } + } + catch (Exception e) + { + Console.WriteLine($"[Kinesis] Error describing stream: {e.Message}"); + await Task.Delay(1000); + } + } + + if (!streamActive) + { + Console.WriteLine($"[Kinesis] Stream {stream} did not become active in time"); + return; + } + + // Prepare message as JSON (matching Python implementation) + var messageObj = new { message = message }; + string jsonMessage = JsonConvert.SerializeObject(messageObj); + byte[] messageBytes = System.Text.Encoding.UTF8.GetBytes(jsonMessage); + + using (Datadog.Trace.Tracer.Instance.StartActive("KinesisProduce")) + { + try + { + var putRecordRequest = new PutRecordRequest + { + StreamName = stream, + Data = new MemoryStream(messageBytes), + PartitionKey = "1" + }; + + var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest); + Console.WriteLine($"[Kinesis] Successfully produced message to stream {stream}. Sequence number: {putRecordResponse.SequenceNumber}"); + } + catch (Exception e) + { + Console.WriteLine($"[Kinesis] Error producing message: {e.Message}"); + } + } + } + } + + class KinesisConsumer + { + public static async Task DoWork(string stream, string expectedMessage) + { + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonKinesis kinesisClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + var config = new AmazonKinesisConfig { ServiceURL = awsUrl }; + kinesisClient = new AmazonKinesisClient(config); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + kinesisClient = new AmazonKinesisClient(); + } + + Console.WriteLine($"[Kinesis] Consume: Looking for messages in stream {stream}"); + + string shardIterator = null; + DateTime startTime = DateTime.UtcNow; + bool messageFound = false; + + while (!messageFound && DateTime.UtcNow - startTime < TimeSpan.FromMinutes(2)) + { + try + { + if (shardIterator == null) + { + // Get stream description to find shard + var describeResponse = await kinesisClient.DescribeStreamAsync(new DescribeStreamRequest + { + StreamName = stream + }); + + if (describeResponse.StreamDescription.StreamStatus == StreamStatus.ACTIVE) + { + string shardId = describeResponse.StreamDescription.Shards[0].ShardId; + + // Get shard iterator + var shardIteratorResponse = await kinesisClient.GetShardIteratorAsync(new GetShardIteratorRequest + { + StreamName = stream, + ShardId = shardId, + ShardIteratorType = ShardIteratorType.TRIM_HORIZON + }); + + shardIterator = shardIteratorResponse.ShardIterator; + Console.WriteLine($"[Kinesis] Got shard iterator: {shardIterator}"); + } + else + { + await Task.Delay(1000); + continue; + } + } + + // Get records + using (Datadog.Trace.Tracer.Instance.StartActive("KinesisConsume")) + { + var getRecordsResponse = await kinesisClient.GetRecordsAsync(new GetRecordsRequest + { + ShardIterator = shardIterator + }); + + foreach (var record in getRecordsResponse.Records) + { + Console.WriteLine($"[Kinesis] Received record: {record.SequenceNumber}"); + + // Decode the message + string recordData = System.Text.Encoding.UTF8.GetString(record.Data.ToArray()); + Console.WriteLine($"[Kinesis] Record data: {recordData}"); + + try + { + var messageObj = JsonConvert.DeserializeObject(recordData); + string messageStr = messageObj.message; + Console.WriteLine($"[Kinesis] Decoded message: {messageStr}"); + + if (messageStr == expectedMessage) + { + Console.WriteLine($"[Kinesis] Success! Found expected message: {messageStr}"); + messageFound = true; + break; + } + } + catch (Exception e) + { + Console.WriteLine($"[Kinesis] Error parsing message: {e.Message}"); + } + } + + shardIterator = getRecordsResponse.NextShardIterator; + } + + if (!messageFound) + { + await Task.Delay(1000); + } + } + catch (Exception e) + { + Console.WriteLine($"[Kinesis] Error consuming messages: {e.Message}"); + await Task.Delay(1000); + } + } + + // Sleep + await Task.Delay(15000); + + if (!messageFound) + { + Console.WriteLine($"[Kinesis] Did not find expected message '{expectedMessage}' within timeout"); + } + } + } } diff --git a/utils/build/docker/dotnet/weblog/app.csproj b/utils/build/docker/dotnet/weblog/app.csproj index 8c6f4d36bae..b6b44ddcfb0 100644 --- a/utils/build/docker/dotnet/weblog/app.csproj +++ b/utils/build/docker/dotnet/weblog/app.csproj @@ -33,6 +33,7 @@ +