2626import java .time .format .DateTimeFormatter ;
2727import java .util .Arrays ;
2828import java .util .HashMap ;
29- import java .util .List ;
29+ import java .util .HashSet ;
3030import java .util .Map ;
31- import java .util .Properties ;
31+ import java .util .Set ;
3232import java .util .UUID ;
3333import java .util .concurrent .ExecutionException ;
3434import java .util .concurrent .Future ;
3535
36- import org .apache .kafka .clients .admin .AdminClient ;
37- import org .apache .kafka .clients .admin .AdminClientConfig ;
38- import org .apache .kafka .clients .admin .NewTopic ;
36+ import io .aiven .commons .kafka .testkit .KafkaIntegrationTestBase ;
37+ import io .aiven .commons .kafka .testkit .KafkaManager ;
3938import org .apache .kafka .clients .producer .KafkaProducer ;
4039import org .apache .kafka .clients .producer .ProducerConfig ;
4140import org .apache .kafka .clients .producer .ProducerRecord ;
4241import org .apache .kafka .clients .producer .RecordMetadata ;
4342
43+ import io .aiven .kafka .connect .azure .source .AzureBlobSourceConnector ;
4444import io .aiven .kafka .connect .common .config .CompressionType ;
4545
4646import com .azure .storage .blob .BlobContainerAsyncClient ;
4747import com .azure .storage .blob .BlobServiceClient ;
4848import com .azure .storage .blob .BlobServiceClientBuilder ;
49- import com .github .dockerjava .api .model .Ulimit ;
5049import org .junit .jupiter .api .AfterEach ;
5150import org .junit .jupiter .api .BeforeAll ;
51+ import org .junit .jupiter .api .BeforeEach ;
5252import org .testcontainers .containers .FixedHostPortGenericContainer ;
5353import org .testcontainers .containers .GenericContainer ;
54- import org .testcontainers .containers .KafkaContainer ;
55- import org .testcontainers .containers .Network ;
5654import org .testcontainers .junit .jupiter .Container ;
5755import org .testcontainers .junit .jupiter .Testcontainers ;
5856
5957@ SuppressWarnings ({ "deprecation" , "PMD.TestClassWithoutTestCases" })
6058@ Testcontainers
61- class AbstractIntegrationTest <K , V > {
59+ class AbstractIntegrationTest <K , V > extends KafkaIntegrationTestBase {
6260 protected final String testTopic0 ;
6361 protected final String testTopic1 ;
6462
65- private AdminClient adminClient ;
66- private ConnectRunner connectRunner ;
63+ private KafkaManager kafkaManager ;
64+
6765 private KafkaProducer <K , V > producer ;
6866
6967 protected static final int OFFSET_FLUSH_INTERVAL_MS = 5000 ;
@@ -85,6 +83,7 @@ class AbstractIntegrationTest<K, V> {
8583 private static final String AZURE_ENDPOINT = "http://127.0.0.1:10000" ;
8684 private static final String ACCOUNT_NAME = "devstoreaccount1" ;
8785 private static final String ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" ;
86+ private static final Set <String > CONNECTOR_NAMES = new HashSet <>();
8887
8988 @ Container
9089 private static final GenericContainer <?> AZURITE_CONTAINER = new FixedHostPortGenericContainer <>( // NOPMD
@@ -94,15 +93,9 @@ class AbstractIntegrationTest<K, V> {
9493 .withFixedExposedPort (AZURE_TABLE_PORT , AZURE_TABLE_PORT )
9594 .withCommand ("azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0" )
9695 .withReuse (true );
97- @ Container
98- protected static final KafkaContainer KAFKA = new KafkaContainer ("7.1.0" )
99- .withEnv ("KAFKA_AUTO_CREATE_TOPICS_ENABLE" , "false" )
100- .withNetwork (Network .newNetwork ())
101- .withExposedPorts (KafkaContainer .KAFKA_PORT , 9092 )
102- .withCreateContainerCmdModifier (
103- cmd -> cmd .getHostConfig ().withUlimits (List .of (new Ulimit ("nofile" , 30_000L , 30_000L ))));
10496
10597 protected AbstractIntegrationTest () {
98+ super ();
10699 testTopic0 = "test-topic-0-" + UUID .randomUUID ();
107100 testTopic1 = "test-topic-1-" + UUID .randomUUID ();
108101 }
@@ -144,13 +137,16 @@ static void setUpAll() throws IOException, InterruptedException {
144137 assert process .waitFor () == 0 ;
145138 }
146139
140+ @ BeforeEach
141+ void setupKafka () throws IOException {
142+ kafkaManager = setupKafka (true , AzureBlobSourceConnector .class );
143+ }
144+
147145 @ AfterEach
148146 void tearDown () {
149- connectRunner .stop ();
150- adminClient .close ();
151147 producer .close ();
152148 testBlobAccessor .clear (azurePrefix );
153- connectRunner . awaitStop ( );
149+ CONNECTOR_NAMES . forEach ( kafkaManager :: deleteConnector );
154150 }
155151
156152 protected BlobContainerAsyncClient getAsyncContainerClient () {
@@ -197,27 +193,20 @@ protected Future<RecordMetadata> sendMessageAsync(final String topicName, final
197193 return producer .send (msg );
198194 }
199195
200- protected ConnectRunner getConnectRunner () {
201- return connectRunner ;
202- }
203-
204196 protected void startConnectRunner (final Map <String , Object > testSpecificProducerProperties )
205197 throws ExecutionException , InterruptedException {
206198 testBlobAccessor .clear (azurePrefix );
207199
208- final Properties adminClientConfig = new Properties ();
209- adminClientConfig .put (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , KAFKA .getBootstrapServers ());
210- adminClient = AdminClient .create (adminClientConfig );
200+ kafkaManager .createTopics (Arrays .asList (testTopic0 , testTopic1 ));
211201
212202 final Map <String , Object > producerProps = new HashMap <>(testSpecificProducerProperties );
213- producerProps .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , KAFKA . getBootstrapServers ());
203+ producerProps .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , kafkaManager . bootstrapServers ());
214204 producer = new KafkaProducer <>(producerProps );
215205
216- final NewTopic newTopic0 = new NewTopic (testTopic0 , 4 , (short ) 1 );
217- final NewTopic newTopic1 = new NewTopic (testTopic1 , 4 , (short ) 1 );
218- adminClient .createTopics (Arrays .asList (newTopic0 , newTopic1 )).all ().get ();
206+ }
219207
220- connectRunner = new ConnectRunner (pluginDir , KAFKA .getBootstrapServers (), OFFSET_FLUSH_INTERVAL_MS );
221- connectRunner .start ();
208+ protected void createConnector (final Map <String , String > connectorConfig ) {
209+ CONNECTOR_NAMES .add (connectorConfig .get ("name" ));
210+ kafkaManager .configureConnector (connectorConfig .get ("name" ), connectorConfig );
222211 }
223212}
0 commit comments