Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Akka.Streams.Kafka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventHub.Consumer", "exampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Benchmark", "src\Akka.Streams.Kafka.Benchmark\Akka.Streams.Kafka.Benchmark.csproj", "{1BF52F4E-93FA-40C4-8985-C21D779C7997}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Testkit", "src\Akka.Streams.Kafka.Testkit\Akka.Streams.Kafka.Testkit.csproj", "{49F2E094-BB98-41D8-9B3D-CF6557DCC420}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -62,6 +64,10 @@ Global
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.Build.0 = Release|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.Build.0 = Debug|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.ActiveCfg = Release|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
25 changes: 25 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Streams.TestKit" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.TestKit.Xunit2" Version="$(AkkaVersion)" />
<PackageReference Include="Docker.DotNet" Version="3.125.5" />
<PackageReference Include="DotNet.Testcontainers" Version="1.6.0-beta.2028" />
<PackageReference Include="FakeItEasy" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Streams.Kafka\Akka.Streams.Kafka.csproj" />
</ItemGroup>

<ItemGroup>
<None Remove="Resources\reference.conf" />
<EmbeddedResource Include="Resources\reference.conf" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Stages.Consumers;

namespace Akka.Streams.Kafka.Tests.TestKit.Internal
namespace Akka.Streams.Kafka.Testkit
{
public static class ConsumerResultFactory
{
Expand Down
55 changes: 55 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Helpers;

namespace Akka.Streams.Kafka.Testkit.Dsl
{
public static class ConsumerControlFactory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper functions for consumer controls

{
public static Source<TA, IControl> AttachControl<TA, TB>(Source<TA, TB> source)
=> source.ViaMaterialized(ControlFlow<TA>(), Keep.Right);

public static Flow<TA, TA, IControl> ControlFlow<TA>()
=> Flow.Create<TA>()
.ViaMaterialized(KillSwitches.Single<TA>(), Keep.Right)
.MapMaterializedValue(Control);

public static IControl Control(IKillSwitch killSwitch)
=> new FakeControl(killSwitch);

public class FakeControl : IControl
{
private readonly IKillSwitch _killSwitch;
private readonly TaskCompletionSource<Done> _shutdownPromise;

public FakeControl(IKillSwitch killSwitch)
{
_killSwitch = killSwitch;
_shutdownPromise = new TaskCompletionSource<Done>();
}

public Task Stop()
{
_killSwitch.Shutdown();
_shutdownPromise.SetResult(Done.Instance);
return _shutdownPromise.Task;
}

public Task Shutdown()
{
_killSwitch.Shutdown();
_shutdownPromise.SetResult(Done.Instance);
return _shutdownPromise.Task;
}

public Task IsShutdown => _shutdownPromise.Task;

public Task<TResult> DrainAndShutdown<TResult>(Task<TResult> streamCompletion)
{
_killSwitch.Shutdown();
_shutdownPromise.SetResult(Done.Instance);
return Task.FromResult(default(TResult));
}
}
}
}
177 changes: 177 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor.Setup;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.Kafka.Testkit.Internal;
using Akka.Streams.TestKit;
using Akka.Util;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Xunit;
using Xunit.Abstractions;
using Config = Akka.Configuration.Config;

namespace Akka.Streams.Kafka.Testkit.Dsl
{
public abstract class KafkaSpec : KafkaTestKit, IAsyncLifetime
{
protected KafkaSpec(string config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
{
}

protected KafkaSpec(Config config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
{
}

protected KafkaSpec(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
{
}

protected IProducer<string, string> TestProducer { get; private set; }


public virtual Task InitializeAsync()
{
TestProducer = ProducerDefaults().CreateKafkaProducer();
SetUpAdminClient();
return Task.CompletedTask;
}

public virtual Task DisposeAsync()
{
TestProducer?.Dispose();
CleanUpAdminClient();
Shutdown();
return Task.CompletedTask;
}

protected void Sleep(TimeSpan time, string msg)
{
Log.Debug($"Sleeping {time}: {msg}");
Thread.Sleep(time);
}

protected List<T> AwaitMultiple<T>(TimeSpan timeout, IEnumerable<Task<T>> tasks)
{
var completedTasks = new List<Task<T>>();
using (var cts = new CancellationTokenSource(timeout))
{
var waitingTasks = tasks.ToList();
while (waitingTasks.Count > 0)
{
var anyTask = Task.WhenAny(waitingTasks);
try
{
anyTask.Wait(cts.Token);
}
catch (Exception e)
{
throw new Exception($"AwaitMultiple failed. Exception: {e.Message}", e);
}

var completedTask = anyTask.Result;
waitingTasks.Remove(completedTask);
completedTasks.Add(completedTask);
}
}

return completedTasks.Select(t => t.Result).ToList();
}

protected TimeSpan SleepAfterProduce => TimeSpan.FromSeconds(4);

protected void AwaitProduce(IEnumerable<Task<Done>> tasks)
{
AwaitMultiple(TimeSpan.FromSeconds(4), tasks);
Sleep(SleepAfterProduce, "to be sure producing has happened");
}

protected readonly Partition Partition0 = new Partition(0);

// Not implemented
[Obsolete("Kafka DescribeCluster API isn't supported by the .NET driver")]
protected void WaitUntilCluster(Func<object, bool> predicate)
=> Checks.WaitUntilCluster(Settings.ClusterTimeout, Settings.CheckInterval, AdminClient, predicate, Log);

protected void WaitUntilConsumerGroup(string groupId, Func<GroupInfo, bool> predicate)
=> Checks.WaitUntilConsumerGroup(
groupId: groupId,
timeout: Settings.ConsumerGroupTimeout,
sleepInBetween: Settings.CheckInterval,
adminClient: AdminClient,
predicate: predicate,
log: Log);

protected void WaitUntilConsumerSummary(string groupId, Func<List<GroupMemberInfo>, bool> predicate)
=> WaitUntilConsumerGroup(groupId, info =>
{
return info.State == "Stable" && Try<bool>.From(() => predicate(info.Members)).OrElse(false).Success.Value;
});

protected ImmutableList<string> CreateTopics(IEnumerable<int> topics)
=> CreateTopicsAsync(topics).Result;

protected async Task<ImmutableList<string>> CreateTopicsAsync(IEnumerable<int> topics)
{
var topicNames = topics.Select(CreateTopicName).ToImmutableList();
var configs = new Dictionary<string, string>();
var newTopics = topicNames.Select(topic =>
new TopicSpecification
{
Name = topic,
NumPartitions = 1,
ReplicationFactor = 1,
Configs = configs
});
await AdminClient.CreateTopicsAsync(
topics: newTopics,
options: new CreateTopicsOptions {RequestTimeout = TimeSpan.FromSeconds(10)});
return topicNames;
}

protected void PeriodicalCheck<T>(string description, int maxTries, TimeSpan sleepInBetween, Func<T> data, Func<T, bool> predicate)
=> Checks.PeriodicalCheck(description, new TimeSpan(sleepInBetween.Ticks * maxTries), sleepInBetween, data, predicate, Log);

/// <summary>
/// Produce messages to topic using specified range and return a Future so the caller can synchronize consumption.
/// </summary>
protected Task Produce(string topic, IEnumerable<int> range, int? partition = null)
=> ProduceString(topic, range.Select(i => i.ToString()), partition);

protected Task ProduceString(string topic, IEnumerable<string> range, int? partition = null)
{
partition ??= Partition0;
return Source.From(range)
// NOTE: If no partition is specified but a key is present a partition will be chosen
// using a hash of the key. If neither key nor partition is present a partition
// will be assigned in a round-robin fashion.
.Select(n => new ProducerRecord<string, string>(topic, partition, DefaultKey, n))
.RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer());
}

protected Task ProduceTimestamped(string topic, IEnumerable<(int, long)> timestampedRange)
=> Source.From(timestampedRange)
.Select( tuple =>
{
var (n, ts) = tuple;
return new ProducerRecord<string, string>(topic, Partition0, ts, DefaultKey, n.ToString());
})
.RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer());

protected (IControl, TestSubscriber.Probe<string>) CreateProbe(
ConsumerSettings<string, string> consumerSettings,
string[] topics)
=> KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topics))
.Select(s => s.Message.Value)
.ToMaterialized(this.SinkProbe<string>(), Keep.Both)
.Run(Sys.Materializer());
}
}
91 changes: 91 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Implementation;
using Akka.Util.Internal;

namespace Akka.Streams.Kafka.Testkit
{
public static class Extensions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved from Kafka.Test

{
public static async Task WithTimeout(this Task task, TimeSpan timeout)
{
using (var cts = new CancellationTokenSource())
{
var timeoutTask = Task.Delay(timeout, cts.Token);
var completed = await Task.WhenAny(task, timeoutTask);
if (completed == timeoutTask)
throw new OperationCanceledException("Operation timed out");
else
cts.Cancel();
}
}

public static List<List<T>> Grouped<T>(this IEnumerable<T> messages, int size)
{
var groups = new List<List<T>>();
var list = new List<T>();
var index = 0;
foreach (var message in messages)
{
list.Add(message);
if(index != 0 && index % size == 0)
{
groups.Add(list);
list = new List<T>();
}

index++;
}
if(list.Count > 0)
groups.Add(list);
return groups;
}

public static void AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Action block, IMaterializer materializer)
{
AssertAllStagesStopped(spec, () =>
{
block();
return NotUsed.Instance;
}, materializer);
}

public static T AssertAllStagesStopped<T>(this Akka.TestKit.Xunit2.TestKit spec, Func<T> block, IMaterializer materializer)
{
if (!(materializer is ActorMaterializerImpl impl))
return block();

var probe = spec.CreateTestProbe(impl.System);
probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance);
probe.ExpectMsg<StreamSupervisor.StoppedChildren>();
var result = block();

probe.Within(TimeSpan.FromSeconds(5), () =>
{
IImmutableSet<IActorRef> children = ImmutableHashSet<IActorRef>.Empty;
try
{
probe.AwaitAssert(() =>
{
impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref);
children = probe.ExpectMsg<StreamSupervisor.Children>().Refs;
if (children.Count != 0)
throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}");
});
}
catch
{
children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance));
throw;
}
});

return result;
}
}
}
Loading