Skip to content

Fix .NET system test for Kinesis #4877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion manifests/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ tests/:
Test_DsmContext_Injection_Base64: missing_feature
Test_DsmHttp: missing_feature
Test_DsmKafka: v2.29.0
Test_DsmKinesis: missing_feature
Test_DsmKinesis: v3.19.0
Test_DsmRabbitmq: v2.29.0
Test_DsmRabbitmq_FanoutExchange: missing_feature
Test_DsmRabbitmq_TopicExchange: missing_feature
Expand Down
5 changes: 5 additions & 0 deletions tests/integrations/test_dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ def test_dsm_kinesis(self):
tags_in = ("direction:in", f"topic:{self.stream}", "type:kinesis")
producer_hash = 2387568642918822206
consumer_hash = 10101425062685840509
elif context.library == "dotnet":
tags_out = ("direction:out", f"topic:{self.stream}", "type:kinesis")
tags_in = ("direction:in", f"topic:{self.stream}", "type:kinesis")
producer_hash = compute_dsm_hash(0, tags_out)
consumer_hash = compute_dsm_hash(producer_hash, tags_in)
else:
tags_out = ("direction:out", f"topic:{stream_arn}", "type:kinesis")
tags_in = ("direction:in", f"topic:{stream_arn}", "type:kinesis")
Expand Down
227 changes: 227 additions & 0 deletions utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
using Microsoft.AspNetCore.Http;
using System.Collections.Generic;
using System;
using System.IO;
using System.Net;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using RabbitMQ.Client;
using Newtonsoft.Json;

namespace weblog
{
Expand All @@ -25,6 +29,7 @@ public void Register(Microsoft.AspNetCore.Routing.IEndpointRouteBuilder routeBui
string routing_key = context.Request.Query["routing_key"]!;
string group = context.Request.Query["group"]!;
string message = context.Request.Query["message"]!;
string stream = context.Request.Query["stream"]!;

Console.WriteLine("Hello World! Received dsm call with integration " + integration);
if ("kafka".Equals(integration)) {
Expand Down Expand Up @@ -53,6 +58,12 @@ public void Register(Microsoft.AspNetCore.Routing.IEndpointRouteBuilder routeBui
Console.WriteLine($"[SQS] Begin consuming DSM message: {message}");
await Task.Run(() => SqsConsumer.DoWork(queue, message));
await context.Response.WriteAsync("ok");
} else if ("kinesis".Equals(integration)) {
Console.WriteLine($"[Kinesis] Begin producing DSM message: {message}");
await Task.Run(() => KinesisProducer.DoWork(stream, message));
Console.WriteLine($"[Kinesis] Begin consuming DSM message: {message}");
await Task.Run(() => KinesisConsumer.DoWork(stream, message));
await context.Response.WriteAsync("ok");
} else {
await context.Response.WriteAsync("unknown integration: " + integration);
}
Expand Down Expand Up @@ -253,4 +264,220 @@ public static async Task DoWork(string queue, string message)
}
}
}

class KinesisProducer
{
public static async Task DoWork(string stream, string message)
{
string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL");

IAmazonKinesis kinesisClient;
if (!string.IsNullOrEmpty(awsUrl))
{
// If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL
var config = new AmazonKinesisConfig { ServiceURL = awsUrl };
kinesisClient = new AmazonKinesisClient(config);
}
else
{
// If SYSTEM_TESTS_AWS_URL is not set, create a default client
kinesisClient = new AmazonKinesisClient();
}

// Create stream
Console.WriteLine($"[Kinesis] Produce: Creating stream {stream}");
try
{
await kinesisClient.CreateStreamAsync(new CreateStreamRequest
{
StreamName = stream,
ShardCount = 1
});
Console.WriteLine($"[Kinesis] Created stream {stream}");
}
catch (Exception e)
{
Console.WriteLine($"[Kinesis] Error creating stream (may already exist): {e.Message}");
}

// Wait for stream to be active
bool streamActive = false;
DateTime startTime = DateTime.UtcNow;
while (!streamActive && DateTime.UtcNow - startTime < TimeSpan.FromMinutes(2))
{
try
{
var describeResponse = await kinesisClient.DescribeStreamAsync(new DescribeStreamRequest
{
StreamName = stream
});

if (describeResponse.StreamDescription.StreamStatus == StreamStatus.ACTIVE)
{
streamActive = true;
Console.WriteLine($"[Kinesis] Stream {stream} is now active");
}
else
{
await Task.Delay(1000);
}
}
catch (Exception e)
{
Console.WriteLine($"[Kinesis] Error describing stream: {e.Message}");
await Task.Delay(1000);
}
}

if (!streamActive)
{
Console.WriteLine($"[Kinesis] Stream {stream} did not become active in time");
return;
}

// Prepare message as JSON (matching Python implementation)
var messageObj = new { message = message };
string jsonMessage = JsonConvert.SerializeObject(messageObj);
byte[] messageBytes = System.Text.Encoding.UTF8.GetBytes(jsonMessage);

using (Datadog.Trace.Tracer.Instance.StartActive("KinesisProduce"))
{
try
{
var putRecordRequest = new PutRecordRequest
{
StreamName = stream,
Data = new MemoryStream(messageBytes),
PartitionKey = "1"
};

var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
Console.WriteLine($"[Kinesis] Successfully produced message to stream {stream}. Sequence number: {putRecordResponse.SequenceNumber}");
}
catch (Exception e)
{
Console.WriteLine($"[Kinesis] Error producing message: {e.Message}");
}
}
}
}

class KinesisConsumer
{
public static async Task DoWork(string stream, string expectedMessage)
{
string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL");

IAmazonKinesis kinesisClient;
if (!string.IsNullOrEmpty(awsUrl))
{
// If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL
var config = new AmazonKinesisConfig { ServiceURL = awsUrl };
kinesisClient = new AmazonKinesisClient(config);
}
else
{
// If SYSTEM_TESTS_AWS_URL is not set, create a default client
kinesisClient = new AmazonKinesisClient();
}

Console.WriteLine($"[Kinesis] Consume: Looking for messages in stream {stream}");

string shardIterator = null;
DateTime startTime = DateTime.UtcNow;
bool messageFound = false;

while (!messageFound && DateTime.UtcNow - startTime < TimeSpan.FromMinutes(2))
{
try
{
if (shardIterator == null)
{
// Get stream description to find shard
var describeResponse = await kinesisClient.DescribeStreamAsync(new DescribeStreamRequest
{
StreamName = stream
});

if (describeResponse.StreamDescription.StreamStatus == StreamStatus.ACTIVE)
{
string shardId = describeResponse.StreamDescription.Shards[0].ShardId;

// Get shard iterator
var shardIteratorResponse = await kinesisClient.GetShardIteratorAsync(new GetShardIteratorRequest
{
StreamName = stream,
ShardId = shardId,
ShardIteratorType = ShardIteratorType.TRIM_HORIZON
});

shardIterator = shardIteratorResponse.ShardIterator;
Console.WriteLine($"[Kinesis] Got shard iterator: {shardIterator}");
}
else
{
await Task.Delay(1000);
continue;
}
}

// Get records
using (Datadog.Trace.Tracer.Instance.StartActive("KinesisConsume"))
{
var getRecordsResponse = await kinesisClient.GetRecordsAsync(new GetRecordsRequest
{
ShardIterator = shardIterator
});

foreach (var record in getRecordsResponse.Records)
{
Console.WriteLine($"[Kinesis] Received record: {record.SequenceNumber}");

// Decode the message
string recordData = System.Text.Encoding.UTF8.GetString(record.Data.ToArray());
Console.WriteLine($"[Kinesis] Record data: {recordData}");

try
{
var messageObj = JsonConvert.DeserializeObject<dynamic>(recordData);
string messageStr = messageObj.message;
Console.WriteLine($"[Kinesis] Decoded message: {messageStr}");

if (messageStr == expectedMessage)
{
Console.WriteLine($"[Kinesis] Success! Found expected message: {messageStr}");
messageFound = true;
break;
}
}
catch (Exception e)
{
Console.WriteLine($"[Kinesis] Error parsing message: {e.Message}");
}
}

shardIterator = getRecordsResponse.NextShardIterator;
}

if (!messageFound)
{
await Task.Delay(1000);
}
}
catch (Exception e)
{
Console.WriteLine($"[Kinesis] Error consuming messages: {e.Message}");
await Task.Delay(1000);
}
}

// Sleep
await Task.Delay(15000);

if (!messageFound)
{
Console.WriteLine($"[Kinesis] Did not find expected message '{expectedMessage}' within timeout");
}
}
}
}
1 change: 1 addition & 0 deletions utils/build/docker/dotnet/weblog/app.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

<ItemGroup>
<PackageReference Include="AWSSDK.SQS" Version="3.7.300.52"/>
<PackageReference Include="AWSSDK.Kinesis" Version="3.7.300.52"/>
<PackageReference Include="Google.Protobuf" Version="3.25.3" />
<PackageReference Include="MySql.Data" Version="8.0.30"/>
<PackageReference Include="Npgsql" Version="4.0.10"/>
Expand Down
Loading